golang中sync.Mutex的实现方法
来到golang学习网的大家,相信都是编程学习爱好者,希望在这里学习Golang相关编程知识。下面本篇文章就来带大家聊聊《golang中sync.Mutex的实现方法》,介绍一下sync.Mutex,希望对大家的知识积累有所帮助,助力实战开发!
mutex 的实现思想
mutex 主要有两个 method: Lock()
和 Unlock()
Lock()
可以通过一个 CAS 操作来实现
func (m *Mutex) Lock() { for !atomic.CompareAndSwapUint32(&m.locked, 0, 1) { } } func (m *Mutex) Unlock() { atomic.StoreUint32(&m.locked, 0) }
Lock() 一直进行 CAS 操作,比较耗 CPU。因此带来了一个优化:如果协程在一段时间内抢不到锁,可以把该协程挂到一个等待队列上,Unlock()
的一方除了更新锁的状态,还需要从等待队列中唤醒一个协程。
但是这个优化会存在一个问题,如果一个协程从等待队列中唤醒后再次抢锁时,锁已经被一个新来的协程抢走了,它就只能再次被挂到等待队列中,接着再被唤醒,但又可能抢锁失败...... 这个悲催的协程可能会一直抢不到锁,由此产生饥饿 (starvation) 现象。
饥饿现象会导致尾部延迟 (Tail Latency) 特别高。什么是尾部延迟?用一句话来说就是:最慢的特别慢!
如果共有 1000 个协程,假设 999 个协程可以在 1ms 内抢到锁,虽然平均时间才 2ms,但是最慢的那个协程却需要 1s 才抢到锁,这就是尾部延迟。
golang 中 mutex 的实现思想
➜ go version go version go1.16.5 darwin/arm64
本次阅读的 go 源码版本为 go1.16.5。
golang 标准库里的 mutex 避免了饥饿现象的发生,先大致介绍一下 golang 的加锁和解锁流程,对后面的源码阅读有帮助。
锁有两种 mode,分别是 normal mode 和 starvation mode。初始为 normal mode,当一个协程来抢锁时,依旧是做 CAS 操作,如果成功了,就直接返回,如果没有抢到锁,它会做一定次数的自旋操作,等待锁被释放,在自旋操作结束后,如果锁依旧没有被释放,那么这个协程就会被放到等待队列中。如果一个处于等待队列中的协程一直都没有抢到锁,mutex 就会从 normal mode 变成 starvation mode,在 starvation mode 下,当有协程释放锁时,这个锁会被直接交给等待队列中的协程,从而避免产生饥饿线程。
除此之外,golang 还有一点小优化,当有协程正在自旋抢锁时,Unlock()
的一方不会从等待队列中唤醒协程,因为即使唤醒了,被唤醒的协程也抢不过正在自旋的协程。
下面正式开始阅读源码。
mutex 的结构以及一些 const 常量值
type Mutex struct { state int32 sema uint32 }
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota // 3 // Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency. starvationThresholdNs = 1e6 )
mutex 的状态是通过 state
来维护的,state
有 32 个 bit。
前面 29 个 bit 用来记录当前等待队列中有多少个协程在等待,将等待队列的协程数量记录为 waiterCount。
state >> mutexWaiterShift // mutexWaiterShift 的值为 3
第 30 个 bit 表示当前 mutex 是否处于 starvation mode,将这个 bit 记为 starvationFlag。
state & mutexStarving
第 31 个 bit 表示当前是否有协程正在 (第一次) 自旋,将这个 bit 记为 wokenFlag,woken 的意思也就是醒着,代表它不在等待队列上睡眠。
state & mutexWoken
第 32 个 bit 表示当前锁是否被锁了 (感觉有点绕口哈哈) ,将这个 bit 记为 lockFlag。
state & mutexLocked
用一个图来表示这些 bit
0 0 0 0 0 0 0 0 ... 0 0 0 0 | | | waiterCount starvationFlag wokenFlag lockFlag
sema
是一个信号量,它会被用来关联一个等待队列。
分别讨论几种 case 下,代码的执行情况。
Mutex 没有被锁住,第一个协程来拿锁
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // ... return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() }
在 Mutex 没有被锁住时,state 的值为 0,此时第一个协程来拿锁时,由于 state 的值为 0,因此 CAS 操作会成功,CAS 操作之后的 state 的值变成 1 (lockFlag = 1) ,然后 return 掉,不会进入到 m.lockSlow()
里面。
Mutex 仅被协程 A 锁住,没有其他协程抢锁,协程 A 释放锁
func (m *Mutex) Unlock() { // ... // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } }
紧接上面,state 的值为 1,AddInt32(m.state,-1)
之后,state 的值变成了 0 (lockFlag = 0) ,new 的值为 0,然后就返回了。
Mutex 已经被协程 A 锁住,协程 B 来拿锁
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // ... return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() }
因为 state 的值不为 0,CompareAndSwapInt32 会返回 false,所以会进入到 lockSlow() 里面
lockSlow()
首先看一下 lockSlow() 这个方法的全貌
func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexwokenFlag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
第一步: doSpin (空转)
进入 for 循环后,会执行一个判断
for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexwokenFlag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } // ... }
runtime_canSpin(iter)
的作用是根据 iter 的值判断自否应该自旋下去。 (这个方法的实现可以在后面看到)
最初的几次判断,由于 iter 的值为 0,runtime_canSpin(iter) 会返回 true。因此
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter)
这个判断会一直通过,由于 old>>mutexWaiterShift = 0
(waiterCount = 0) ,不满足第二个判断的条件,因此不会执行 CAS 操作和 awoke = true
。
接着就是执行 runtime_doSpin()
了,runtime_doSpin()
会进行一些空循环,消耗了一下 CPU 时间,然后就通过 continue
进入到下一次循环了。 (runtime_doSpin
具体实现也可以在后面看到)
看到看到,这段代码不是用来抢锁的,而是用来等锁变成 unlock 状态的,它会空转一定的次数,期待在空转的过程中,锁被其他的协程释放。
runtime_doSpin()
// src/runtime/lock_sema.go const active_spin_cnt = 30 //go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
# /src/runtime/asm_amd64.s TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
procyield()
会循环执行 PAUSE
指令。
runtime_canSpin()
runtime_canSpin() 的实现在 src/runtime/proc.go 里面,里面的判断比较多,但是我们只需要关注 i >= active_spin
这一个判断就行。
const active_spin = 4 // Active spinning for sync.Mutex. //go:linkname sync_runtime_canSpin sync.runtime_canSpin //go:nosplit func sync_runtime_canSpin(i int) bool { // sync.Mutex is cooperative, so we are conservative with spinning. // Spin only few times and only if running on a multicore machine and // GOMAXPROCS>1 and there is at least one other running P and local runq is empty. // As opposed to runtime mutex we don't do passive spinning here, // because there can be work on global runq or on other Ps. if i >= active_spin || ncpu一个小插曲
在利用断点来 debug 时,发现没办法 watch sync_runtime_canSpin() 内引用的一些全局变量,例如
active_spin
,ncpu
,sched.npidle
这些,所以我就大力出奇迹,强行修改源码在里面声明了几个局部变量,这下可以通过 watch 局部变量来得知全局变量的值了 (机智如我哈哈) 。func sync_runtime_canSpin(i int) bool { local_active_spin := active_spin local_ncpu := ncpu local_gomaxprocs := gomaxprocs npidle := sched.npidle nmspinning := sched.nmspinning if i >= local_active_spin || local_ncpu第二步: 根据旧状态来计算新状态
new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1这一段代码,是根据 old state 来计算 new state,有 4 个操作
- set lockFlag:
new |= mutexLocked
- 增加 waiterCount:
new += 1
- set starvationFlag:
new |= mutexStarving
- clear wokenFlag:
new &^= mutexWoken
由于在这里我们只讨论 ”Mutex 已经被协程 A 锁住,协程 B 来拿锁“ 这种情况,可以分为两种 case
- case1: 在第一步自旋的过程中,锁已经被释放了,此时 old state =
000000...000
(所有 bit 都为 0) ,经过这四个操作的洗礼后,lockFlag 被设置成了 1。 - case2: 在第一步自旋结束后,锁还没有被释放,即 old state 此时为
00000000...001
(仅 lockFlag 为 1),经过这四个操作的洗礼后,waiterCounter = 1,lockFlag 也为 1。
第三步: 更新 state (抢锁)
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // ... } else { old = m.state }
这一步会通过 CAS 操作将 mutex.state
更新为我们刚刚计算得到的 new state
。如果 CAS 成功,且 old 处于未上锁的状态时,就直接利用 break 退出循环返回了 (也就是上面的 case1) 。如果 CAS 失败,将会更新 old state 的值,进行下一次循环,再重复一二三步;
如果是 case2 的话,情况会稍微复杂一点
if atomic.CompareAndSwapInt32(&m.state, old, new) { // ... // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) // ... }
主要就是通过 runtime_SemacquireMutex()
把自己放进了等待队列里面,之后 runtime 不会再调度该协程,直到协程被唤醒。
关于 runtime_SemacquireMutex()
的实现,我暂时就不追究下去了,再追究下去就没完没了啦。
Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁
func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } }
紧接上回,最初 state 的值为 00000000000...0001001
(waiterCount = 1, lockFlag = 1)。执行完 AddInt32(&m.state, -mutexLocked)
后,变成了 0000...001000
(waiterCount = 1) ,new
的值也为 0000...001000
,接着就进入到 unlockSlow
里面了。
unlockSlow()
看看 unlockSlow()
的全貌
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1此时 old >> mutexWaiterShift =
0000...0001
≠ 0, 所以不会直接返回old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1接着计算 new =
0000...1000
-0000...1000
=0000...0000
,waiterCount 由 1 变成了 0。之后进行 CAS 操作,如果 CAS 成功,则从等待队列中唤醒一个 goroutine。Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁,协程 B 被唤醒
让我们会视线切到
lockSlow
的后半截。const starvationThresholdNs = 1e6 if atomic.CompareAndSwapInt32(&m.state, old, new) { // ... runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // ... iter = 0 }当协程 B 从
runtime_SemacquireMutex
处醒来后,会根据该协程的等待的时间来判断是否饥饿了。这里我们先假设此时还没有饥饿,后面会详细讨论饥饿时的情况。之后会将iter
重置为 0,接着就进行下一次的循环了,由于iter
已经被重置为 0 了,所以在下一次循环时,sync_runtime_doSpin(iter)
会返回true
。由于此时 state 已经变成了 0 了,所以在下一次循环里可以畅通无阻的拿到锁。
饥饿情况下的解锁行为: starvationFlag 的作用
设想这样一种情况:goroutine A 拿到锁,goroutine B 抢锁失败,被放入等待队列。goroutine A 释放锁,goroutine B 被唤醒,但是正当它抢锁时,锁被新来的 goroutine C 抢走了... 连续好几次,每当 goroutine B 要抢锁时,锁都被其他协程抢先一步拿走。直到某一次,goroutine B 再次被唤醒后执行
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs它就进入饥饿模式 (starvation mode) 啦!
// The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. if starving && old&mutexLocked != 0 { new |= mutexStarving }之后通过 CAS 操作将饥饿标志设置到了
mutex.state
里面,然后它就又被放到等待队列中了。atomic.CompareAndSwapInt32(&m.state, old, new)Unlock()
视角切换到 Unlock() 这一边
func (m *Mutex) Unlock() { // ... // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { // ... if new&mutexStarving == 0 { // ... for { // ... if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } // ... } } else { // Starving mode: handoff mutex ownership to the next waiter, and yield // our time slice so that the next waiter can start to run immediately. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. runtime_Semrelease(&m.sema, true, 1) } }在
unlockSlow()
中,此时new&mutexStarving != 0
,所以会直接进入到 else 分支内,调用runtime_Semrelease()
方法,但要注意 else 分支内runtime_Semrelease()
的参数和 if 分支的参数不一样,在这里runtime_Semrelease(&m.sema, true, 1)
起到的作用是唤醒了等待队列中的第一个协程并立马调度该协程 (runtime_Semrelease()
方法的详解在后面 )。同时正如注释所说,在
Unlock()
中由于进行了atomic.AddInt32(&m.state, -mutexLocked)
操作,所以 mutex.state 的 lockFlag 是为 0 的,但是没关系,starvationFlag 是为 1 的,所以会依旧被认为是锁住的状态。Lock()
func (m *Mutex) Lock() { // ... m.lockSlow() } func (m *Mutex) lockSlow() { // ... for { // ... if atomic.CompareAndSwapInt32(&m.state, old, new) { // ... runtime_SemacquireMutex(&m.sema, queueLifo, 1) // ... old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. // ... delta := int32(mutexLocked - 1>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } // ... }视角再次切换到
Lock()
这边,饥饿的 goroutine 被唤醒并调度后,首先执行old = m.state
, 此时 old 的 starvationFlag = 1。之后就正如注释所说,它会尝试修复 mutex.state 的"不一致" (inconsistent) 状态。
修复工作主要做了三件事情:
在 starvation mode 下的 Unlock() 没有将 waitterCount - 1, 所以这里需要给 mutexWaiter 减 1
将 state 的 locked flag 置为 1
如果该 goroutine 没有饥饿或者是等待队列中的最后一个 goroutine 的话,清理 starvationFlag
这三件事情通过 atomic.AddInt32(&m.state, delta)
一步到位。
runtime_Semrelease()
// Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. // If handoff is true, pass count directly to the first waiter. // skipframes is the number of frames to omit during tracing, counting from // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
handoff 就是传球的意思,handoff 为 false 时,仅仅唤醒等待队列中第一个协程,但是不会立马调度该协程;当 handoff 为 true 时,会立马调度被唤醒的协程,此外,当 handoff = true 时,被唤醒的协程会继承当前协程的时间片。具体例子,假设每个 goroutine 的时间片为 2ms,gorounte A 已经执行了 1ms,假设它通过 runtime_Semrelease(handoff = true) 唤醒了 goroutine B,则 goroutine B 剩余的时间片为 2 - 1 = 1ms。
饥饿模式下新来的 goroutine 的加锁行为: starvationFlag 的作用
如果在饥饿模式下,有新的 goroutine 来请求锁,它会执行下面这些步骤
func (m *Mutex) lockSlow() { // ... old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // ... runtime_doSpin() } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1由于
old&(mutexLocked|mutexStarving) != mutexLocked
,所以它不会自旋。由于
old&mutexStarving != 0
,所以它不会 set lockFlag。由于
old&(mutexLocked|mutexStarving) != 0
,所以它会 增加 waiterCount。可以看到,它实际上就做了增加
waiterCount
这一个操作,之后通过 CAS 更新 state 的状态,更新完成之后就跑去等待队列睡觉去了。因此在饥饿状态下,新的来争抢锁的 goroutine 是不会去抢锁 (set lockFlag) 的,它们只会登记一下 (waiterCount + 1) ,然后乖乖加入到等待队列里面。
当有协程正在自旋时的解锁行为: wokenFlag 的作用
wokenFlag 是在 lockSlow() 里面被设置的,wokenFlag 为 1 时,表示此时有协程正在进行自旋。
func (m *Mutex) lockSlow() { starving := false awoke := false iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexwokenFlag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } // ... if atomic.CompareAndSwapInt32(&m.state, old, new) { // ... runtime_SemacquireMutex(&m.sema, queueLifo, 1) // ... awoke = true iter = 0 } // ... } // ... }当一个新来的协程 (从未被放到等待队列中) 在第一次自旋时,wokenFlag 的设置逻辑为:
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true }但是当协程从等待队列中被唤醒后自旋时,却
lockSlow()
找不到设置 wokenFlag 的逻辑,为何?因为这段逻辑被放到了unlockSlow
里面了。视线切换到
unlockSlow()
那一边func (m *Mutex) unlockSlow(new int32) { // ... if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { // 当 mutexwokenFlag 被设置时,会直接 return // 不会去等待队列唤醒 goroutine return } // Grab the right to wake someone. // 这个地方会设置 wokenFlag 哦 new = (old - 1可以看到,当有协程正在自旋时 (wokenFlag = 1) ,不会从等待队列唤醒协程,这样就避免了等待队列上的协程加入竞争,当然,正在自旋中的协程之间彼此之间还是会竞争的;如果 wokenFlag = 0,则会从等待队列中唤醒一个协程,在唤醒之前会将 wokenFlag 设置为 1,这样协程被唤醒后就不用再去设置 wokenFlag 了,妙呀!
为什么当有协程在自旋时,不要去等待队列中唤醒协程呢?协程从被唤醒到被调度 (在 CPU 上面执行) 是要花时间的,等真正自旋时 mutex 早就被抢走了。
协程从等待队列被唤醒后如果还是没有抢到锁,会被放到队列首部还是尾部?
但是是头部,代码如下:
// If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1)复杂情景分析
基于上面的逻辑来分析一下复杂的逻辑吧!
假设有协程 g1,g2,g3,g4,g5,g6, 共同争抢一把锁 m
一开始 g1 拿到锁
owner: g1 waitqueue: null
g2 开始抢锁,没有抢到,被放到等待队列
owner: g1 waitqueue: g2
g1 释放锁,g2 从等待队列中被唤醒
owner: null waitqueue: null
此时 g3 也开始抢锁,g2 没有抢过,又被放回等待队列
owner: g3 waitqueue: g2
g4 开始抢锁,没有抢到,被放到等待队列
owner: g3 waitqueue: g2, g4
g3 释放锁,g2 被唤醒
owner: null waitqueue: g4
此时 g5 开始抢锁,g2 没有抢过,又被放回等待队列首部
owner: g5 waitqueue: g2, g4
g6 开始抢锁,正在自旋中
owner: g5 waitqueue: g2, g4 wokenFlag: 1 spinning: g6
g5 释放锁,由于此时有协程正在自旋,因此不会去等待队列中唤醒协程,锁被 g6 轻松抢到
owner: g6 waitqueue: g2, g4 wokenFlag: 0 spinning: null
g6 释放锁,g2 被唤醒,此时 g7 开始抢锁,g2 没有抢过,又被放回等待队列首部,但是 g2 由于太久没有抢到锁,进入饥饿模式了
owner: g7 waitqueue: g2(饥饿), g4 starvationFlag: 1
g8 来抢锁,由于处于饥饿状态,g8 会被直接放在等待队列尾部
owner: g7 waitqueue: g2(饥饿), g4, g8 starvationFlag: 1
g7 释放锁,由于处于饥饿状态,会直接唤醒 g2 并调度它
owner: g2 waitqueue: g4, g8 starvationFlag: 1
g2 执行完毕,释放锁,注意此刻依旧是饥饿状态,直接调度 g4,g4 苏醒后,发现它自己没有饥饿,于是 clear starvationFlag
owner: g4 waitqueue: g8 starvationFlag: 0
此时新来的 g8 可以正常加入到对锁的争抢中了,之后就是正常的加锁解锁逻辑了。
一点小瑕疵: 一种很边缘的 starvation case
由于等待队列中的协程只有当被唤醒之后才会根据等待时间来判断是否进入 starvation mode,因此会存在一个协程在等待队列中等待了很久,它实际上已经饥饿了,但是一直没被唤醒过,就没机会 set starvationFlag,这就会导致饥饿现象的发生。
那么会存在等待队列里的协程一直不被唤醒的情况么?
有的!在
unlockSlow()
时如果 wokenFlag = 1,那就不会去唤醒等待队列中的线程。就会存在这样一种情况,假设每次Unlock()
时恰好有一个新来的协程在自旋,那等待队列中的协程就会永远饥饿下去!reference
Tail Latency Might Matter More Than You Think
终于介绍完啦!小伙伴们,这篇关于《golang中sync.Mutex的实现方法》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!

- 上一篇
- golang操作rocketmq的示例代码

- 下一篇
- victoriaMetrics库布隆过滤器初始化及使用详解
-
- 舒服的天空
- 这篇博文出现的刚刚好,楼主加油!
- 2023-06-09 19:09:38
-
- 凶狠的老鼠
- 很详细,已加入收藏夹了,感谢师傅的这篇技术贴,我会继续支持!
- 2023-05-18 08:33:07
-
- 奋斗的小刺猬
- 这篇技术文章出现的刚刚好,太细致了,写的不错,码起来,关注作者大大了!希望作者大大能多写Golang相关的文章。
- 2023-04-28 00:51:00
-
- 多情的唇膏
- 很好,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢师傅分享技术贴!
- 2023-03-13 02:36:48
-
- 悲凉的香烟
- 这篇文章内容出现的刚刚好,太详细了,感谢大佬分享,收藏了,关注作者了!希望作者能多写Golang相关的文章。
- 2023-02-16 00:15:42
-
- 殷勤的鸡
- 太给力了,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢作者大大分享博文!
- 2023-01-28 14:19:39
-
- 欣慰的鼠标
- 这篇文章真及时,好细啊,感谢大佬分享,码住,关注作者了!希望作者能多写Golang相关的文章。
- 2023-01-22 06:14:52
-
- Golang · Go教程 | 2小时前 |
- DebianOpenSSL安装失败的终极解决方案
- 501浏览 收藏
-
- Golang · Go教程 | 3小时前 |
- Debian数据快速提取技巧
- 216浏览 收藏
-
- Golang · Go教程 | 6小时前 |
- Debian系统JS依赖管理终极攻略
- 218浏览 收藏
-
- Golang · Go教程 | 7小时前 |
- Debian上Hadoop作业调度实用技巧
- 100浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- Go语言闭包误区与匿名函数深度解析
- 222浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- Debian系统安全回收数据的正确攻略
- 111浏览 收藏
-
- Golang · Go教程 | 10小时前 |
- Debian高效fetch技巧与使用攻略
- 125浏览 收藏
-
- Golang · Go教程 | 16小时前 |
- Debian邮件服务器升级维护攻略
- 474浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- 笔灵AI生成答辩PPT
- 探索笔灵AI生成答辩PPT的强大功能,快速制作高质量答辩PPT。精准内容提取、多样模板匹配、数据可视化、配套自述稿生成,让您的学术和职场展示更加专业与高效。
- 14次使用
-
- 知网AIGC检测服务系统
- 知网AIGC检测服务系统,专注于检测学术文本中的疑似AI生成内容。依托知网海量高质量文献资源,结合先进的“知识增强AIGC检测技术”,系统能够从语言模式和语义逻辑两方面精准识别AI生成内容,适用于学术研究、教育和企业领域,确保文本的真实性和原创性。
- 22次使用
-
- AIGC检测-Aibiye
- AIbiye官网推出的AIGC检测服务,专注于检测ChatGPT、Gemini、Claude等AIGC工具生成的文本,帮助用户确保论文的原创性和学术规范。支持txt和doc(x)格式,检测范围为论文正文,提供高准确性和便捷的用户体验。
- 30次使用
-
- 易笔AI论文
- 易笔AI论文平台提供自动写作、格式校对、查重检测等功能,支持多种学术领域的论文生成。价格优惠,界面友好,操作简便,适用于学术研究者、学生及论文辅导机构。
- 39次使用
-
- 笔启AI论文写作平台
- 笔启AI论文写作平台提供多类型论文生成服务,支持多语言写作,满足学术研究者、学生和职场人士的需求。平台采用AI 4.0版本,确保论文质量和原创性,并提供查重保障和隐私保护。
- 35次使用
-
- Golangmap实践及实现原理解析
- 2022-12-28 505浏览
-
- 试了下Golang实现try catch的方法
- 2022-12-27 502浏览
-
- Go语言中Slice常见陷阱与避免方法详解
- 2023-02-25 501浏览
-
- Golang中for循环遍历避坑指南
- 2023-05-12 501浏览
-
- Go语言中的RPC框架原理与应用
- 2023-06-01 501浏览