cond(ition)表示为条件. Cond是一个条件锁,就是当满足某些条件下才起作用的锁,有的地方也叫定期唤醒锁,有的地方叫条件变量conditional variable。条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用.

条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些协程的。当共享资源的状态发生变化时,它可以通知被互斥锁阻塞的协程.
条件变量在这里的最大优势就是在效率方面的提升。当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复地做检查, 可以通过一定的手段在满足条件的时候被唤醒.

比如: 去KTV唱歌,里面只有一个话筒, 在你想用的时候别人在用. 此时你有两种方式可以选择:

  • 在旁边一直守着, 他放下话筒你马上抢过来. (浪费性能精力)
  • 坐在旁边等它唱完,然后递给你. (比较省资源)

基本使用

//创建Cond
cond := sync.NewCond(new(sync.Mutex))
//等待唤醒
cond.L.Lock()
cond.Wait()
//唤醒一个
cond.Signal()
//唤醒所有
cond.Broadcast()

数据结构

type Cond struct {
    noCopy noCopy //noCopy:noCopy对象,拥有一个Lock方法,使得Cond对象在进行go vet扫描的时候,能够被检测到是否被复制。
    L Locker
    notify  notifyList
    checker copyChecker
}

type noCopy struct{}  

func (*noCopy) Lock() {}

//notify:notifyList对象,维护等待唤醒的goroutine队列,使用链表实现。
type notifyList struct {
     wait   uint32
     notify uint32
     lock   uintptr
     head   unsafe.Pointer
     tail   unsafe.Pointer
}    

//checker:copyChecker对象,实际上是uintptr对象,保存自身对象地址。
type copyChecker uintptr

// 用于检查是否被拷贝过
func (c *copyChecker) check() {
     if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
            !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
            uintptr(*c) != uintptr(unsafe.Pointer(c)) {
            panic("sync.Cond is copied")
     }
}

cond不能被复制:Cond在内部持有一个等待队列,这个队列维护所有等待在这个Cond的goroutine。因此若这个Cond允许值传递,则这个队列在值传递的过程中会进行复制,导致在唤醒goroutine的时候出现错误。

  • 检查当前checker的地址是否等于保存在checker中的地址
  • 对checker进行原子CAS操作,将checker当前地址赋值给为空的checker
  • 重复操作1,防止在进行1和2的时候,有其他gorountine并发的修改了checker值
  • 若1、2、3都不满足,则表示当前cond是复制的,抛出panic
    check方法在第一次调用的时候,会将checker对象地址赋值给checker,也就是将自身内存地址赋值给自身。
    再次调用checker方法的时候,会将当前checker地址的值与保存的checker地址值进行比较,若不相同则表示当前checker的地址不是第一次调用check方法时候的地址,即cond对象被复制了,checker被重新分配了内存地址。

代码演示

package test

import (
	"fmt"
	"math/rand"
	"sync"

	"testing"
	"time"
)


func TestSyncCond(t *testing.T) {
	rand.Seed(time.Now().UnixNano())
	cond := sync.NewCond(new(sync.Mutex))
	for i := 0 ; i < 5; i ++ {
		go func() {
			cond.L.Lock()
			for !trueOrFalse() {
				fmt.Println("--假: 继续等待")
				cond.Wait()	// 这里会阻塞协程
			}
			fmt.Println("跳出循环条件了")
			cond.L.Unlock()
		}()
	}

	for {
		time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
		cond.Signal()
	}


}

func trueOrFalse() bool {
	randNum := rand.Intn(40)
	fmt.Print(randNum)
	if randNum == 1 {
		return true
	}
	return false
}

输出结果:

=== RUN   TestSyncCond
27--假: 继续等待
31--假: 继续等待
34--假: 继续等待
15--假: 继续等待
16--假: 继续等待
27--假: 继续等待
20--假: 继续等待
34--假: 继续等待
33--假: 继续等待
38--假: 继续等待
30--假: 继续等待
24--假: 继续等待
36--假: 继续等待
33--假: 继续等待
12--假: 继续等待
8--假: 继续等待
0--假: 继续等待
38--假: 继续等待
2--假: 继续等待
1跳出循环条件了
--- PASS: TestSyncCond (23.05s)

在for里执行cond.wait方法,可以防止for的暴力消耗资源, 由于notify是由连表实现的, 所以在执行cond.Signal将会唤醒头部协程. 而 cond.broadcast 会遍历整个链表发送唤醒信号.

整体源码注释

// Cond实现了一个条件变量,一个等待或宣布事件发生的goroutines的集合点。
//
// 每个Cond都有一个相关的Locker L(通常是* Mutex或* RWMutex)。
type Cond struct {
    // 不允许复制,一个结构体,有一个Lock()方法,嵌入别的结构体中,表示不允许复制
    // noCopy对象,拥有一个Lock方法,使得Cond对象在进行go vet扫描的时候,能够被检测到是否被复制
    noCopy noCopy
 
    // 锁的具体实现,通常为 mutex 或者rwmutex
    L Locker
 
    // 通知列表,调用Wait()方法的goroutine会被放入list中,每次唤醒,从这里取出
    // notifyList对象,维护等待唤醒的goroutine队列,使用链表实现
    // 在 sync 包中被实现, src/sync/runtime.go
    notify  notifyList
 
    // 复制检查,检查cond实例是否被复制
    // copyChecker对象,实际上是uintptr对象,保存自身对象地址
    checker copyChecker
 
}
 
// NewCond方法传入一个实现了Locker接口的对象,返回一个新的Cond对象指针,
// 保证在多goroutine使用cond的时候,持有的是同一个实例
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}
 
 
// 等待原子解锁c.L并暂停执行调用goroutine。
// 稍后恢复执行后,Wait会在返回之前锁定c.L.
// 与其他系统不同,除非被广播或信号唤醒,否则等待无法返回。
 
// 因为等待第一次恢复时c.L没有被锁定,
// 所以当Wait返回时,调用者通常不能认为条件为真。
// 相反,调用者应该循环等待:
 
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
 
// 调用此方法会将此routine加入通知列表,并等待获取通知,调用此方法必须先Lock,不然方法里会调用Unlock(),报错
//
func (c *Cond) Wait() {
 
    // 检查是否被复制; 如果是就panic
    // check检查,保证cond在第一次使用后没有被复制
    c.checker.check()
    // 将当前goroutine加入等待队列, 该方法在 runtime 包的 notifyListAdd 函数中实现
    // src/runtime/sema.go
    t := runtime_notifyListAdd(&c.notify)
    // 释放锁,
    // 因此在调用Wait方法前,必须保证获取到了cond的锁,否则会报错
    c.L.Unlock()
 
    // 等待队列中的所有的goroutine执行等待唤醒操作
    // 将当前goroutine挂起,等待唤醒信号
    // 该方法在 runtime 包的 notifyListWait 函数中实现
    // src/runtime/sema.go
    runtime_notifyListWait(&c.notify, t)
    // 被通知了,获取锁,继续运行
    c.L.Lock()
}
 
 
 
 
//
// 唤醒单个 等待的 goroutine
func (c *Cond) Signal() {
    // 检查c是否是被复制的,如果是就panic
    // 保证cond在第一次使用后没有被复制
    c.checker.check()
    // 通知等待列表中的一个
    // 顺序唤醒一个等待的gorountine
    // 在runtime 包的 notifyListNotifyOne 函数中被实现
    // src/runtime/sema.go
    runtime_notifyListNotifyOne(&c.notify)
}
 
// 唤醒等待队列中的所有goroutine。
func (c *Cond) Broadcast() {
    // 检查c是否是被复制的,如果是就panic
    // 保证cond在第一次使用后没有被复制
    c.checker.check()
    // 唤醒等待队列中所有的goroutine
    // 有runtime 包的 notifyListNotifyAll 函数实现
    // src\runtime\sema.go
    runtime_notifyListNotifyAll(&c.notify)
}
 
// copyChecker保持指向自身的指针以检测对象复制。
type copyChecker uintptr
 
// 检查c是否被复制,如果是则panic
/**
check方法在第一次调用的时候,会将checker对象地址赋值给checker,也就是将自身内存地址赋值给自身
 */
func (c *copyChecker) check() {
    /**
    因为 copyChecker的底层类型为 uintptr
    那么 这里的 *c其实就是 copyChecker类型本身,然后强转成uintptr
    和拿着 c 也就是copyChecker的指针去求 uintptr,理论上要想等
    即:内存地址为一样,则表示没有被复制
     */
     // 下述做法是:
     // 其实 copyChecker中存储的对象地址就是 copyChecker 对象自身的地址
     // 先把 copyChecker 处存储的对象地址和自己通过 unsafe.Pointer求出来的对象地址作比较,
     // 如果发现不相等,那么就尝试的替换,由于使用的 old是0,
     // 则表示c还没有开辟内存空间,也就是说,只有是首次开辟地址才会替换成功
     // 如果替换不成功,则表示 copyChecker出所存储的地址和 unsafe计算出来的不一致
     // 则表示对象是被复制了
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}
 
// noCopy可以嵌入到结构中,在第一次使用后不得复制。
//
// 详细介绍请查看: https://github.com/golang/go/issues/8005#issuecomment-190753527
type noCopy struct{}
 
// Lock is a no-op used by -copylocks checker from `go vet`.
// Lock 是有 go vet 命令来判断是否有 copy 的检查的
func (*noCopy) Lock() {}
 
 
 
// sync/runtime.go
// 使用链表实现
type notifyList struct {
 
    wait   uint32       // 等待数
    notify uint32       // 唤醒数
    lock   uintptr      // 信号锁
    // 使用链表实现
    head   unsafe.Pointer   // 队列的当前头
    tail   unsafe.Pointer   // 队列的当前尾
}

参考

sync.cond源码解析
GO 条件锁sync.Cond
Golang标准库深入 - 锁、信号量(sync)
Go 并发实战 -- sync Cond