一个计算机技术爱好者与学习者

0%

好好学Golang:协程Goroutine

1. Golang协程简介

在Go语言中,协程(coroutine)是通过goroutine实现的。
goroutine是Go运行时管理的轻量级线程,由Go运行时环境调度,不是操作系统层面的线程。goroutine使得并发编程变得简洁易懂。

参考文档:

2. 创建goroutine

当一个程序启动时,它仅仅是一个单独的goroutine(即主goroutine)。

创建一个新的goroutine非常简单,只需要在函数调用前加上关键字go。一旦goroutine被创建,它将与创建它的goroutine(简单理解为线程)并发(或并行)执行。

下面是一个goroutine的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

// printNumbers 打印数字,它会在一个单独的goroutine中执行
func printNumbers() {
for i := 0; i < 5; i++ {
fmt.Printf("%d ", i)
time.Sleep(50 * time.Millisecond) // 等待一小段时间
}
}

func main() {
go printNumbers() // 开启一个新的goroutine

// main函数也做一些打印工作
for i := 0; i < 5; i++ {
fmt.Printf("%c ", 'a'+i)
time.Sleep(100 * time.Millisecond) // 等待一段比printNumbers更长的时间
}
}

在这个例子中,main函数中的go printNumbers()语句启动了一个新的goroutine,而main函数自身也在执行打印操作。这两组打印操作会异步执行。这意味着main函数不会等待printNumbers函数完成,而是会立即开始它的循环和打印操作。在这个简单的程序中,我们可能会看到这两个循环的输出(数字和字母)交错在一起。

值得注意的是,main函数(主协程)必须运行足够长的时间,以允许别的goroutine有机会开始执行和完成它们的任务。如果main函数过早地结束,程序则会退出,并且所有在运行中的goroutine也会同时终止。在实际应用程序中,这通常通过等待组(sync.WaitGroup)、通道(channel)或其他同步机制来确保goroutine可以完成它们的任务。

3. 通道channel

3.1. 基础概念

在Go语言中,channel是一种内置的数据结构,可以让多个goroutines之间安全地进行数据通信。channel被用来在goroutines间进行通信和同步,它的设计理念来源于CSP(Communicating Sequential Processes)- 一种消息传递模型。

channel操作:

  • 创建:使用make()函数创建channel,可以指定channel可存储的元素类型。例如,make(chan int)创建了一个可以传输整数类型的channel。
  • 发送操作:使用channel <- value语法向channel发送值。
  • 接收操作:使用<-channel语法从channel接收值。
  • 关闭:使用内置的close()函数来关闭channel。关闭后不可再向其中发送数据,但仍可接收缓冲中剩余的数据。

channel类型:

  • 无缓冲Channel(非缓冲Channel): 在发送者和接受者准备好之前,不储存任何值。发送操作会阻塞,直到另一个goroutine在对应的channel上执行接收操作。
  • 有缓冲Channel:可以存储一个或多个值,仅当缓冲满时,向channel发送数据才会阻塞;当缓冲为空时,接收操作才会阻塞。

3.2. 无缓冲channel示例

在Go中,无缓冲的channel(或称作同步channel)是在没有空间保持任何元素的情况下进行通信的。一个goroutine在无缓冲channel上发送操作会阻塞,直到另一个goroutine执行接收操作,这样发送者和接收者必须同时准备好进行通信。

以下是一个使用无缓冲channel的示例。在这个例子中,有一个主goroutine和一个worker goroutine。它们将使用无缓冲channel来同步工作:主goroutine发送工作到channel,worker接收工作,执行,然后通知主goroutine工作已完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"time"
)

func worker(done chan bool) {
fmt.Print("Working...")
time.Sleep(time.Second)
fmt.Println("done")

// 发送一个值来通知工作已经完成
done <- true
}

func main() {
// 创建一个无缓冲的channel
done := make(chan bool, 0)

// 启动worker goroutine并传递‘done’channel
go worker(done)

// 阻塞直到在channel上收到worker的完成通知
<-done
}

在这个例子中,worker函数接受一个类型为bool的无缓冲channel done,它用于向主goroutine发送完成信号。在执行了一些工作(本例中模拟为一秒钟的Sleep调用)后,workertrue发送到done channel。对于done <- true这个语句,在发送完成信号之前,这个worker goroutine 会阻塞,直到主goroutine执行<-done接收操作。

同样,主程序中的<-done将阻塞直到从done channel接收到值。这样,主程序就会等待worker goroutine完成它的工作并发送完成信号。一旦收到信号,主程序继续执行,程序结束。

这个过程演示了无缓冲channel在goroutine间同步执行的用法,确保main函数在工作完成之前不会退出。

3.3. 有缓冲channel示例

下面是一个有缓冲channel的使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
)

func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "processing job", j)
results <- j * 2 // 假设的工作是将job值翻倍
}
}

func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)

// 创建3个worker协程。
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}

// 发送5个jobs然后关闭jobs channel。
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)

// 收集所有的处理结果。
for a := 1; a <= 5; a++ {
<-results
}
}

在这个例子中,我们创建了两个channel:jobs用于发送工作项到各个worker协程,results用于从worker协程收集处理结果。我们启动了3个worker协程,在它们中间分配工作项,并从jobs channel读取。每个worker处理一个工作项,然后将结果发送到results channel。

main函数中,我们向jobs发送了5个工作项,并关闭了它,表示不会再发送新工作了。然后我们从results channel接收每个worker的计算结果。需要注意的是,我们不需要显式关闭results channel,因为当main函数结束时,程序会自动结束,所有channel将随之关闭。

在缓冲channel中,只有在尝试发送更多数据而channel已满时,发送操作才会阻塞;同样,只有channel为空时,接收操作才会阻塞。
因为我们的channel缓冲区可存100个信号,而程序中只使用了5个信号,因此不会被阻塞。

4. 上下文Context

参考文档:好好学Golang:上下文Context

5. 同步原语

5.1. 基础概念

在Go语言中,同步原语(Synchronization primitives)是一组用于协调多个协程(goroutines)之间访问共享资源或执行顺序的工具。
Go语言的标准库sync为程序员提供了该领域的多个工具,其中包括互斥锁(Mutexes)、读写锁(RWMutexes)、WaitGroup、Cond等。

5.2. 同步原语工具概述

5.2.1. Mutex(互斥锁)

Mutex用于保护共享资源,防止同时进行的多个协程同时读写导致竞态条件(Race Condition)。

  • Lock():锁定Mutex,任何其他试图锁定该Mutex的协程都将阻塞直到Mutex被解锁。
  • Unlock():解锁Mutex,如果有其他协程在等待锁定这个Mutex,它们中的一个将能够锁定它并继续执行。

5.2.2. RWMutex(读写锁)

RWMutex是一种特殊类型的Mutex,它允许多个协程同时读取资源,但只能有一个协程写入资源。

  • RLock():锁定读锁,其他协程可以同时锁定读锁,但写锁在此状态下是无法被锁定的。
  • RUnlock():解锁读锁。
  • Lock():锁定写锁,需要等待直到所有的读锁被解锁。
  • Unlock():解锁写锁。

5.2.3. WaitGroup

WaitGroup用于等待协程集合完成执行。我们可以用它来确保程序在继续执行之前,一组相关的协程已经运行完毕。

  • Add(delta int):增加WaitGroup的计数器。
  • Done():减少WaitGroup的计数器,相当于Add(-1)
  • Wait():阻塞,直到计数器归零。

5.2.4. Cond(条件变量)

Cond实现了条件变量的功能,它可以让一组协程等待某个条件为真。Cond常与Mutex或RWMutex一同使用。

  • NewCond(*sync.Locker):创建Cond的新实例。
  • Wait():等待条件变量满足,它在内部调用Unlock()并挂起当前协程,直到Signal()Broadcast()被调用,然后重新锁定。
  • Signal():唤醒一个等待该条件的协程。
  • Broadcast():唤醒所有等待该条件的协程。

5.3. 同步原语工具示例

5.3.1. WaitGroup示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"sync"
"time"
)

// worker 模拟一个执行任务的function
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 在函数退出之前,调用wg的Done()通知已完成
fmt.Printf("Worker %d starting\n", id)
time.Sleep(2 * time.Second) // 模拟耗时任务
fmt.Printf("Worker %d done\n", id)
}

func main() {
var wg sync.WaitGroup

// 启动多个worker协程
for i := 1; i <= 3; i++ {
wg.Add(1) // 添加一个计数
go worker(i, &wg)
}
wg.Wait() // 阻塞,直到wg计数器回到0
fmt.Println("All workers completed")
}

WaitGroup用于等待一组协程完成各自的工作。上面的代码中,每个协程在开始时调用wg.Add(1),在工作完成时调用wg.Done()main函数调用wg.Wait()等待所有协程完成。

5.3.2. Mutex示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"sync"
)

var balance int
var mutex sync.Mutex

func deposit(value int, wg *sync.WaitGroup) {
mutex.Lock() // 获取互斥锁
balance += value
mutex.Unlock() // 释放互斥锁
wg.Done()
}

func withdraw(value int, wg *sync.WaitGroup) {
mutex.Lock() // 获取互斥锁
balance -= value
mutex.Unlock() // 释放互斥锁
wg.Done()
}

func main() {
var wg sync.WaitGroup
balance = 100 // 初始化账户余额

// 执行多个存款和取款操作
wg.Add(2)
go deposit(50, &wg)
go withdraw(30, &wg)

// 等待所有操作完成
wg.Wait()

fmt.Println("New Balance:", balance)
}

在这个例子中,balance代表一个银行账户的余额,多个协程尝试进行存款或取款操作。由于所有的存取款操作都通过互斥锁mutex来同步,因此无论协程的执行顺序如何,最终的账户余额都将是一致且正确的。使用sync.WaitGroup保证main函数会等待两个协程操作完成后才打印最终余额。

5.3.3. RWMutex示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"sync"
"time"
)

var (
counter int
rwMutex sync.RWMutex
)

// reader 模拟一个读取操作
func reader(id int, wg *sync.WaitGroup) {
defer wg.Done()
rwMutex.RLock() // 获取读锁
fmt.Printf("Reader %d: %d\n", id, counter)
time.Sleep(1 * time.Millisecond) // 读取过程耗时
rwMutex.RUnlock()                // 释放读锁
}

// writer 模拟一个写入操作
func writer(id int, wg *sync.WaitGroup) {
defer wg.Done()
rwMutex.Lock() // 获取写锁
counter++
fmt.Printf("Writer %d: %d\n", id, counter)
time.Sleep(1 * time.Millisecond) // 写入过程耗时
rwMutex.Unlock()                 // 释放写锁
}

func main() {
var wg sync.WaitGroup

// 启动读者
for i := 0; i < 5; i++ {
wg.Add(1)
go reader(i, &wg)
}

// 启动写者
for i := 0; i < 3; i++ {
wg.Add(1)
go writer(i, &wg)
}

wg.Wait() // 等待所有读者和写者结束
}

在上面的代码中,我们使用sync.RWMutex维护对counter读写操作的同步,允许多个读操作并行执行,但对写操作互斥访问。

5.3.4. Cond示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var mu sync.Mutex             // 锁,用于保护条件
cond := sync.NewCond(&mu)     // 创建条件变量cond
var ready bool                // 条件:表示工作是否就绪

// 广播启动工作的协程
go func() {
time.Sleep(2 * time.Second) // 模拟一些准备工作耗时
mu.Lock()                 // 获取锁,准备改变条件
ready = true                // 设置工作就绪
cond.Broadcast()            // 广播通知给所有等待的协程
mu.Unlock()                 // 释放锁
}()

// 创建多个工作者协程等待条件变量
const nWorkers = 3
var wg sync.WaitGroup
wg.Add(nWorkers)
for i := 0; i < nWorkers; i++ {
go func(id int) {
defer wg.Done()
mu.Lock()                 // 获取锁以检查条件
for !ready {             // 循环,直到工作就绪为止
cond.Wait()         // 等待条件变量的信号
}
fmt.Printf("Worker %d started\n", id)
mu.Unlock()                // 释放锁,让其他协程进入临界区
}(i)
}
wg.Wait()
}

在这段代码中,我们创建了一个条件变量cond。所有的协程启动时都会通过使用cond.Wait()等待(阻塞),直到条件准备就绪。一旦条件满足,我们用cond.Broadcast()通知所有等待中的协程。
注意,在调用WaitBroadcast之前和之后,我们都需要手动获取和释放相关的锁。

  • 本文作者: 好好学习的郝
  • 原文链接: https://www.voidking.com/dev-golang-goroutine/
  • 版权声明: 本文采用 BY-NC-SA 许可协议,转载请注明出处!源站会即时更新知识点并修正错误,欢迎访问~
  • 微信公众号同步更新,欢迎关注~