Sync包实现并发
Sync包实现并发
一、静态
Go语言以构建高并发容易、性能优异而文明,但是随着并发的使用,可能发生数据争用的静态问题。由于不知道什么时候发生,所以讲产生难以发现和调试的错误。
例子:
func getNumber() int {
var i int
go func() {
i = 6
}()
return i
}
func main() {
fmt.Println(getNumber())
}
上例,getNumber()
函数先声明一个变量i,之后在goroutine中单独对i进行设置。而这时程序也正在从函数中返回i,由于不知道goroutine是否已经完成对i值的修改,所以将有两种操作发生:
- 如果goroutine已完成对i值的修改,则最后返回的i值为6;
- 如果goroutine未完成对i值的修改,则变量i的值从函数返回,为默认值0
现在根据这两个操作中的哪一个先完成,输出的记过将是0(默认整数值)或6。这就是为什么将其称为数据竟态:从getNumber()
函数返回的值会根据1、或2哪个操作先完成而得名。
解决方法:通道阻塞、互斥锁等。
二、互斥锁
1、什么是互斥锁
(1)sync.Mutex的定义
sync.Mutex
是一个结构体对象,用于实现互斥锁,适用于读写不确定的场景(即读写次数没有明显区别,并且只允许有一个写或读的场景)。所以该锁也称为“全局锁”。
sync.Mutex
结构体由两个字段state和sema组成,其中state表示当前互斥锁的状态,而sema用于控制锁状态的信号量。
type Mutex struct {
state int32
sema uint32
}
(2)sync.Mutex的方法
sync.Mutex
结构体对象有Lock()
、Unlock()
两个方法。Lock()
方法用于加锁,Unlock()
方法用于解锁。
在使用Lock()
加锁后,就不能再次对其进行加速(如果再次加锁,会造成死锁)。直到利用Unlock()
对其解锁才能再次加锁。
在用Unlock()
解锁Mutex时,如果未加锁,会导致运行时错误
Lock()和Unlock() 使用注意:
- 在一个goroutine获得Mutex后,其他goroutine只能等到这个goroutine释放该Mutex
- 在使用Lock()方法加锁后,不能再继续对其加锁,直到利用Unlock()解锁才能再次加锁
- 在Lock()方法之前使用Unlock() 会导致panic异常
- 已经锁定的Mutex并不与特定的goroutine关联,可以利用一个goroutine对其加锁,再利用其他goroutine对其解锁。
- 在同一个goroutine中的Mutex被解锁前再次进行加锁,会导致死锁。
- 该方法适用于读写不确定,并且只有一个读或写的场景。
2、互斥锁的使用
func main() {
var mutex sync.Mutex
wait := sync.WaitGroup{}
fmt.Println("锁定主协程")
mutex.Lock()
for i := 0; i < 5; i++ {
wait.Add(1)
go func(i int) {
fmt.Printf("%d 协程还未锁定:\n", i)
mutex.Lock()
fmt.Printf("%d 协程已锁定\n", i)
time.Sleep(time.Second)
fmt.Printf("%d 协程已解锁:\n", i)
mutex.Unlock()
defer wait.Done()
}(i)
}
time.Sleep(time.Second)
fmt.Println("解锁主协程")
mutex.Unlock()
wait.Wait()
}
输出结果为:
锁定主协程
0 协程还未锁定:
3 协程还未锁定:
2 协程还未锁定:
4 协程还未锁定:
1 协程还未锁定:
解锁主协程
0 协程已锁定
0 协程已解锁:
3 协程已锁定
3 协程已解锁:
2 协程已锁定
2 协程已解锁:
4 协程已锁定
4 协程已解锁:
1 协程已锁定
1 协程已解锁:
三、读写互斥锁
1、什么是读写互斥锁
(1)读写互斥锁的定义
读写互斥锁(sync.RWMutex)是一个控制goroutine访问的读写锁。该锁可以加多个读锁或写锁,其经常用于读次数远远多于写次数的场景。
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
(2)读写互斥锁的方法
有4个方法进行读写操作
- 写操作的Lock()和Unlock()方法
func (*RWMutex) Lock()
func (*RWMutex) Unlock()
对于写锁,如果在添加写锁之前已经有其他的读锁和写锁,则Lock()方法会阻塞,直到该写锁可写。写锁权限高于读锁,有写锁时有限进行写锁定。
- 读操作的RLock()和RUnlock()方法
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
如果已有写锁,则无法加载读锁。在只有读锁或者没有锁时,才可以加载读锁。读锁可以加载多个,所以适用于“读多写少”的场景。
读写互斥锁在读锁占用的情况下,会阻止写,但不阻止读。即多个goroutine可以同时获取读锁(读锁调用RLock()方法,而写锁调用Lock()方法),会阻止任何其他goroutine(无论读和写)进来,整个锁相当于该goroutine独占。
sync.RWMutex
用于读锁和写锁分开的情况
2、读写互斥锁使用
var count int
var rw sync.RWMutex
func ReadCount(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d 进入读操作...\n", n)
v := count
fmt.Printf("goroutine %d 读取结束,值为%d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func WriteCount(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d 进入写操作\n", n)
v := rand.New(rand.NewSource(time.Now().UnixNano()))
count = v.Intn(10)
fmt.Printf("goroutine %d 写入结束,新值为%d\n", n, count)
rw.Unlock()
ch <- struct{}{}
}
func main() {
ch := make(chan struct{}, 6)
for i := 0; i < 3; i++ {
go ReadCount(i, ch)
}
for i := 4; i <= 6; i++ {
go WriteCount(i, ch)
}
for i := 0; i < 6; i++ {
<-ch
}
}
多个读操作可以同时读一个数据,虽然加了锁,但读都是不受影响的,即“读和写都是互斥的,读和读不互斥”。多个读操作同时操作一个数据的实例如下:
var m sync.RWMutex
func Reading(i int) {
fmt.Println(i, "reading start")
m.RLock()
fmt.Println(i, "reading")
time.Sleep(time.Second)
m.RUnlock()
fmt.Println(i, "reading over")
}
func main() {
//m = new(sync.RWMutex)
go Reading(1)
go Reading(2)
time.Sleep(time.Second * 2)
}
可以看到协程1和协程2 同时开始读,不互斥
// 下面4行同时输出
2 reading start
1 reading start
1 reading
2 reading
2 reading over
1 reading over
由于读写互斥,所以在写开始后,读必须等写进行完才能继续。读写互斥锁示例如下:
var m sync.RWMutex
func Reading(i int) {
m.RLock()
fmt.Println(i, "reading start")
fmt.Println(i, "reading")
time.Sleep(time.Second)
fmt.Println(i, "reading over")
m.RUnlock()
}
func Writeing(i int) {
m.Lock()
fmt.Println(i, "writing start")
fmt.Println(i, "writing")
time.Sleep(time.Second)
fmt.Println(i, "writing over")
m.Unlock()
}
func main() {
//m = new(sync.RWMutex)
go Writeing(0)
go Reading(1)
go Writeing(2)
time.Sleep(time.Second * 5)
}
输出,可以看到在写完成后读才开始操作
0 writing start
0 writing
0 writing over
1 reading start
1 reading
1 reading over
2 writing start
2 writing
2 writing over
四、sync.Once结构体
1、sync.Once结构体定义
sync.Once
用于解决一次性初始化问题。它的作用与init()
函数类似,使方法只执行一次
sync.Once
和init()
也有所不同:init()
是在文件包首次被加载时才执行,且只执行一次;而sync.Once
结构体是在代码运行中有需要才执行,且只执行一次。
在很多高并发的场景中需要确保某些操作只执行一次’例如只加载一次配置文件、只关闭一次通道等。
type Once struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
// The hot path is inlined at every call site.
// Placing done first allows more compact instructions on some architectures (amd64/386),
// and fewer instructions (to calculate offset) on other architectures.
done uint32
m Mutex
}
sync.Once
结构体包括一个互斥锁和一个布尔值。互斥锁保证布尔值和数据的安全,布尔值用来记录初始化是否完成。这样就能保证初始化操作时是并发安全的,并且初始化操作也不会被执行多次。
2、sync.Once的使用
sync.Once结构体只有一个Do()
方法
func main() {
//m = new(sync.RWMutex)
var once sync.Once
var flag = make(chan struct{})
onceBody := func() {
fmt.Println("这里只打印一次")
}
for i := 0; i < 6; i++ {
go func() {
once.Do(onceBody)
flag <- struct{}{}
}()
}
time.Sleep(time.Second * 2)
}
下面通过一个关闭通道例子可以看到。调用close()
方法来关闭通道,但如果关闭一个已经关闭的通道,则会使程序宕掉,可以借助sync.Once.Do()
方法来保证通道在运行的过程中只被关闭一次。
var wg sync.WaitGroup
var once sync.Once
// 只写通道
func func1(ch1 chan<- int) {
defer wg.Done()
for i := 0; i < 10; i++ {
ch1 <- i
}
close(ch1)
}
func func2(ch1 <-chan int, ch2 chan<- int) {
defer wg.Done()
for {
x, ok := <-ch1
if !ok {
break
}
ch2 <- 2 * x
}
once.Do(func() { close(ch2) })
}
func main() {
ch1 := make(chan int, 10)
ch2 := make(chan int, 10)
wg.Add(3)
go func1(ch1)
go func2(ch1, ch2)
go func2(ch1, ch2)
wg.Wait()
for ret := range ch2 {
fmt.Println(ret)
}
}
// 输出结果
2
10
12
14
16
18
0
4
6
8
代码解析:
- 声明一个
sync.WaitGroup
变量wg
,用于等待所有协程完成。 - 声明一个
sync.Once
变量once
,用于确保在关闭ch2
通道时只执行一次操作。 - 定义
func1
函数,该函数接受一个只写通道ch1
。在该函数内部,一个循环将整数从 0 到 9 写入通道ch1
,然后关闭ch1
通道,表示写入操作已完成。defer wg.Done()
在函数结束时标记协程已完成。 - 定义
func2
函数,该函数接受一个只读通道ch1
和一个只写通道ch2
。在该函数内部,一个无限循环从ch1
读取数据并将其乘以 2 后写入ch2
。当ch1
关闭后,循环会退出,然后once.Do
保证关闭ch2
通道只执行一次。defer wg.Done()
在函数结束时标记协程已完成。 - 在
main
函数中,创建两个通道ch1
和ch2
,分别用于协程之间的通信。 - 通过
wg.Add(3)
增加等待组的计数器,因为后续会启动 3 个协程。 - 启动一个协程运行
func1(ch1)
,这个协程会将 0 到 9 的整数写入ch1
,然后关闭通道。 - 启动两个协程运行
func2(ch1, ch2)
,这两个协程会并发地从ch1
中读取数据,将数据乘以 2 后写入ch2
。 wg.Wait()
阻塞主协程,等待所有的协程(包括func1
和两个func2
)都完成后继续执行。- 在主协程中,使用
range ch2
循环从ch2
通道中读取数据,直到ch2
通道被关闭。这时,从ch2
中读取的数据是func2
处理后的结果,然后通过fmt.Println(ret)
打印输出。
总的来说,这段代码通过并发地使用多个协程,实现了将 0 到 9 的整数通过通道传递给 func2
处理,并将处理后的结果输出到控制台。两个 func2
协程可以并发地从 ch1
中读取数据,但由于通道的关闭,每个协程在读取完数据后会退出。主协程等待所有协程完成后,循环从 ch2
中读取并打印处理结果。同时,once.Do
确保通道 ch2
仅被关闭一次。
上面代码中,开启了两个goroutine 去执行func2 函数,当func2函数执行完后,会掉用close方法关闭参数所指的ch2通道。为了防止多个goroutine同时关闭同一个通道而产生错误,可以调用sync.Once.Do()
方法来关闭通道,这样就不会产生多次关闭通道而使得程序崩溃的错误。
五、同步等待组sync.WaitGroup
1、同步等待组sync.WaitGroup简介
sync.WaitGroup
是一个结构体对象,用于等待一组线程的结束
sync.WaitGroup
结构体对象只有3个方法:Add()
、Done()
、Wait()
- Add():向内部计数器加上delta,delta可以是负数。如果内部计数器为0,则Wait()方法会将处于阻塞等待的所有goroutine释放。如果计数器小于0,则调用panic函数。Add()方法加上正数的调用应在
Wait()
方法之前,否则Wait()
方法可能只会等待很少的goroutine。 - Done()方法:会减少WaitGroup计数器的值,一般在goroutine的最后执行。
- Wait()方法:会阻塞,直到WaitGroup计数器减为0
在以上3个方法中Done()
是Add(-1)
方法的别名。简单来说’使用Add()
方法添加计数; 使用Done()
方法减掉一个计数’如果计数不为0则会阻塞Wait()
方法的运行。一个goroutine调用方法Add()
来设定应等待的goroutine的数量。每个被等待的goroutine在结束时应该调用Done()
方法。同时,在主goroutine里可以调用Wait()
方法阻塞至所有goroutine结束
2、同步等待组sync.WaitGroup使用示例
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("1 goroutine sleep")
time.Sleep(time.Second * 2)
fmt.Println("1 goroutine exit")
}()
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("2 goroutine sleep")
time.Sleep(time.Second * 4)
fmt.Println("2 goroutine exit")
}()
fmt.Println("waiting for all goroutine")
wg.Wait()
fmt.Println("All goroutine finished")
}
// 输出
waiting for all goroutine
2 goroutine sleep
1 goroutine sleep
1 goroutine exit
2 goroutine exit
All goroutine finished
Add()
和Done()
方法的使用一定要配对,否则可能发生死锁。所报的错误信息如下
waiting for all goroutine
2 goroutine sleep
1 goroutine sleep
1 goroutine exit
2 goroutine exit
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x0?)
/usr/local/go/src/runtime/sema.go:62 +0x28
sync.(*WaitGroup).Wait(0x140000a4010)
/usr/local/go/src/sync/waitgroup.go:139 +0x80
main.main()
/Users/dujie/GolangProjects/mysql-csv/main.go:172 +0x110