目录

Go | Go 的并发编程

首先来看一下对 Go 语言并发机制的赞美:

goroutine 和 channel 支撑起了 Go 语言的并发模型的基石,让 Go 语言在如今集群化与多核化的时代成为一道极为亮丽的风景线。

优雅的并发编程范式,完善的并发支持,出色的并发性能是 Go 语言区别于其他语言的一大特色。

Go 语言以并发编程作为语言的最核心优势。

Go 语言社区的著名口号:不要通过共享内存来通信,要通过通信来共享内存。

执行体

执行体是一个抽象的概念,在操作系统层面有多个概念与之对应,比如操作系统自己掌管的进程(process)、进程内的线程(thread)以及进程内的协程(coroutine,也叫轻量级线程)。那么,这些执行体也是并发的几种主流的实现模型:

  1. 多进程。多进程是在操作系统层面进行并发的基本模式。同时也是开销最大的模式,因为所有的进程都是由内核管理的,但是好处在于简单、进程间互不影响。在Linux平台上,很多工具链正是采用这种模式在工作的。比如某个Web服务器,会有专门的进程负责网络端口的监听和链接管理。
  2. 多线程。多线程在大部分操作系统上都属于系统层面的并发模式,也是我们使用最多的最有效的一种模式。目前,我们几乎所有工具链都会使用这种模式。它比多进程开销小很多,但是开销依旧很大,且在高并发模式下,效率会有影响。
  3. 协程。协程(Coroutine)本质上是一种用户态线程,不需要操作系统来进行抢占式调度,且在真正的实现中寄存于线程中,因此,系统开销极小,可以有效提高线程的任务并发性,而避免多线程的缺点。使用协程的优点是编程简单,结构清晰;缺点是需要语言的支持,如果不支持,则需要用户在程序中自行实现调度器。目前,原生支持协程的语言还很少。

goroutine

上述可以看到与传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”。我们可以轻松创建上百万个协程而不会导致系统资源枯竭,然后创建线程和进程通常最多也不能超过1万个。但是多数语言在语法层面上并不直接支持协程,而是通过库的方式支持,但是通过库的方式支持的功能也并不完整。

那么**Go在语言级别支持轻量级线程(协程),叫做goroutine,由Go运行时(runtime)管理。**Go 语言标准库提供的所有 system call 操作,包括所有同步 IO 操作,都会出让 CPU 给其他 goroutine。

goroutine的基本使用

假如想要 Add() 这个函数并发执行,那么使用 go Add() 就可以让这个函数并发执行,也就是让这个函数在一个新的 goroutine 中并发执行,当函数返回时,这个 goroutine 也就结束了。如果这个函数有返回值那么这个返回值会被抛弃。

出让时间片

可以使用 runtime 包中的 Gosched() 函数让 goroutine 主动出让时间片给其他 goroutine。

多核并行化

多核并行化的意思就是说,将 goroutine 分布到多个 CPU 核心中运行,从而实现并行化。在 Go 语言升级到默认支持多 CPU 的某个版本之前,可以通过设置环境变量 GOMAXPROCS 的值来控制使用多个 CPU 核心。或者可以在启动 goroutine 之前调用如下方法设置使用 4 个 CPU 核心:

1
runtime.GOMAXPROCS(4)

在精准设置之前,可以先通过 runtime 包中的 NumCPU() 函数获取 CPU 核心数,之后再进行上述设置。

并发通信

在工程上,**有两种最常见的并发通信模型:共享数据和消息。**共享数据是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据。在实际工程中最常见的无疑是内存了,也就是常说的共享内存。针对共享内存方式来说,传统的C代码如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int counter;

void *count() {
    pthread_mutex_lock(&mutex);
    counter ++;
    printf("Counter value: %d\n", counter);
    pthread_mutex_unlock(&mutex);
}

void main() {
    int rc1, rc2;
    pthread_t thread1, thread2;

    if(rc1 = pthread_create(&thread1, NULL, &count, NULL)) {
        printf("Thread creat failed: %d\n", rc1);
    }
    if(rc2 = pthread_create(&thread2, NULL, &count, NULL)) {
        printf("Thread creat failed: %d\n", rc2);
    }

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    exit(0);
}

在 Go 中也可以使用类似的方法去实现,但是这种方式对于以并发编程语言作为语言的最核心优势的 Go 语言来说太过于臃肿了,显然不能使用这样的方式。那么下面来看一下 Go 语言中的通信方式。

channel

Go 语言中是以消息机制而非共享内存的作为通信方式。而Go语言提供的消息通信机制被称为 channel,可以使用 channel 在两个或者多个 goroutine 之间传递消息。因为 channel 是进程内部的通信方式,因此通过 channel 传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。

消息机制认为每个并发单元是自包含的、独立的个体,并且都有自己的变量,并发单元之间这些变量不共享。每个并发单元的输入和输出只有一种,那就是消息。

基本操作

  • channel 声明

    **channel 是类型相关的,也就是一个channel 只能传递一种类型的值,这个类型需要在声明 channel 时指定。**一般使用如下方式进行声明:

    1
    
    var chanName chan ElementType
    

    举例如下

    1
    2
    
    var ch chan int		// 声明一个传递类型为 int 的 channel
    var m map[string] chan bool // 声明一个map,key 是 string, value 是 bool 型的 channel
    
  • channel 创建

    使用 make() 函数

    1
    2
    
    ch := make(chan int)		// 声明并初始化一个 int 型 channel
    ch := make(chan int, 1024)	// 初始化一个大小为 1024 的 int 型 channel(缓冲机制)
    
  • 读/写

    往 channel 里面写入数据。假如 channel 中已有的数据已达到了 channel 的缓冲容量,那么写入数据会导致程序阻塞。

    1
    
    ch <- value
    

    从 channel 中读取数据。假如 channel 中没有数据,那么从 channel 中读取数据会导致程序阻塞。

    1
    2
    3
    4
    
    value := <-ch	// 普通做法,也可以是带缓冲的 channel 的做法
    for i := range ch {	// 带缓冲的 channel,也可以这样
        fmt.Println("Received:", i)
    }
    
  • 关闭channel

    使用内置函数close()即可,如

    1
    
    close(ch)
    

    另外 <-ch 其实会返回两个值,第二个值是 channel 是否关闭的标识符,可以来判断一个 channel 是否被关闭。

下面我们来段示例代码演示下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func Count(ch chan int) {
	fmt.Println("Hello, I am goroutine")
	ch <- 1
}

func main() {
	// 定义一个切片,这里面的每一个元素都是 chan int 型的, 但是未初始化, 都是 nil
	chs := make([]chan int, 10)		

	for i := 0; i < 10; i++ {
		chs[i] = make(chan int)	// 创建 chan int
		go Count(chs[i])
	}

	for _, ch := range(chs) {
		<- ch
	}
}

上述这段代码中,我们循环创建了 10 个 int 型的 channel,并依次把他们分配给 goroutine,而 goroutine 中的函数主要是往 channel 中写入一个数据。在所有的 goroutine 都启动完成之后,循环从 10个 channel 中读取数据,假如有的 goroutine 还没有运行到,那么 channel 中是没有数据,那么从 channel 读取数据将会阻塞掉。假如所有的 goroutine 都执行完毕,那么最后一段循环才会结束,也就保证了在所有 goroutine 执行完成后,主函数才会返回。

select

Go 语言在语言级别支持 select 关键字,用于处理异步 IO 问题。select 的用法与 switch 用法非常类似,如下所示

1
2
3
4
5
6
7
8
select {
    case <- chan1:
    	// 如果 chan1 成功读到数据,则处理该 case 情况
	case chan2 <- 1:
    	// 如果成功向 chan2 写入数据,则处理该 case 情况
    default
    	// 如果上面都没有成功,则进入 default 处理流程
}

但是 select 有比较多的限制和不同,比如

  • select 语句块中的 case 语句必须是一个 IO 操作,也就是说必须是一个面向 channel 的操作
  • select 后面并不用像 switch 那样带判断条件,而是直接去查看 case 语句

Unix 时代,select 机制就已经被引入。通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了 IO 操作,select() 调用就会返回。

超时机制

在并发编程的通信过程中,最需要处理的就是超时问题。因为在向 channel 写数据时发现 channel 已满,或者从 channel 中试图读取数据时发现 channel 为空,这些都会阻塞,正常情况下,阻塞是暂时的。但是假如没有另外一个 goroutine 向 channel 中写入内容,那么负责读取的 goroutine 将会永远阻塞下去。所以需要对其进行超时处理,也就是说设置一个时间,如果超过设定的时间,还没有完成读取/写入,那么则立即终止。

Go 语言中没有提供直接的超时处理机制,但是可以利用 select 机制,因为 select 机制中,只要其中一个 case 已经完成,那么程序就会继续往下执行,而不会考虑其他 case 的情况。使用示例如下所示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
timeout := make(chan bool)

go func(){
    time.Sleep(1e9)	// 等待1秒
    timeout <- true	
}()

select {
    case <- ch:
    // 从 ch 中读取数据
    case <-timeout :
    // 从 timeout 中读取数据
}

上述这种使用就相当实现了超时处理机制,假如在 1 秒内从 ch 这个 channel 中读取到了内容,那么由于 timeout 这个 channel 是读取不到内容的,那么则执行 case <-ch。反之,如果超过了 1 秒的时间,从 ch 这个 channel 中还是没有没有读取到数据,但是此时 timeout 这个 channel 是可以读取到了相应的内容,那么则处理 case <-timeout 这种情况,那么这样子就不会因为 ch 没有读取到数据而一直阻塞着。

channel的传递

Go 语言中的 channel 是一个原生类型,与 map 之类的类型地位一样,因为 channel 在定义之后也可以通过 channel 传递。

使用的例子如下所示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type PipeData struct {
	value int 
	handler func(int) int
	next chan int
}

func handle(queue chan *PipeData) {
	for data := range queue {
		data.next <- data.handler(data.value)
	}
}

单向channel

**单向 channel 是指只能用于发送或者接收数据,但是这边的单向是指对 channel 的一种使用限制。**因为 channel 本身必然是同时支持读写的,如果一个 channel 真的只能读,那么 channel 肯定会空,从而失去意义。同理只能写的 channel 也是类似的。

  • 单向 channel 的声明

    1
    2
    3
    
    var ch1 chan int	// ch1 是一个正常的 channel,不是单向的
    var ch2 chan<- int	// ch2 是一个单向的 channel,只用于写 int 数据
    var ch2 <-chan int 		// ch3 是一个单向的 channel,只用于读取 int 数据 
    
  • 单向 channel 的初始化

    channel 是一个原生类型,支持被传递和类型转换,那么单向 channel 的初始化其实都是由类型转换而来的

    1
    2
    3
    
    ch4 := make(chan int)
    ch5 := <- chan int(ch4)	// ch5 就是一个单向的读取 channel
    ch6 := chan<- int(ch4)	// ch6 就是一个单向的写入 channel
    
  • 单向 channel 的使用

    所有的代码应该都遵循“最小权限原则”,从而避免没必要地使用泛滥问题,进而导致程序失控。单向 channel 就是这样的一种锲约精神。比如可以在将 channel 传递给一个函数的时候,将其指定为单向 channel,从而限制这个函数只能对其进行单向操作。

    示例代码如下所示,这个函数中对 ch 的操作只能是读。假如 ch 已经是单向操作的 channel 了,在函数体中无法再对其进行类型转换。

    1
    2
    3
    4
    5
    
    func parse(ch <-chan int) {
      for value := range ch {
          fmt.Println("Parsing value", value)
      }
    }
    

同步

虽然 channel 通信很香,但是有时候不得不通过共享数据来通信,为此 Go 也提供了妥善的资源锁方案。

同步锁

Go 语言的 sync 包提供了两种锁类型:sync.Mutexsync.RWMutex

  • sync.Mutex

    这种方式是最简单的,当一个 goroutine 获得了 Mutex 之后,其他 goroutine 只能等待这个 goroutine 释放该 Mutex。相应的锁操作方法如下

    函数 操作
    Lock() 获取锁
    Unlock() 释放锁
  • sync.RWMutex

    RWMutex 是经典的单写多读模型(读写者问题)。读锁占用之后,可读不可写。写锁占用之后,不可读也不可写。相应的锁操作方法如下

    函数 操作
    RLock() 获取读锁
    Lock() 获取写锁
    RUnlock() 释放读锁
    Unlock() 释放写锁

使用例子如下,记得获取锁之后一定要释放锁,否则会死锁

1
2
3
4
5
6
var lock sync.Mutex
func foo() {
    lock.Lock()	// 获取锁
     ......
    defer lock.Unlock()	// 释放锁
}

全局唯一性操作

**全局唯一性操作是指,对于从全局的角度只需要运行一次的代码,比如全局初始化操作,那么在 Go 中使用 Once 类型来确保全局唯一性操作。**使用如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var a string
var once sync.Once

func setup() {
	a = "hello world"
}

func doPrint() {
	once.Do(setup)
	fmt.Println(a)
}

func main() {
	go doPrint()
	go doPrint()		
}

那么上述代码中的 once.Do() 方法保证在全局范围内只调用指定的函数一次。而且当一个 goroutine 在调用此语句的时候,其他 goroutine 在调用此语句时都会被阻塞,直至全局唯一的 Once.Do() 调用结束。

在一般语言中,确保全局唯一性操作可能使用一个全局的 bool 类型变量,在要执行的操作之前先判断该全局变量是否为 false,操作完成之后将其设置 true。如下所示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var done bool = false
var a string

func setup() {
    a = "hello, world"
}

func doPrint() {
    if !done {
        setup()
        done = true
    }
    print(a)
}

但是上述代码是存在一定的问题的,因为 setup() 函数不是原子性操作,这种方式还是可能会导致 setup() 被执行多次。

Go在并行中的原子操作

sync 包中包含了 atomic 子包,它提供了对于一些基础数据类型的原子操作函数。