Channel通道
Channel通道
通道的定义
通道(channel)是用来传递数据的一个数据结构。go语言提倡使用通信来代替共享内存。当一个资源需要在goroutine之间共享时,通道在goroutine之间架起了一个管道,并提供了确保同步交换数据的机制。
在声明通道时,需要指定要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或指针。
Go语言中的通道在任何时候,同时只能有一个goroutine访问通到进行发送和接受数据。
Channel底层是一个先进先出的环形队列(固定大小环形数组实现)
- full或empty就会阻塞
- send发送
- recv接受并移除
- sendx表示最后一次插入的元素的index
- recvx表示最后一次接受元素的index
- 发送、接受的操作符号都是<-
通道构造
源码runtime/chan.go/makechan
nil通道
var c1 chan int
fmt.Printf("c1: %d, %d, %v\n", len(c1), cap(c1), c1) // c1: 0, 0, <nil>
c1 <- 111 // 阻塞,不报错。由于没有初始化容器,111塞不进去
<- c1 // 也阻塞,不报错,什么都拿不出来
nil通道:chan零值是nil,即可以理解为未被初始化通道这个容器。nil通道可以认为是一个只要操作就阻塞当前协程的容器。这种通道不要创建和使用,阻塞后无法解除,底层源码中写明了无法解除。
package main
import "fmt"
func main() {
var c1 chan int
fmt.Printf("c1: %d, %d, %v\n", len(c1), cap(c1), c1)
fmt.Println("准备发送数据111")
c1 <- 111 // 往c1里面发送,阻塞在这一句,无法解除,显示本程序死锁
fmt.Println("发送数据111结束")
}
上例就算开新的协程来读取c1也没用,对nil通道读写都会阻塞。
非缓冲通道
var c2 = make(chan int, 0)
fmt.Printf("c2 : %d %d %v\n", len(c2), cap(c2), c2)
var c3 = make(chan int)
fmt.Printf("c3 : %d %d %v\n", len(c3), cap(c3), c3)
go func() {
c2 <- 300
}()
//fmt.Println(<-c2)
a := <-c2 // 这里读取会阻塞,因为通道内没有任何数据
fmt.Println(a)
说明:容量为0的通道,也叫同步通道。这中通道发送第一个元素时,如果没有接受操作就立即阻塞,直到被接受。同样接受时,如果没有数据被发送就立即阻塞,直到通道内有数据发送
缓冲通道
通道可以设置缓冲区——通过make()
函数的第2个参数指定缓冲区大小
ch := make(chan int,66)
带缓冲区的通道,允许发送方的数据发送和接收端的数据获取处于异步状态。就是说发送方发送的数据可以放在缓冲区中,等待接收端去接受数据,而不是立即需要接收端去接受数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接受数据的,否则缓冲区一满,数据发送方就无法再 发送数据了。
如果通道不带缓冲,则发送方会阻塞,直到接收方从通道中接受了数据。如果通道带缓冲,则发送方会阻塞,直到发送的值被复制到缓冲区中;如果缓冲区已满,则意味着需要的等待直到某个接受方接受了数据。接收方在有值可以接受之前,会一直阻塞
func main() {
ch := make(chan int, 3)
// 因为ch是带缓冲的通道,所以可以同时发送多个数据,而不用立刻去同步接受数据
ch <- 6
ch <- 7
ch <- 8
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
}
c4 := make(chan int, 8) // 缓冲通道,容量为8,长度为0
fmt.Printf("c4 %d %d %v\n", len(c4), cap(c4), c4)
c4 <- 111
c4 <- 222
fmt.Printf("c4 %d %d %v\n", len(c4), cap(c4), c4)
<-c4
t := <-c4
fmt.Printf("c4 %d %d %v\n", len(c4), cap(c4), c4) // 拿出来两个之后len变成0
fmt.Printf("%T %[1]v", t)
// 输出
c4 0 8 0x1400017a000
c4 2 8 0x1400017a000
c4 0 8 0x1400017a000
int 222
说明:容量不为0的通道。通道已满,再往该通道发送数据的操作会被阻塞;通道为空,再从该通道接受数据的操作会被阻塞
func main() {
ch := make(chan string) // 构建一个通道
go func() {
fmt.Println("开始goroutine")
ch <- "signal"
fmt.Println("退出goroutine")
}()
fmt.Println("等待goroutine")
<-ch // 这里会阻塞,直到往通道里写入数据
fmt.Println("完成")
}
// 输出
等待goroutine
开始goroutine
退出goroutine
完成
单向通道
<- chan type
这种定义表示只从一个channel里面拿,说明这是只读的chan <- type
这种定义标识只往一个channel里面写,说明这是只写的。
func produce(ch chan<- int) { // 生产者,只写,只要该通道具有写的能力就行
for {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
ch <- r.Intn(10)
time.Sleep(time.Second * 1)
}
}
func consume(ch <-chan int, wg *sync.WaitGroup) { // 消费者,只读。只要该通道有读的能力就行
for {
t := <-ch
fmt.Printf("正在消费%d\n", t)
if t == 9 {
wg.Done()
fmt.Println("消费完成")
}
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
c := make(chan int) // 创建可读/写非缓冲通道
go produce(c)
go consume(c, &wg)
wg.Wait()
}
通道关闭
- 使用
close(ch)
关闭一个通道 - 只有发送方才能关闭通道,一旦通道关闭,发送者不能再往其中发送数据,否则panic
- 通道关闭作用:告诉接受者再无新数据可以到达了
- 通道关闭
t,ok := <-ch
或t :=<-ch
从通道中读取数据- 正在阻塞等待通道中的数据的接受者,由于通道被关闭,接受者不再阻塞,获取数据失败ok位false,返回零值
- 接受者依然可以访问关闭的通道而不阻塞
- 如果通道内还有剩余数据,ok为true,接受数据
- 如果通道内剩余的数据被拿完了,继续接受不阻塞,ok为false,返回零值
- 已经关闭的通道,若再次关闭则panic,因此不要重复关闭
func produce(ch chan<- int, wg *sync.WaitGroup) { // 生产者,只写,只要该通道具有写的能力就行
for {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
a := r.Intn(10)
if a == 4 {
wg.Done()
close(ch)
return
} else {
ch <- a
time.Sleep(time.Second * 1)
}
}
}
func consume(ch <-chan int, wg *sync.WaitGroup) { // 消费者,只读。只要该通道有读的能力就行
for {
t, ok := <-ch
if !ok {
fmt.Println("生产者遇到瓶颈,已关闭")
return
} else {
fmt.Printf("正在消费%d\n", t)
}
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
c := make(chan int)
go produce(c, &wg)
go consume(c, &wg)
wg.Wait()
}
通道遍历
1、nil通道
发送、接受、遍历都阻塞
2、缓冲的、未关闭的通道
相当于一个无限元素的通道,迭代不完,阻塞在等下一个元素到达。
for data := range ch {
}
通道是可以被遍历的,遍历的结果就是接收到的数据,数据类型的就是通道的数据类型。通过for遍历获得变量只有一个,就是上面例子的data
func main() {
ch := make(chan int)
go func() {
for i := 6; i <= 8; i++ {
ch <- i
time.Sleep(time.Second)
}
}()
for receive := range ch {
fmt.Println(receive)
if receive == 8 {
break
}
}
}
通道可用于在两个goroutine之间通过传递一个指定类型的值来同步运行和通信。操作符 <-
用于指定通道的方向、发送和接受。如果未指定方向,则为双向通道
ch <- v
v := <-ch
默认情况,通道是不带缓冲区的。在发送方发送数据的同时必须有接收方相应的接受数据。
func Sum(s []int, ch chan int) {
sum := 0
fmt.Println(s)
for _, v := range s {
sum += v
}
fmt.Println(sum)
ch <- sum
}
func main() {
s := []int{6, 7, 8, -9, 1, 8}
ch := make(chan int)
go Sum(s[:len(s)/2], ch)
go Sum(s[len(s)/2:], ch)
a, b := <-ch, <-ch
fmt.Println(a, b, a+b)
}
// 输出结果为
[-9 1 8]
0
[6 7 8]
21
0 21 21
定时器
time.NewTicker
每xx(时间)秒调用一次
func main() {
t := time.NewTicker(2 * time.Second)
for {
fmt.Println(<-t.C)
}
}
time.NewTimer
xx秒后调用
func main() {
t := time.NewTimer(2 * time.Second)
for {
fmt.Println(<-t.C) // 通道阻塞2秒后只能接受一次
}
}
通道死锁
channel满了,就阻塞写;channel空了,就阻塞读。容量为0的通道可以理解为0个元素就满了
阻塞了当前协程之后就会交出cpu,去执行其他协程,希望其他协程帮助自己解除阻塞。
main函数结束了,整个进程就结束了
如果在main协程中,执行语句阻塞时,如果没有其他字写成可以执行,就剩主协程自己了,无法解锁,就自己把自己杀掉,报错fatal error deadlock
package main
import (
"fmt"
)
func main() {
c1 := make(chan int) // 非缓冲,未关闭通道
fmt.Printf("c1: %d, %d, %v\n", len(c1), cap(c1), c1)
c1 <- 111 // 当前协程阻塞,无人能解,死锁
}
运行结果如下
$ go run main.go
c1: 0, 0, 0xc00001a120
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
O:/pros/main.go:10 +0xea
exit status 2
如果通道阻塞不在main协程中发生,而是发生在子协程中,子协程会继续阻塞着,也可能发生死锁。但是由于至少main协程是一个值得等待的希望,编译器不能帮你识别出死锁。如果真的无任何协程帮助该协程解除阻塞状态,那么事实上该子协程解锁无望,已经死锁了。
死锁的危害可能会导致进程活着,但实际上某些协程未真正工作而阻塞,应该有良好的编码习惯,来减少死锁的出现。
struct{}型通道
如果一个结构体类型就是struct{} ,说明该结构体的实例没有数据成员,也就是实例内存占用为0
这种类型构成的通道,非常借阅内存,仅仅只是为了传递一个信号标志
func main() {
flag := make(chan struct{})
go func() {
time.Sleep(time.Second * 3)
flag <- struct{}{}
}()
fmt.Printf("等到信号了%T %[1]v\n", <-flag)
}
select 多路复用
在Unix中,select() 函数用来监控一组描述符,该机制常用语实现高并发的Socket服务器程序。go语言直接在语言级别支持select关键字,用于处理异步I/O问题
select {
case <- ch1:
// 如果ch1通道发送成功,则该case会接收到数据
case ch2 <- 1:
// 如果ch2接受数据成功,则该case会收到数据
default:
// 默认分支
}
select 默认是阻塞的,只有当监听的通道中有发送或接受可以进行时才会运行。当多个通道都准备好后,select会随机的选择一个操作(发送或接受)来执行。
Go语言没有对通道提供直接的超时处理机制,但可以利用select来间接实现:
func main() {
var wh sync.WaitGroup
wh.Add(1)
ch := make(chan int)
timeout := make(chan struct{})
go func() {
time.Sleep(time.Second * 5)
ch <- 111
}()
go func() {
time.Sleep(time.Second * 4)
timeout <- struct{}{}
}()
go func() {
select {
case <-timeout:
fmt.Println("已超时,无法收到消息")
wh.Done()
case s := <-ch:
fmt.Println("成功收到消息:", s)
close(timeout)
close(ch)
wh.Done()
}
}()
wh.Wait()
}
func main() {
count := make(chan int, 4)
flag := make(chan struct{})
go func() {
defer func() { flag <- struct{}{} }()
for i := 0; i < 10; i++ {
count <- i
time.Sleep(time.Second)
}
}()
for {
select {
case n := <-count:
fmt.Println("count= ", n)
case <-flag:
fmt.Println("结束了")
goto END
}
}
END:
fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
}
func main() {
count := make(chan int, 4)
flag := make(chan struct{})
newBase := 1000
t1 := time.NewTicker(time.Second)
t2 := time.NewTicker(5 * time.Second)
go func() {
defer func() { flag <- struct{}{} }()
for i := 0; i < 5; i++ {
count <- i
}
}()
time.Sleep(1 * time.Second)
fmt.Println(len(count), "~~~~@@@")
for {
select {
case <-t1.C:
fmt.Println("每一秒看看长度", len(count))
case <-t2.C:
fmt.Println("每隔5秒取一次", <-count)
case count <- newBase: // 发送数据成功进入通道执行该case
newBase++
}
}
fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
}
通道并发
Go语言采用并发同步模型叫做Communication Sequential Process 通讯顺序进程,这是一种消息传递模型,在goroutine间传递消息,而不是对数据进行加锁来实现同步访问。在goroutine之间使用channel来同步和传递数据。
- 多个协程之间通讯的管道
- 一端推入数据,一端拿走数据
- 同一时间,只有一个协程可以访问通道的数据
- 协调协程的执行顺序
如果多个线程都使用了同一个数据,就会出现竞争问题。因为线程的切换不会听从程序员的意志,时间片用完就切换了。解决办法往往需要加锁,让其他线程不能对共享数据进行修改,从而保证逻辑正确。
但锁的引入严重影响并行效率。
需求:
1、有一个全局数count,初始为0。编写一个函数inc,能够对count增加10万次。执行5次inc函数,请问最终count值是多少?
var count = 0
func inc() {
for i := 0; i < 100000; i++ {
count++
}
}
func main() {
start := time.Now()
inc()
inc()
inc()
inc()
inc()
fmt.Println("Go协程数:", runtime.NumGoroutine())
fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
fmt.Println("执行时长:", time.Since(start).Microseconds())
fmt.Println("count=", count)
}
// 输出
Go协程数: 1
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 1531
count= 500000
这是串行,一条一条执行,没有并发
2、如果并发执行inc函数,该怎么做呢,请问最终count值是多少?
var count = 0
var sw sync.WaitGroup
func inc() {
defer sw.Done()
for i := 0; i < 100000; i++ {
count++
}
}
func main() {
start := time.Now()
sw.Add(5)
for i := 0; i < 5; i++ {
go func() {
inc()
}()
}
fmt.Println("Go协程数:", runtime.NumGoroutine())
fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
sw.Wait()
fmt.Println("执行时长:", time.Since(start).Microseconds())
fmt.Println("count=", count)
}
// 输出
Go协程数: 6
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 559
count= 159255
开了5个协程并发,count结果不为50万了。为什么?count随机了吗?
在上面代码中设置runtime.GOMAXPROCS(1)
,输出会怎么样?
runtime.GOMAXPROCS(1)
表示只使用一个CPU核心来并发执行go程序,设置为1会降低并发性能,大多数情况,go会自动管理并发的核心数,获得最佳的性能,最好不要手动设置该参数。
runtime.GOMAXPROCS(1)
原因在于count++不是原子操作,会被打断。所以,即使使用goroutine也会有竞争,一样会有并发安全问题。换成下句试一试
func inc() {
defer sw.Done()
for i := 0; i < 100000; i++ {
//count++
atomic.AddInt64(&count, 1)
}
}
// 输出
Go协程数: 6
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 25306
count= 500000
结果正确了,但是这种共享内存的方式执行时长明显增加。
也可以使用互斥锁来保证count++的原子性操作
var count int64 = 0
var sw sync.WaitGroup
var lock sync.Mutex
func inc() {
defer sw.Done()
for i := 0; i < 100000; i++ {
//count++
lock.Lock()
count++
lock.Unlock()
}
}
3、能否使用通道,来同步多个协程
var count int64 = 0
var sw sync.WaitGroup
var lock sync.Mutex
var ch = make(chan int, 1)
func inc() {
defer sw.Done()
for i := 0; i < 100000; i++ {
//count++
t := <-ch
t++
ch <- t
}
}
func main() {
start := time.Now()
ch <- 0
sw.Add(5)
for i := 0; i < 5; i++ {
go func() {
inc()
}()
}
fmt.Println("Go协程数:", runtime.NumGoroutine())
fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
sw.Wait()
fmt.Println("执行时长:", time.Since(start).Microseconds())
fmt.Println("count=", <-ch)
}
// 输出
Go协程数: 6
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 78612
count= 500000
上例是计算密集型,对同一个数据进行争抢,不是能发挥并行计算优势的例子,也不适合通道,用锁实现更有效率,更有优势。
通道适合数据流动的场景
- 如同管道一样,一级一级处理,一个协程处理完,发给其他协程
- 生产者、消费者模型
协程泄露
原因:
- 协程阻塞,未能如期结束,之后就会有大量累积
- 协程阻塞最常见的原因都跟通道有关
- 由于每个协程都要占用内存,所以携程泄露也会导致内存泄露
因此,如果你不知道你创建的协程何时能够结束,就不要使用它。否则可能协程泄露。