目录

Go context

花了两个晚上学习了一下 Go context 包,顺便记录下来。

context 包主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。提供对 goroutine 的并发控制和超时控制。

1
go version
1
go version go1.18 darwin/arm64

源码阅读

Context 接口

Context 定义了 4 个方法,它们都是幂等的。

  • 取消某个 Context 时,从该 Context 派生的所有 Context 也将被取消。
  • WithCancel、WithDeadline 和 WithTimeout 函数传入父 Context 并返回派生出的子 Context 和 CancelFunc。
  • 调用 CancelFunc 会取消子节点和子节点的子节点,删除父节点对子节点的引用,并停止任何关联的计时器。
  • 调用 CancelFunc 失败会泄漏子节点和子节点的子节点,直到父节点被取消或计时器触发。
  • go vet 会工具检查是否在所有控制流路径上使用了 CancelFuncs。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Context interface {
	// Deadline 返回代表此 Context 完成的工作应被取消的时间。
	// 未设置截止时间时,Deadline 返回 ok==false。
	// 连续调用 Deadline 返回相同的结果。
	Deadline() (deadline time.Time, ok bool)
	// 当工作完成,Done 会返回一个关闭的通道,说明此 Context 应该被取消。
	// 如果这个 Context 永远不能被取消,Done 可能会返回 nil。
	// 连续调用 Done 返回相同的结果。
	// 在 cancel() 函数返回之后,通道的关闭可能会异步进行。
	Done() <-chan struct{}
	// 如果 Done 尚未关闭,则 Err 返回 nil。
	// 如果 Done 已关闭,Err 将返回一个 non-nil 的错误来解释原因:
	//   Cancel: Context 被取消
	//   DeadlineExceeded: Context 超出截止时间
	Err() error
	// Value 为 key 返回与此 Context 关联的值,如果没有值与 key 关联,则返回 nil。
	// 使用相同的键连续调用 Value 将返回相同的结果。
	// Context 值仅用于传输进程和 API 边界的请求范围数据,而不用于向函数传递可选参数。
	// key 标识 Context 中的特定值。希望在 Context 中存储值的函数通常会在全局变量中分配一个键,然后将该键用作 context.WithValue 和 Context.Value 的参数。
	// key 可以是任何支持比较的类型;包应将键定义为未导出的类型以避免冲突。
	Value(key any) any
}

emptyCtx

emptyCtx 实现一个空的 Context,永远不会被取消,没有存储值,也没有 deadline。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key any) any {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

context 包将 emptyCtx 包装成两个常用空 Context,并通过函数导出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}

background 通常用在 main 函数中,作为所有 Context 的根节点。

todo 通常使用在不知道传什么 Context 的场景,如代码重构,用于占个位置。

cancelCtx

cancelCtx 是 context 包的核心,它提供了一个可取消的上下文。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
var closedchan = make(chan struct{})

func init() {
	close(closedchan) // 创建一个可重用的,关闭状态的通道
}

// &cancelCtxKey 是返回 cancelCtx 自身的特殊 key。
var cancelCtxKey int

type cancelCtx struct {
	Context

	mu       sync.Mutex            // 保护以下字段
	done     atomic.Value          // 保存 chan struct{},懒汉式创建,调用一次 cancel() 时关闭
	children map[canceler]struct{} // 在第一次调用 cancel() 时设置为 nil
	err      error                 // 在第一次调用 cancel() 时设置为 non-nil
}

func (c *cancelCtx) Value(key any) any {
	// 如果 key 等于 &cancelCtxKey,就返回该 cancelCtx 本身
	if key == &cancelCtxKey {
		return c
	}
	// 不相等,向上递归查找有没有匹配该 key 的 value,这里传入 parent
	return value(c.Context, key)
}

func value(c Context, key any) any {
	for {
		switch ctx := c.(type) {
		case *valueCtx:
			if key == ctx.key {
				return ctx.val // 找到了对应的 key
			}
			c = ctx.Context // 向上寻找
		case *cancelCtx:
			if key == &cancelCtxKey {
				return c // 如果 key 为 &cancelCtxKey 就返回该 cancelCtx 自身
			}
			c = ctx.Context
		case *timerCtx:
			if key == &cancelCtxKey {
				return &ctx.cancelCtx // 由于 timerCtx 内部包含一个 cancelCtx,直接返回内部的 cancelCtx
			}
			c = ctx.Context
		case *emptyCtx:
			return nil // emptyCtx 不含 key-val 对
		default:
			return c.Value(key) // 递归向上寻找
		}
	}
}

func (c *cancelCtx) Done() <-chan struct{} {
	// 懒汉式加载
	// 如果 d.done 不为空,直接返回
	// 如果 d.done 为空,make 一个 chan struct{} 存入 c.done 中并返回
	// 即 c.Done() 至少被调用一次,c.done 才不为空
}

func (c *cancelCtx) Err() error {
	// ...
}

func (c *cancelCtx) String() string {
	return contextName(c.Context) + ".WithCancel" // 父节点名称.WithCancel
}

// cancel 关闭 c.done,取消 c 的每个子节点,如果 removeFromParent 为真,从其父亲的子节点中删除 c。
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	// 必须传入 err
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // 已经被其他 goroutine 取消
	}
	c.err = err // 给 err 赋值,注意此时还在 lock 状态
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan) // 没有初始化过 done channel,就直接赋值一个已关闭的 chan
	} else {
		close(d)
	}
	for child := range c.children {
		// 递归取消所有子节点
		// 注意:该节点的子节点并不从该节点移除
		// 注意:在持有父锁的同时获取子锁。
		child.cancel(false, err)
	}
	// 置空子节点
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		// 从父节点中移除自己
		removeChild(c.Context, c)
	}
}

创建可取消的 Context 的方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// CancelFunc 告诉操作放弃它的工作。
// CancelFunc 不会等待工作停止。
// 一个 CancelFunc 可以被多个 goroutine 同时调用。
// 在第一次调用之后,对 CancelFunc 的后续调用什么也不做。
type CancelFunc func()

// WithCancel 返回具有新 Done 通道的 parent 副本。
// 返回的 Context 的 Done 通道在调用返回的 cancel 函数或父 Context 的 Done 通道关闭时关闭,以先发生者为准。
// 取消此 Context 会释放与其关联的资源,因此代码应在此 Context 中运行的操作完成后立即调用取消。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx 返回一个初始化的 cancelCtx。
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}


// goroutines 计算曾经创建的 goroutines 的数量;供测试用。
var goroutines int32

// propagateCancel 构建父节点与子节点之间的关联关系,在 parent 被取消时取消 child 及其子节点。
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent 永远不会被取消
	}

	// 非阻塞判断 parent 是否被取消
	select {
	case <-done:
		// parent 已被取消,取消 child 及其子节点后返回
		child.cancel(false, parent.Err())
		return
	default:
	}

	// 寻找 parent 的可以取消的父节点
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent 已被取消,取消 child 及其子节点
			child.cancel(false, p.err)
		} else {
			// 将 child 存入 parent 的 map 中
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		atomic.AddInt32(&goroutines, +1)
		// 没有找到可取消的父节点,启动一个 goroutine 监听 parent 的结束信号
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

// parentCancelCtx 返回 parent 的第一个祖先 cancelCtx 节点。
// 它通过查找 parent.Value(&cancelCtxKey) 来找到最里面的封闭 *cancelCtx,然后检查 parent.Done() 是否与 *cancelCtx 匹配。
// (如果没有,*cancelCtx 已经被包装在一个自定义实现中,提供了一个不同的完成通道,在这种情况下我们不应该绕过它。)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	// parent 已经被取消或者无法永远不会被取消,直接返回
	if done == closedchan || done == nil {
		return nil, false
	}
	// 向上递归寻找最近的 cancelCtx。由上面的 cancelCtx.Value() 可知,当传入参数为 &cancelCtxKey 时,返回 c 自身。否则向上寻找。
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}
	pdone, _ := p.done.Load().(chan struct{})
	// 当前 parent 类型不是标准 cancelCtx 时,返回 false
	if pdone != done {
		return nil, false
	}
	return p, true // 返回找到的 cancelCtx
}

parentCancelCtx 由 parent 向上寻找最近的 cancelCtx,这个 cancelCtx 可以是 parent 本身。当这个 cancelCtx 的 Done() 方法被重写,该 cancelCtx 就不是标准的 cancelCtx,context 就无法保证所有通过 done channel 通知的 goroutine 被正确的关闭。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
	"context"
)

type MyContext struct {
	context.Context
}

// Done 重写 cancelCtx 的 Done 方法
func (c MyContext) Done() <-chan struct{} {
	return make(<-chan struct{})
}

func main() {
	ctx := context.Background()            // ctx: emptyCtx
	c1, cancel1 := context.WithCancel(ctx) // c1: cancelCtx{Context: ctx, done: nil, children: nil, err: nil}
	defer cancel1()
	c2 := MyContext{Context: c1}          // c2: MyContext{Context: c1}
	c3, cancel2 := context.WithCancel(c2) // c3: cancelCtx{Context: c2, done: chan struct{}, nil, err: nil}
	defer cancel2()
	// 停在这里
	_ = c3
}

上面这个例子中,c1.done 为 nil,因为调用链 WithCancel(c2) -> propagateCancel(c2, child) -> c2.Done(),而 c2.Done() 被我们重写了,导致 c2.done 为 nil,在创建 c3 时 parentCancelCtx() 中 pdone, _ := p.done.Load().(chan struct{}) 得到的 pdone 为 nil,与 parent.Done()c2.Done() 返回的 <-chan struct{} 不相等。于是上下文在这里断开,进入 propagateCancel() 中的 else 分支,新开一个 goroutine 监听两个 Context 的 Done channel。

WithCancel 和 WithDeadline 返回的 cancel() 会将自身从父节点的子节点中移除

带取消的 Context

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // main 函数完成后主动关闭创建的 goroutine

	for n := range gen(ctx) {
		fmt.Println(n)
		if n == 5 {
			break
		}
	}
}

func gen(ctx context.Context) <-chan int {
	dst := make(chan int)
	n := 1
	go func() {
		for {
			select {
			case <-ctx.Done():
				return // 返回防止 goroutine 泄露
			case dst <- n:
				n++
			}
		}
	}()
	return dst
}

timerCtx

1
2
3
4
5
6
7
// timerCtx 带有一个计时器和一个截止时间。它嵌入了一个 cancelCtx 来实现 Done 和 Err。它通过停止计时器然后委托给 cancelCtx.cancel 来实现取消。
type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

带 deadline 的 Context

创建时调用 time.AfterFunc() 方法在计时器结束时调用 cancel() 方法将自己从父 Context 节点中移除并通知子节点结束任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
	"context"
	"fmt"
	"time"
)

const shortDuration = 10 * time.Millisecond

func main() {
	d := time.Now().Add(shortDuration)
	ctx, cancel := context.WithDeadline(context.Background(), d)
	defer cancel()

	select {
	case <-time.After(1 * time.Second):
		fmt.Println("overslept")
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	}
}

带 timeout 的 Context

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
	"context"
	"fmt"
	"time"
)

const shortDuration = 1 * time.Millisecond

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), shortDuration)
	defer cancel()

	select {
	case <-time.After(1 * time.Second):
		fmt.Println("overslept")
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	}
}

valueCtx

1
2
3
4
5
// valueCtx 带有一个键值对。它为传入的 key 实现 Value 方法并将所有其他调用委托给内部的 Context。
type valueCtx struct {
	Context
	key, val any
}

带键值对的 Context

context.WithValue() 创建一个带键值对的 Context,内部 Context 指向父节点。形成一个链表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
	"context"
	"fmt"
)

func main() {
	k := "language"
	ctx := context.WithValue(context.Background(), k, "Go")

	find(ctx, k)
	find(ctx, "color")
}

func find(ctx context.Context, k string) {
	if v := ctx.Value(k); v != nil {
		fmt.Println("found value:", v)
		return
	}
	fmt.Println("key not found:", k)
}

Cheat Sheet

请求上下文

在 Request Context 中保存整个请求都要用到的数据如 RequestID, UserID 等

TODO

总结

context 包加上注释不过 600 行,短小精悍,却让人眼前一亮。简单的解决了 goroutine 的控制问题,提供了主动取消的手段。

参考资料

context package
深度解密 Go 语言之 context