Sync包实现并发

一、静态

Go语言以构建高并发容易、性能优异而文明,但是随着并发的使用,可能发生数据争用的静态问题。由于不知道什么时候发生,所以讲产生难以发现和调试的错误。

例子:

func getNumber() int {
    var i int
    go func() {
        i = 6
    }()
    return i
}
func main() {
    fmt.Println(getNumber())
}

上例,getNumber() 函数先声明一个变量i,之后在goroutine中单独对i进行设置。而这时程序也正在从函数中返回i,由于不知道goroutine是否已经完成对i值的修改,所以将有两种操作发生:

  1. 如果goroutine已完成对i值的修改,则最后返回的i值为6;
  2. 如果goroutine未完成对i值的修改,则变量i的值从函数返回,为默认值0

现在根据这两个操作中的哪一个先完成,输出的记过将是0(默认整数值)或6。这就是为什么将其称为数据竟态:从getNumber() 函数返回的值会根据1、或2哪个操作先完成而得名。

解决方法:通道阻塞、互斥锁等。

二、互斥锁

1、什么是互斥锁

(1)sync.Mutex的定义

sync.Mutex 是一个结构体对象,用于实现互斥锁,适用于读写不确定的场景(即读写次数没有明显区别,并且只允许有一个写或读的场景)。所以该锁也称为“全局锁”。

sync.Mutex 结构体由两个字段state和sema组成,其中state表示当前互斥锁的状态,而sema用于控制锁状态的信号量。

type Mutex struct {
	state int32
	sema  uint32
}

(2)sync.Mutex的方法

sync.Mutex 结构体对象有Lock()Unlock() 两个方法。Lock() 方法用于加锁,Unlock() 方法用于解锁。

在使用Lock() 加锁后,就不能再次对其进行加速(如果再次加锁,会造成死锁)。直到利用Unlock() 对其解锁才能再次加锁。

在用Unlock() 解锁Mutex时,如果未加锁,会导致运行时错误

Lock()和Unlock() 使用注意:

  • 在一个goroutine获得Mutex后,其他goroutine只能等到这个goroutine释放该Mutex
  • 在使用Lock()方法加锁后,不能再继续对其加锁,直到利用Unlock()解锁才能再次加锁
  • 在Lock()方法之前使用Unlock() 会导致panic异常
  • 已经锁定的Mutex并不与特定的goroutine关联,可以利用一个goroutine对其加锁,再利用其他goroutine对其解锁。
  • 在同一个goroutine中的Mutex被解锁前再次进行加锁,会导致死锁。
  • 该方法适用于读写不确定,并且只有一个读或写的场景。
2、互斥锁的使用
func main() {
    var mutex sync.Mutex
    wait := sync.WaitGroup{}
    fmt.Println("锁定主协程")
    mutex.Lock()
    for i := 0; i < 5; i++ {
        wait.Add(1)
        go func(i int) {
            fmt.Printf("%d 协程还未锁定:\n", i)
            mutex.Lock()
            fmt.Printf("%d 协程已锁定\n", i)
            time.Sleep(time.Second)
            fmt.Printf("%d 协程已解锁:\n", i)
            mutex.Unlock()
            defer wait.Done()
        }(i)
    }
    time.Sleep(time.Second)
    fmt.Println("解锁主协程")
    mutex.Unlock()
    wait.Wait()

}

输出结果为:

锁定主协程
0 协程还未锁定:
3 协程还未锁定:
2 协程还未锁定:
4 协程还未锁定:
1 协程还未锁定:
解锁主协程
0 协程已锁定
0 协程已解锁:
3 协程已锁定
3 协程已解锁:
2 协程已锁定
2 协程已解锁:
4 协程已锁定
4 协程已解锁:
1 协程已锁定
1 协程已解锁:

三、读写互斥锁

1、什么是读写互斥锁

(1)读写互斥锁的定义

读写互斥锁(sync.RWMutex)是一个控制goroutine访问的读写锁。该锁可以加多个读锁或写锁,其经常用于读次数远远多于写次数的场景。

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}

(2)读写互斥锁的方法

有4个方法进行读写操作

  • 写操作的Lock()和Unlock()方法

func (*RWMutex) Lock()

func (*RWMutex) Unlock()

对于写锁,如果在添加写锁之前已经有其他的读锁和写锁,则Lock()方法会阻塞,直到该写锁可写。写锁权限高于读锁,有写锁时有限进行写锁定。

  • 读操作的RLock()和RUnlock()方法

func (*RWMutex) RLock()

func (*RWMutex) RUnlock()

如果已有写锁,则无法加载读锁。在只有读锁或者没有锁时,才可以加载读锁。读锁可以加载多个,所以适用于“读多写少”的场景。

读写互斥锁在读锁占用的情况下,会阻止写,但不阻止读。即多个goroutine可以同时获取读锁(读锁调用RLock()方法,而写锁调用Lock()方法),会阻止任何其他goroutine(无论读和写)进来,整个锁相当于该goroutine独占。

sync.RWMutex 用于读锁和写锁分开的情况

2、读写互斥锁使用
var count int
var rw sync.RWMutex

func ReadCount(n int, ch chan struct{}) {
    rw.RLock()
    fmt.Printf("goroutine %d 进入读操作...\n", n)
    v := count
    fmt.Printf("goroutine %d 读取结束,值为%d\n", n, v)
    rw.RUnlock()
    ch <- struct{}{}
}
func WriteCount(n int, ch chan struct{}) {
    rw.Lock()
    fmt.Printf("goroutine %d 进入写操作\n", n)
    v := rand.New(rand.NewSource(time.Now().UnixNano()))
    count = v.Intn(10)
    fmt.Printf("goroutine %d 写入结束,新值为%d\n", n, count)
    rw.Unlock()
    ch <- struct{}{}

}
func main() {
    ch := make(chan struct{}, 6)
    for i := 0; i < 3; i++ {
        go ReadCount(i, ch)

    }
    for i := 4; i <= 6; i++ {
        go WriteCount(i, ch)
    }
    for i := 0; i < 6; i++ {
        <-ch
    }

}

多个读操作可以同时读一个数据,虽然加了锁,但读都是不受影响的,即“读和写都是互斥的,读和读不互斥”。多个读操作同时操作一个数据的实例如下:

var m sync.RWMutex

func Reading(i int) {
    fmt.Println(i, "reading start")
    m.RLock()
    fmt.Println(i, "reading")
    time.Sleep(time.Second)
    m.RUnlock()
    fmt.Println(i, "reading over")
}
func main() {
    //m = new(sync.RWMutex)
    go Reading(1)
    go Reading(2)
    time.Sleep(time.Second * 2)
}

可以看到协程1和协程2 同时开始读,不互斥

// 下面4行同时输出
2 reading start
1 reading start
1 reading
2 reading

2 reading over
1 reading over

由于读写互斥,所以在写开始后,读必须等写进行完才能继续。读写互斥锁示例如下:

var m sync.RWMutex

func Reading(i int) {
    m.RLock()

    fmt.Println(i, "reading start")
    fmt.Println(i, "reading")
    time.Sleep(time.Second)
    fmt.Println(i, "reading over")
    m.RUnlock()

}
func Writeing(i int) {
    m.Lock()
    fmt.Println(i, "writing start")
    fmt.Println(i, "writing")
    time.Sleep(time.Second)
    fmt.Println(i, "writing over")
    m.Unlock()

}
func main() {
    //m = new(sync.RWMutex)
    go Writeing(0)
    go Reading(1)
    go Writeing(2)
    time.Sleep(time.Second * 5)
}

输出,可以看到在写完成后读才开始操作

0 writing start
0 writing
0 writing over
1 reading start
1 reading
1 reading over
2 writing start
2 writing
2 writing over

四、sync.Once结构体

1、sync.Once结构体定义

sync.Once 用于解决一次性初始化问题。它的作用与init()函数类似,使方法只执行一次

sync.Onceinit() 也有所不同:init() 是在文件包首次被加载时才执行,且只执行一次;而sync.Once 结构体是在代码运行中有需要才执行,且只执行一次。

在很多高并发的场景中需要确保某些操作只执行一次’例如只加载一次配置文件、只关闭一次通道等。

type Once struct {
	// done indicates whether the action has been performed.
	// It is first in the struct because it is used in the hot path.
	// The hot path is inlined at every call site.
	// Placing done first allows more compact instructions on some architectures (amd64/386),
	// and fewer instructions (to calculate offset) on other architectures.
	done uint32
	m    Mutex
}

sync.Once 结构体包括一个互斥锁和一个布尔值。互斥锁保证布尔值和数据的安全,布尔值用来记录初始化是否完成。这样就能保证初始化操作时是并发安全的,并且初始化操作也不会被执行多次。

2、sync.Once的使用

sync.Once结构体只有一个Do()方法

func main() {
    //m = new(sync.RWMutex)
    var once sync.Once
    var flag = make(chan struct{})
    onceBody := func() {
        fmt.Println("这里只打印一次")
    }
    for i := 0; i < 6; i++ {
        go func() {
            once.Do(onceBody)
            flag <- struct{}{}
        }()
    }
    time.Sleep(time.Second * 2)
}

下面通过一个关闭通道例子可以看到。调用close()方法来关闭通道,但如果关闭一个已经关闭的通道,则会使程序宕掉,可以借助sync.Once.Do() 方法来保证通道在运行的过程中只被关闭一次。

var wg sync.WaitGroup
var once sync.Once

// 只写通道
func func1(ch1 chan<- int) {
    defer wg.Done()
    for i := 0; i < 10; i++ {
        ch1 <- i
    }
    close(ch1)
}
func func2(ch1 <-chan int, ch2 chan<- int) {
    defer wg.Done()
    for {
        x, ok := <-ch1
        if !ok {
            break
        }
        ch2 <- 2 * x
    }
    once.Do(func() { close(ch2) })
}
func main() {
    ch1 := make(chan int, 10)
    ch2 := make(chan int, 10)
    wg.Add(3)
    go func1(ch1)
    go func2(ch1, ch2)
    go func2(ch1, ch2)
    wg.Wait()
    for ret := range ch2 {
        fmt.Println(ret)
    }
}
// 输出结果
2
10
12
14
16
18
0
4
6
8

代码解析:

  1. 声明一个 sync.WaitGroup 变量 wg,用于等待所有协程完成。
  2. 声明一个 sync.Once 变量 once,用于确保在关闭 ch2 通道时只执行一次操作。
  3. 定义 func1 函数,该函数接受一个只写通道 ch1。在该函数内部,一个循环将整数从 0 到 9 写入通道 ch1,然后关闭 ch1 通道,表示写入操作已完成。defer wg.Done() 在函数结束时标记协程已完成。
  4. 定义 func2 函数,该函数接受一个只读通道 ch1 和一个只写通道 ch2。在该函数内部,一个无限循环从 ch1 读取数据并将其乘以 2 后写入 ch2。当 ch1 关闭后,循环会退出,然后 once.Do 保证关闭 ch2 通道只执行一次。defer wg.Done() 在函数结束时标记协程已完成。
  5. main 函数中,创建两个通道 ch1ch2,分别用于协程之间的通信。
  6. 通过 wg.Add(3) 增加等待组的计数器,因为后续会启动 3 个协程。
  7. 启动一个协程运行 func1(ch1),这个协程会将 0 到 9 的整数写入 ch1,然后关闭通道。
  8. 启动两个协程运行 func2(ch1, ch2),这两个协程会并发地从 ch1 中读取数据,将数据乘以 2 后写入 ch2
  9. wg.Wait() 阻塞主协程,等待所有的协程(包括 func1 和两个 func2)都完成后继续执行。
  10. 在主协程中,使用 range ch2 循环从 ch2 通道中读取数据,直到 ch2 通道被关闭。这时,从 ch2 中读取的数据是 func2 处理后的结果,然后通过 fmt.Println(ret) 打印输出。

总的来说,这段代码通过并发地使用多个协程,实现了将 0 到 9 的整数通过通道传递给 func2 处理,并将处理后的结果输出到控制台。两个 func2 协程可以并发地从 ch1 中读取数据,但由于通道的关闭,每个协程在读取完数据后会退出。主协程等待所有协程完成后,循环从 ch2 中读取并打印处理结果。同时,once.Do 确保通道 ch2 仅被关闭一次。

上面代码中,开启了两个goroutine 去执行func2 函数,当func2函数执行完后,会掉用close方法关闭参数所指的ch2通道。为了防止多个goroutine同时关闭同一个通道而产生错误,可以调用sync.Once.Do()方法来关闭通道,这样就不会产生多次关闭通道而使得程序崩溃的错误。

五、同步等待组sync.WaitGroup

1、同步等待组sync.WaitGroup简介

sync.WaitGroup 是一个结构体对象,用于等待一组线程的结束

sync.WaitGroup 结构体对象只有3个方法:Add()Done()Wait()

  • Add():向内部计数器加上delta,delta可以是负数。如果内部计数器为0,则Wait()方法会将处于阻塞等待的所有goroutine释放。如果计数器小于0,则调用panic函数。Add()方法加上正数的调用应在Wait()方法之前,否则Wait()方法可能只会等待很少的goroutine。
  • Done()方法:会减少WaitGroup计数器的值,一般在goroutine的最后执行。
  • Wait()方法:会阻塞,直到WaitGroup计数器减为0

在以上3个方法中Done()Add(-1)方法的别名。简单来说’使用Add()方法添加计数; 使用Done()方法减掉一个计数’如果计数不为0则会阻塞Wait()方法的运行。一个goroutine调用方法Add() 来设定应等待的goroutine的数量。每个被等待的goroutine在结束时应该调用Done()方法。同时,在主goroutine里可以调用Wait()方法阻塞至所有goroutine结束

2、同步等待组sync.WaitGroup使用示例
func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("1 goroutine sleep")
        time.Sleep(time.Second * 2)
        fmt.Println("1 goroutine exit")
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("2 goroutine sleep")
        time.Sleep(time.Second * 4)
        fmt.Println("2 goroutine exit")
    }()
    fmt.Println("waiting for all goroutine")
    wg.Wait()
    fmt.Println("All goroutine finished")
}
// 输出
waiting for all goroutine
2 goroutine sleep
1 goroutine sleep
1 goroutine exit
2 goroutine exit
All goroutine finished

Add()Done()方法的使用一定要配对,否则可能发生死锁。所报的错误信息如下

waiting for all goroutine
2 goroutine sleep
1 goroutine sleep
1 goroutine exit
2 goroutine exit
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x0?)
	/usr/local/go/src/runtime/sema.go:62 +0x28
sync.(*WaitGroup).Wait(0x140000a4010)
	/usr/local/go/src/sync/waitgroup.go:139 +0x80
main.main()
	/Users/dujie/GolangProjects/mysql-csv/main.go:172 +0x110