Channel通道

通道的定义

通道(channel)是用来传递数据的一个数据结构。go语言提倡使用通信来代替共享内存。当一个资源需要在goroutine之间共享时,通道在goroutine之间架起了一个管道,并提供了确保同步交换数据的机制。

在声明通道时,需要指定要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或指针。

Go语言中的通道在任何时候,同时只能有一个goroutine访问通到进行发送和接受数据。

image

image

Channel底层是一个先进先出的环形队列(固定大小环形数组实现)

  • full或empty就会阻塞
  • send发送
  • recv接受并移除
  • sendx表示最后一次插入的元素的index
  • recvx表示最后一次接受元素的index
  • 发送、接受的操作符号都是<-

通道构造

源码runtime/chan.go/makechan

nil通道
var c1 chan int
fmt.Printf("c1: %d, %d, %v\n", len(c1), cap(c1), c1) // c1: 0, 0, <nil>
c1 <- 111 // 阻塞,不报错。由于没有初始化容器,111塞不进去
<- c1 // 也阻塞,不报错,什么都拿不出来

nil通道:chan零值是nil,即可以理解为未被初始化通道这个容器。nil通道可以认为是一个只要操作就阻塞当前协程的容器。这种通道不要创建和使用,阻塞后无法解除,底层源码中写明了无法解除。

package main
import "fmt"
func main() {
 var c1 chan int
 fmt.Printf("c1: %d, %d, %v\n", len(c1), cap(c1), c1)
 fmt.Println("准备发送数据111")
 c1 <- 111 // 往c1里面发送,阻塞在这一句,无法解除,显示本程序死锁
 fmt.Println("发送数据111结束")
}

上例就算开新的协程来读取c1也没用,对nil通道读写都会阻塞。

非缓冲通道
var c2 = make(chan int, 0)
fmt.Printf("c2 : %d %d %v\n", len(c2), cap(c2), c2)
var c3 = make(chan int)
fmt.Printf("c3 : %d %d %v\n", len(c3), cap(c3), c3)
go func() {
    c2 <- 300
}()
//fmt.Println(<-c2)
a := <-c2 // 这里读取会阻塞,因为通道内没有任何数据
fmt.Println(a)

说明:容量为0的通道,也叫同步通道。这中通道发送第一个元素时,如果没有接受操作就立即阻塞,直到被接受。同样接受时,如果没有数据被发送就立即阻塞,直到通道内有数据发送

缓冲通道

通道可以设置缓冲区——通过make()函数的第2个参数指定缓冲区大小

ch := make(chan int,66)

带缓冲区的通道,允许发送方的数据发送和接收端的数据获取处于异步状态。就是说发送方发送的数据可以放在缓冲区中,等待接收端去接受数据,而不是立即需要接收端去接受数据。

不过由于缓冲区的大小是有限的,所以还是必须有接收端来接受数据的,否则缓冲区一满,数据发送方就无法再 发送数据了。

如果通道不带缓冲,则发送方会阻塞,直到接收方从通道中接受了数据。如果通道带缓冲,则发送方会阻塞,直到发送的值被复制到缓冲区中;如果缓冲区已满,则意味着需要的等待直到某个接受方接受了数据。接收方在有值可以接受之前,会一直阻塞

func main() {
    ch := make(chan int, 3)
    // 因为ch是带缓冲的通道,所以可以同时发送多个数据,而不用立刻去同步接受数据
    ch <- 6
    ch <- 7
    ch <- 8
    fmt.Println(<-ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}
    c4 := make(chan int, 8) // 缓冲通道,容量为8,长度为0
    fmt.Printf("c4 %d %d %v\n", len(c4), cap(c4), c4)
    c4 <- 111
    c4 <- 222
    fmt.Printf("c4 %d %d %v\n", len(c4), cap(c4), c4)
    <-c4
    t := <-c4
    fmt.Printf("c4 %d %d %v\n", len(c4), cap(c4), c4)  // 拿出来两个之后len变成0
    fmt.Printf("%T %[1]v", t)
// 输出
c4 0 8 0x1400017a000
c4 2 8 0x1400017a000
c4 0 8 0x1400017a000
int 222

说明:容量不为0的通道。通道已满,再往该通道发送数据的操作会被阻塞;通道为空,再从该通道接受数据的操作会被阻塞

func main() {
    ch := make(chan string) // 构建一个通道
    go func() {
        fmt.Println("开始goroutine")
        ch <- "signal"
        fmt.Println("退出goroutine")
    }()
    fmt.Println("等待goroutine")
    <-ch // 这里会阻塞,直到往通道里写入数据
    fmt.Println("完成")
}
// 输出
等待goroutine
开始goroutine
退出goroutine
完成

单向通道

  • <- chan type 这种定义表示只从一个channel里面拿,说明这是只读的
  • chan <- type 这种定义标识只往一个channel里面写,说明这是只写的。
func produce(ch chan<- int) { // 生产者,只写,只要该通道具有写的能力就行
    for {
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        ch <- r.Intn(10)
        time.Sleep(time.Second * 1)
    }
}
func consume(ch <-chan int, wg *sync.WaitGroup) { // 消费者,只读。只要该通道有读的能力就行
    for {
        t := <-ch
        fmt.Printf("正在消费%d\n", t)
        if t == 9 {
            wg.Done()
            fmt.Println("消费完成")
        }
    }
}
func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    c := make(chan int)  // 创建可读/写非缓冲通道
    go produce(c)
    go consume(c, &wg)
    wg.Wait()
}

通道关闭

  • 使用close(ch)关闭一个通道
  • 只有发送方才能关闭通道,一旦通道关闭,发送者不能再往其中发送数据,否则panic
  • 通道关闭作用:告诉接受者再无新数据可以到达了
  • 通道关闭
    • t,ok := <-cht :=<-ch 从通道中读取数据
    • 正在阻塞等待通道中的数据的接受者,由于通道被关闭,接受者不再阻塞,获取数据失败ok位false,返回零值
    • 接受者依然可以访问关闭的通道而不阻塞
      • 如果通道内还有剩余数据,ok为true,接受数据
      • 如果通道内剩余的数据被拿完了,继续接受不阻塞,ok为false,返回零值
  • 已经关闭的通道,若再次关闭则panic,因此不要重复关闭
func produce(ch chan<- int, wg *sync.WaitGroup) { // 生产者,只写,只要该通道具有写的能力就行
    for {
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        a := r.Intn(10)
        if a == 4 {
            wg.Done()
            close(ch)
            return
        } else {
            ch <- a
            time.Sleep(time.Second * 1)
        }
    }
}
func consume(ch <-chan int, wg *sync.WaitGroup) { // 消费者,只读。只要该通道有读的能力就行
    for {
        t, ok := <-ch
        if !ok {
            fmt.Println("生产者遇到瓶颈,已关闭")
            return
        } else {
            fmt.Printf("正在消费%d\n", t)

        }
    }
}
func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    c := make(chan int)
    go produce(c, &wg)
    go consume(c, &wg)
    wg.Wait()
}

通道遍历

1、nil通道

发送、接受、遍历都阻塞

2、缓冲的、未关闭的通道

相当于一个无限元素的通道,迭代不完,阻塞在等下一个元素到达。

for data := range ch {

}

通道是可以被遍历的,遍历的结果就是接收到的数据,数据类型的就是通道的数据类型。通过for遍历获得变量只有一个,就是上面例子的data

func main() {
    ch := make(chan int)
    go func() {
        for i := 6; i <= 8; i++ {
            ch <- i
            time.Sleep(time.Second)
        }
    }()
    for receive := range ch {
        fmt.Println(receive)
        if receive == 8 {
            break
        }
    }
}

通道可用于在两个goroutine之间通过传递一个指定类型的值来同步运行和通信。操作符 <- 用于指定通道的方向、发送和接受。如果未指定方向,则为双向通道

ch <- v 
v := <-ch

默认情况,通道是不带缓冲区的。在发送方发送数据的同时必须有接收方相应的接受数据。

func Sum(s []int, ch chan int) {
    sum := 0
    fmt.Println(s)
    for _, v := range s {
        sum += v
    }
    fmt.Println(sum)
    ch <- sum
}
func main() {
    s := []int{6, 7, 8, -9, 1, 8}
    ch := make(chan int)
    go Sum(s[:len(s)/2], ch)
    go Sum(s[len(s)/2:], ch)
    a, b := <-ch, <-ch
    fmt.Println(a, b, a+b)
}

// 输出结果为
[-9 1 8]
0
[6 7 8]
21
0 21 21

定时器

  • time.NewTicker 每xx(时间)秒调用一次
func main() {
    t := time.NewTicker(2 * time.Second)
    for {
        fmt.Println(<-t.C)
    }
}
  • time.NewTimer xx秒后调用
func main() {
 t := time.NewTimer(2 * time.Second)
 for {
 fmt.Println(<-t.C) // 通道阻塞2秒后只能接受一次
 }
}

通道死锁

channel满了,就阻塞写;channel空了,就阻塞读。容量为0的通道可以理解为0个元素就满了

阻塞了当前协程之后就会交出cpu,去执行其他协程,希望其他协程帮助自己解除阻塞。

main函数结束了,整个进程就结束了

如果在main协程中,执行语句阻塞时,如果没有其他字写成可以执行,就剩主协程自己了,无法解锁,就自己把自己杀掉,报错fatal error deadlock

package main
import (
 "fmt"
)
func main() {
 c1 := make(chan int) // 非缓冲,未关闭通道
 fmt.Printf("c1: %d, %d, %v\n", len(c1), cap(c1), c1)
 c1 <- 111 // 当前协程阻塞,无人能解,死锁
}
运行结果如下
$ go run main.go
c1: 0, 0, 0xc00001a120
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
        O:/pros/main.go:10 +0xea
exit status 2

如果通道阻塞不在main协程中发生,而是发生在子协程中,子协程会继续阻塞着,也可能发生死锁。但是由于至少main协程是一个值得等待的希望,编译器不能帮你识别出死锁。如果真的无任何协程帮助该协程解除阻塞状态,那么事实上该子协程解锁无望,已经死锁了。

死锁的危害可能会导致进程活着,但实际上某些协程未真正工作而阻塞,应该有良好的编码习惯,来减少死锁的出现。

struct{}型通道

如果一个结构体类型就是struct{} ,说明该结构体的实例没有数据成员,也就是实例内存占用为0

这种类型构成的通道,非常借阅内存,仅仅只是为了传递一个信号标志

func main() {
    flag := make(chan struct{})
    go func() {
        time.Sleep(time.Second * 3)
        flag <- struct{}{}
    }()
    fmt.Printf("等到信号了%T %[1]v\n", <-flag)
}

select 多路复用

在Unix中,select() 函数用来监控一组描述符,该机制常用语实现高并发的Socket服务器程序。go语言直接在语言级别支持select关键字,用于处理异步I/O问题

select {
case <- ch1:
    // 如果ch1通道发送成功,则该case会接收到数据 
case ch2 <- 1:
    // 如果ch2接受数据成功,则该case会收到数据
default:
    // 默认分支
    
}

select 默认是阻塞的,只有当监听的通道中有发送或接受可以进行时才会运行。当多个通道都准备好后,select会随机的选择一个操作(发送或接受)来执行。

Go语言没有对通道提供直接的超时处理机制,但可以利用select来间接实现:

func main() {
    var wh sync.WaitGroup
    wh.Add(1)
    ch := make(chan int)
    timeout := make(chan struct{})
    go func() {
        time.Sleep(time.Second * 5)
        ch <- 111
    }()
    go func() {
        time.Sleep(time.Second * 4)
        timeout <- struct{}{}
    }()
    go func() {
        select {
        case <-timeout:
            fmt.Println("已超时,无法收到消息")
            wh.Done()
        case s := <-ch:
            fmt.Println("成功收到消息:", s)
            close(timeout)
            close(ch)
            wh.Done()
        }
    }()
    wh.Wait()

}
func main() {
    count := make(chan int, 4)
    flag := make(chan struct{})
    go func() {
        defer func() { flag <- struct{}{} }()
        for i := 0; i < 10; i++ {
            count <- i
            time.Sleep(time.Second)
        }
    }()
    for {
        select {
        case n := <-count:
            fmt.Println("count= ", n)
        case <-flag:
            fmt.Println("结束了")
            goto END

        }
    }
END:
    fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")

}
func main() {
    count := make(chan int, 4)
    flag := make(chan struct{})
    newBase := 1000
    t1 := time.NewTicker(time.Second)
    t2 := time.NewTicker(5 * time.Second)
    go func() {
        defer func() { flag <- struct{}{} }()
        for i := 0; i < 5; i++ {
            count <- i
        }
    }()
    time.Sleep(1 * time.Second)
    fmt.Println(len(count), "~~~~@@@")
    for {
        select {
        case <-t1.C:
            fmt.Println("每一秒看看长度", len(count))
        case <-t2.C:
            fmt.Println("每隔5秒取一次", <-count)
        case count <- newBase: // 发送数据成功进入通道执行该case
            newBase++
        }
    }
    fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")

}

通道并发

Go语言采用并发同步模型叫做Communication Sequential Process 通讯顺序进程,这是一种消息传递模型,在goroutine间传递消息,而不是对数据进行加锁来实现同步访问。在goroutine之间使用channel来同步和传递数据。

  • 多个协程之间通讯的管道
  • 一端推入数据,一端拿走数据
  • 同一时间,只有一个协程可以访问通道的数据
  • 协调协程的执行顺序

如果多个线程都使用了同一个数据,就会出现竞争问题。因为线程的切换不会听从程序员的意志,时间片用完就切换了。解决办法往往需要加锁,让其他线程不能对共享数据进行修改,从而保证逻辑正确。

但锁的引入严重影响并行效率。

需求:

1、有一个全局数count,初始为0。编写一个函数inc,能够对count增加10万次。执行5次inc函数,请问最终count值是多少?

var count = 0

func inc() {
    for i := 0; i < 100000; i++ {
        count++
    }
}
func main() {
    start := time.Now()
    inc()
    inc()
    inc()
    inc()
    inc()
    fmt.Println("Go协程数:", runtime.NumGoroutine())
    fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    fmt.Println("执行时长:", time.Since(start).Microseconds())
    fmt.Println("count=", count)

}
// 输出
Go协程数: 1
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 1531
count= 500000

这是串行,一条一条执行,没有并发

2、如果并发执行inc函数,该怎么做呢,请问最终count值是多少?

var count = 0
var sw sync.WaitGroup

func inc() {
    defer sw.Done()
    for i := 0; i < 100000; i++ {
        count++
    }
}
func main() {
    start := time.Now()
    sw.Add(5)
    for i := 0; i < 5; i++ {
        go func() {
            inc()
        }()
    }
    fmt.Println("Go协程数:", runtime.NumGoroutine())
    fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    sw.Wait()
    fmt.Println("执行时长:", time.Since(start).Microseconds())
    fmt.Println("count=", count)

}
// 输出
Go协程数: 6
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 559
count= 159255

开了5个协程并发,count结果不为50万了。为什么?count随机了吗?

在上面代码中设置runtime.GOMAXPROCS(1),输出会怎么样?

runtime.GOMAXPROCS(1) 表示只使用一个CPU核心来并发执行go程序,设置为1会降低并发性能,大多数情况,go会自动管理并发的核心数,获得最佳的性能,最好不要手动设置该参数。

runtime.GOMAXPROCS(1)

原因在于count++不是原子操作,会被打断。所以,即使使用goroutine也会有竞争,一样会有并发安全问题。换成下句试一试

func inc() {
    defer sw.Done()
    for i := 0; i < 100000; i++ {
        //count++
        atomic.AddInt64(&count, 1)
    }
}
// 输出
Go协程数: 6
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 25306
count= 500000

结果正确了,但是这种共享内存的方式执行时长明显增加。

也可以使用互斥锁来保证count++的原子性操作

var count int64 = 0
var sw sync.WaitGroup
var lock sync.Mutex

func inc() {
    defer sw.Done()
    for i := 0; i < 100000; i++ {
        //count++
        lock.Lock()
        count++
        lock.Unlock()
    }
}

3、能否使用通道,来同步多个协程

var count int64 = 0
var sw sync.WaitGroup
var lock sync.Mutex
var ch = make(chan int, 1)

func inc() {
    defer sw.Done()
    for i := 0; i < 100000; i++ {
        //count++
        t := <-ch
        t++
        ch <- t
    }
}
func main() {
    start := time.Now()
    ch <- 0
    sw.Add(5)
    for i := 0; i < 5; i++ {
        go func() {
            inc()
        }()
    }
    fmt.Println("Go协程数:", runtime.NumGoroutine())
    fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    sw.Wait()
    fmt.Println("执行时长:", time.Since(start).Microseconds())
    fmt.Println("count=", <-ch)

}

// 输出
Go协程数: 6
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
执行时长: 78612
count= 500000

上例是计算密集型,对同一个数据进行争抢,不是能发挥并行计算优势的例子,也不适合通道,用锁实现更有效率,更有优势。

通道适合数据流动的场景

  • 如同管道一样,一级一级处理,一个协程处理完,发给其他协程
  • 生产者、消费者模型

协程泄露

原因:

  • 协程阻塞,未能如期结束,之后就会有大量累积
  • 协程阻塞最常见的原因都跟通道有关
  • 由于每个协程都要占用内存,所以携程泄露也会导致内存泄露

因此,如果你不知道你创建的协程何时能够结束,就不要使用它。否则可能协程泄露。