计算机系统应用教程网站

网站首页 > 技术文章 正文

Mutex互斥锁设计思路详解和源码探究

btikc 2024-10-12 10:46:36 技术文章 7 ℃ 0 评论

序言

Mutex通常用于并发环境下控制多个协程对临界资源的访问,使得并发的协程之间可以互斥地操作同一份资源。这篇文章我们就来理解一下Golang中互斥锁的设计思想以及对互斥锁的实现源码进行探究。

之前发布过一篇简短的解析,感觉不够详细因此重新发一篇文章深入剖析一下。

源码解析

下面是源码中Mutex的定义,其实现了Locker接口。

  // A Mutex is a mutual exclusion lock.
  // The zero value for a Mutex is an unlocked mutex.
  //
  // A Mutex must not be copied after first use.
  type Mutex struct {
      state int32
      sema  uint32
  }

  // A Locker represents an object that can be locked and unlocked.
  type Locker interface {
      Lock()
      Unlock()
  }

  const (
      mutexLocked = 1 << iota // mutex is locked
      mutexWoken
      mutexStarving
      mutexWaiterShift = iota

      // Mutex fairness.
      //
      // Mutex can be in 2 modes of operations: normal and starvation.
      // In normal mode waiters are queued in FIFO order, but a woken up waiter
      // does not own the mutex and competes with new arriving goroutines over
      // the ownership. New arriving goroutines have an advantage -- they are
      // already running on CPU and there can be lots of them, so a woken up
      // waiter has good chances of losing. In such case it is queued at front
      // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
      // it switches mutex to the starvation mode.
      //
      // In starvation mode ownership of the mutex is directly handed off from
      // the unlocking goroutine to the waiter at the front of the queue.
      // New arriving goroutines don't try to acquire the mutex even if it appears
      // to be unlocked, and don't try to spin. Instead they queue themselves at
      // the tail of the wait queue.
      //
      // If a waiter receives ownership of the mutex and sees that either
      // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
      // it switches mutex back to normal operation mode.
      //
      // Normal mode has considerably better performance as a goroutine can acquire
      // a mutex several times in a row even if there are blocked waiters.
      // Starvation mode is important to prevent pathological cases of tail latency.
      starvationThresholdNs = 1e6
  )

源码注释翻译

Mutex是互斥锁。零值状态下是非上锁状态,第一次使用后不允许再被拷贝。
Mutex有两种工作模式:正常模式和饥饿模式。

在正常模式下,所有的 goroutine 会按照先进先出的 顺序等待锁的释放。但是一个刚被唤醒的 goroutine 不会直接获取到锁,而是会和新来的请求锁的 goroutine 去竞争锁。新来的 goroutine 具有一个优势:它正在 CPU 上执行而且可能有好几个新来的 goroutine 同时在请求锁,所以刚刚被唤醒的 goroutine 有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的 goroutine 在竞争锁失败之后会加入到等待队列的最前面。如果一个等待的 goroutine 超过 1ms 后仍然 没有获取锁,那么它将会把锁转变为饥饿模式。

在饥饿模式下,锁的所有权将从执行 unlock 的 goroutine 直接交给等待队列中的第一个goroutine。新来的 goroutine 将不能再去尝试竞争锁,即使锁是 unlock 状态也不会去进行自旋操作,而是进入等待队列的尾部等待被唤醒。

如果一个被唤醒的 goroutine 获取到了锁,并且满足以下其中一个条件,那么他会将锁换为正常工作模式:a. 它是等待队列中的最后一个。b. 它等待的时间小于1ms。

正常模式具有较好的性能,因为一个 goroutine 可以连续多次获取锁,即使还有其他的阻塞等待锁的 goroutine也不需要进入休眠阻塞。饥饿模式对防止尾部延迟的问题是非常重要的。

解读

可以看到golang将Mutex设计成了两种工作模式,默认初始化时是正常模式,这种模式下一个请求锁的groutine将会有很高的效率获取到锁。饥饿模式下会优先将锁给到那些等待时间最久的协程。

为什么这么设计

其实从注释中我们就可以解读出这么设计的原因,那就是为了提高加锁效率。

正常来说,如果是我们自己设计一个锁,必定是先尝试获取锁,如果获取失败则进入等待队列休眠等待锁释放时被唤醒。这样每当锁释放时都要唤醒一个协程。被唤醒的协程等待runtime调度并且协程切换才能继续获取锁,比较耗费性能,如果此时存在正在运行中的请求锁的协程,那就可以让该协程优先获取锁,提高加锁的效率(所谓来的早不如来的巧)。显而易见,这种行为在某些情况下会导致处于等待中的协程长时间获取不到锁的情况,因此需要设计饥饿模式,在该模式下被释放的锁会优先给到饥饿的协程,这样就保证了锁的公平性。

在理解了Mutex的设计思想之后我们再去看源码的具体实现,按照我们上面的思路去逐一分析关键代码。

定义

type Mutex struct {
    state int32 
    sema  uint32
}
const (
    mutexLocked = 1 << iota // 加锁标识位掩码
    mutexWoken // 唤醒标识位掩码
    mutexStarving // 饥饿标识位掩码
    mutexWaiterShift = iota // 休眠等待协程计数起始位的偏移
)

state是个复合变量,用不同bit位标识锁的当前状态。

第0位标识是否被加锁。 m.state&mutexLocked != 0 表示锁被持有。第1位标识是否存在被唤醒的协程。m.state&mutexWoken != 0 表示存在被唤醒的协程。第2位标识锁当前所处的工作模式。m.state&mutesStarving !=0 表示锁处于饥饿模式。3-31位用于记录处于休眠等待的协程的数量。 count = m.state >> mutexWaiterShift。

sema是信号量,用于等待goroutine的唤醒操作。

加锁

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
// 如果Mutex已经处于加锁状态,那么调用Lock函数的协程将会阻塞直到加锁成功。
func (m *Mutex) Lock() {
  // Fast path: grab unlocked mutex.
  // 如果锁是初始化状态则尝试加锁,成功直接返回
  if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
          race.Acquire(unsafe.Pointer(m))
      }
      return
  }
  // Slow path (outlined so that the fast path can be inlined)
  // 否则调用lockSlow去和其他协程竞争锁。
  m.lockSlow()
}

func (m *Mutex) lockSlow() {
  var waitStartTime int64 // 当前协程阻塞等待的开始时间
  starving := false // 当前协程是否饥饿
  awoke := false // 当前协程是否是被唤醒的
  iter := 0 // 当前协程已经自旋的次数
  old := m.state // 获取当前锁的状态
  for { // 用于自旋以及被唤醒后的逻辑处理
      // Don't spin in starvation mode, ownership is handed off to waiters
      // so we won't be able to acquire the mutex anyway.
      // 饥饿模式下是直接将锁交给下一位,所以我们这里是不可以获取锁的,所以只有正常模式可以自旋,饥饿模式直接阻塞。
      // 正常模式在锁没有释放的情况下自旋一段时间,如果期间锁被释放能更快的获取到锁。
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
          // Active spinning makes sense.
          // Try to set mutexWoken flag to inform Unlock
          // to not wake other blocked goroutines.
          // 自旋时尝试设置唤醒位来告诉Unlock操作不要再唤醒阻塞的协程,因为已经有正在运行的协程了,所以就不要再多余唤醒。(这个实现就是满足我们上面理解的优先将锁给到正在执行中的协程的设计)
          /* 这里需要满足3个条件:
            1. 如果当前协程是被唤醒的或者当前协程已经成功设置了woken位,就没必要再设置了。
            2. 如果已经有被唤醒的协程就没必要再设置了。
            3. 如果没有在阻塞等待的协程也就没必要设置了,因为unlock本身也不会执行唤醒操作。
          */
          if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
              atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
              awoke = true
          }
          runtime_doSpin()
          iter++
          old = m.state
          continue
      }
      
      /* 走到这里就有三种情况
        1. 锁已经被释放,可以竞争锁了
        2. 超出自旋次数限制且锁还没释放,需要排队等待
        3. 锁已经处于饥饿模式,需要排队等待
        1和2两种情况都处在正常模式下,均可以去竞争锁,情况3只能休眠。
      */
      
      new := old
      // Don't try to acquire starving mutex, new arriving goroutines must queue.
      // 非饥饿模式下可以尝试获取锁
      if old&mutexStarving == 0 {
          new |= mutexLocked
      }
      // 饥饿模式下只能进等待队列了
      if old&(mutexLocked|mutexStarving) != 0 {
          new += 1 << mutexWaiterShift
      }
      // The current goroutine switches mutex to starvation mode.
      // But if the mutex is currently unlocked, don't do the switch.
      // Unlock expects that starving mutex has waiters, which will not
      // be true in this case.
      // 如果当前协程已经饥饿且锁还被持有则将锁切换为饥饿模式。
      // 如果当前锁已释放就不要切换,因为饥饿模式下必须要有等待者(因为饥饿模式下解锁时会直接把锁给到等待的协程),这种情况下不一定有等待的协程。
      if starving && old&mutexLocked != 0 {
          new |= mutexStarving
      }
      if awoke {
          // The goroutine has been woken from sleep,
          // so we need to reset the flag in either case.
          if new&mutexWoken == 0 {
              throw("sync: inconsistent mutex state")
          }
          // 如果当前协程是被唤醒的,需要重置唤醒位
          new &^= mutexWoken
      }
      if atomic.CompareAndSwapInt32(&m.state, old, new) { // 状态设置成功
          // 如果当前锁是释放状态且工作在正常模式下则加锁成功。
          if old&(mutexLocked|mutexStarving) == 0 {
              break // locked the mutex with CAS
          }
          // 当前可能状态有:锁处于饥饿模式或者还在被持有中
          // If we were already waiting before, queue at the front of the queue.
          // 如果waitStartTime!=0说明是被唤醒的,则重新进入队列的头部等待下次被唤醒。
          queueLifo := waitStartTime != 0
          if waitStartTime == 0 {
              waitStartTime = runtime_nanotime()
          }
          runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 阻塞排队
          // 判断是否已经饥饿
          starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
          old = m.state
          if old&mutexStarving != 0 {
              // If this goroutine was woken and mutex is in starvation mode,
              // ownership was handed off to us but mutex is in somewhat
              // inconsistent state: mutexLocked is not set and we are still
              // accounted as waiter. Fix that.
              // 如果当前协程被唤醒且Mutex处于饥饿模式,
              // 锁的拥有者将锁交到当前协程但是锁处于不一致的状态:
              // mutexLocked还没有被设置并且当前协程还被算成一个等待者,需要修复状态。
              
              if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                  throw("sync: inconsistent mutex state")
              }
              delta := int32(mutexLocked - 1<<mutexWaiterShift)
              if !starving || old>>mutexWaiterShift == 1 {
                  // Exit starvation mode.
                  // Critical to do it here and consider wait time.
                  // Starvation mode is so inefficient, that two goroutines
                  // can go lock-step infinitely once they switch mutex
                  // to starvation mode.
                  // 如果当前为非饥饿模式或者自己是等待队列里的最后一个,则调整锁模式为正常模式
                  delta -= mutexStarving
              }
              atomic.AddInt32(&m.state, delta)
              break
          }
          // 非饥饿模式下被唤醒后重新竞争锁,没有优势
          awoke = true
          iter = 0
      } else {
          old = m.state // 竞争失败重来一次
      }
  }

  if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
  }
}

快速加锁

atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)用于判断锁是否为初始状态并尝试加锁,如果加锁成功则直接返回,提高加锁效率;如果加锁失败说明锁已被占用,则调用lockSlow去做相应的操作。

自旋

满足下面条件能进入自旋状态:

1. 当前互斥锁的状态是非饥饿状态,并且已经被锁定了。

2. 自旋次数不超过 4 次。

3. cpu 个数大于一,必须要是多核 cpu。

4. 处于空闲状态或自旋状态的procs加上本协程小于gomaxprocs。

5. 当前p的待执行队列为空。

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
        if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
                return false
        }
        if p := getg().m.p.ptr(); !runqempty(p) {
                return false
        }
        return true
}

slowLock总结

  1. 在自旋的过程中会尝试设置 mutexWoken 来通知解锁操作不去唤醒其他已经休眠的协程,在自旋模式下,当前的协程就能更快的获取到锁。
  2. 自旋完成之后,就会去计算当前的锁的状态,如果当前处在饥饿模式下则不会去请求锁。
  3. 状态计算完成之后就会尝试使用 CAS 操作获取锁,如果获取成功就会直接退出循环。
  4. 如果没有获取到就调用 runtime_SemacquireMutex(&m.sema, queueLifo, 1) 方法休眠当前协程。
  5. 协程被唤醒之后会先判断当前是否处在饥饿状态。 1. 如果处在饥饿状态就会获得互斥锁,如果等待队列中只存在当前协程或者当前协程不饥饿(如果当前协程超过等待超过 1ms 还没有获取到锁就会饥饿),会将互斥锁切换成正常模式。2. 如果不是饥饿模式就会设置唤醒和饥饿标记、重置自旋次数并重新执行获取锁的循环。

runtime_SemacquireMutex实现细节

 // Semacquire waits until *s > 0 and then atomically decrements it.
 // It is intended as a simple sleep primitive for use by the synchronization
 // library and should not be used directly.
 // Semacquire 等待直到 *s > 0 并且自动减少*s值,这是专门为同步库设计的简单的睡眠原语,不应该被直接使用。
 func runtime_Semacquire(s *uint32)

 // SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
 // If lifo is true, queue waiter at the head of wait queue.
 // skipframes is the number of frames to omit during tracing, counting from
 // runtime_SemacquireMutex's caller.
 // SemacquireMutex 跟 Semacquire类似,不过是为了互斥锁特殊优化的,
 // 如果lifo为true直接将当前goroutine放在阻塞队列首位。
 func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

加锁流程图

解锁

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
// 解锁一个没有锁定的互斥量会报运行时错误.
// 一个加锁的Mutex并不和特定的协程绑定,允许一个协程对mutex加锁然后安排另一个协程解锁。
func (m *Mutex) Unlock() {
  if race.Enabled {
      _ = m.state
      race.Release(unsafe.Pointer(m))
  }

  // Fast path: drop lock bit.
  new := atomic.AddInt32(&m.state, -mutexLocked)
  if new != 0 { // 如果new等于零表示没有其他任何需要通知的协程。直接返回
      // Outlined slow path to allow inlining the fast path.
      // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
      m.unlockSlow(new)
  }
}

func (m *Mutex) unlockSlow(new int32) {
  // 解锁一个未加锁的Mutex会报错
  if (new+mutexLocked)&mutexLocked == 0 {
      throw("sync: unlock of unlocked mutex")
  }
  
  /*锁释放后的后续处理工作:
    1. 正常模式下唤醒一个等待的协程竞争锁。
    2. 饥饿模式下唤醒等待队列首位的协程获取锁。
  */
  if new&mutexStarving == 0 {
      old := new
      for {
          // If there are no waiters or a goroutine has already
          // been woken or grabbed the lock, no need to wake anyone.
          // In starvation mode ownership is directly handed off from unlocking
          // goroutine to the next waiter. We are not part of this chain,
          // since we did not observe mutexStarving when we unlocked the mutex above.
          // So get off the way.
          // 如果没有等待的协程或者已经有被唤醒的协程或者锁已经被重新抢到或者锁已经变成饥饿模式,则直接退出。
          if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
              return
          }
          // Grab the right to wake someone.
          // 将等待着减一并且设置唤醒位
          new = (old - 1<<mutexWaiterShift) | mutexWoken
          if atomic.CompareAndSwapInt32(&m.state, old, new) {
              // 唤醒等待goroutine
              runtime_Semrelease(&m.sema, false, 1)
              return
          }
          old = m.state
      }
  } else {
      // Starving mode: handoff mutex ownership to the next waiter, and yield
      // our time slice so that the next waiter can start to run immediately.
      // Note: mutexLocked is not set, the waiter will set it after wakeup.
      // But mutex is still considered locked if mutexStarving is set,
      // so new coming goroutines won't acquire it.
      // 饥饿模式下直接唤醒队首的协程并且让出时间片使其可以立即运行
      // 注意:此时mutexLocked还未设置,被唤醒的协程在被唤醒后会进行设置,
      // 但是在饥饿模式下Mutex会仍然被认为在加锁状态,因此新来的goroutine不会获取到锁
      runtime_Semrelease(&m.sema, true, 1)
  }
}

解锁主要做三部分工作:

尝试快速解锁。正常工作模式下尝试唤醒一个等待中的协程去竞争锁(如果已经有别的协程抢到了锁或者已经存在唤醒中的协程或者没有需要等待的协程或者锁被设置成了饥饿模式,则不再进行操作)。饥饿工作模式下直接唤醒第一个等待的协程把锁交给它(饥饿模式下一定有等待加锁的协程)。

runtime_Semrelease详解

  // Semrelease atomically increments *s and notifies a waiting goroutine
  // if one is blocked in Semacquire.
  // It is intended as a simple wakeup primitive for use by the synchronization
  // library and should not be used directly.
  // If handoff is true, pass count directly to the first waiter.
  // skipframes is the number of frames to omit during tracing, counting from
  // runtime_Semrelease's caller.
  // Semrelease 自动增加s的值并且唤醒一个阻塞的goroutine。
  // 如果handoff等于true,直接将count传递给第一个等待者
! func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

饥饿模式解锁

解锁后 state 有 mutexStarving 标识,无 mutexLocked 标识。被唤醒的协程会设置 mutexLocked 状态并将等待的协程数减 1。需要注意的是并不会马上清除 mutexStarving 标识而是一直在饥饿模式下工作,直到遇到第一个符合条件的协程才会清除 mutexStarving 标识。

在饥饿模式下,如果有新的协程来请求抢占锁,mutex 会被认为还处于锁状态,所以新来的协程不会进行抢占锁操作。

解锁流程图:

总结

本文通过对Mutex的设计思想的深入分析和对源码实现的探索全面的讲解了Golang Mutex互斥锁的设计。
总结得出Mutex 通过正常模式和饥饿模式两种模式来平衡锁的效率和公平性。

希望对阅读这篇文章的你对锁的理解有所帮助,如果文章中存在错误之处欢迎指出,我们共同进步。

推荐阅读

Golang实用教程:协程的应用技巧

Golang内存模型

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表