当前位置: 首页 > Go语言, 代码艺术 > 正文

深入学习Go语言标准库sync

一、前言

最近在阅读Go Web框架echo的源码时,有看到在Echo结构体中有定义了一个sync.Pool类型的pool,处于好奇,加上之前在编写多goroutine并发程序时只使用到了sync包的WaitGroup方法。处于好奇就跟踪进去看了下sync这个包的源码。不看不知道,一看吓一跳,这个包可是非常的强大,它提供了在高并发编程时遇到的各种竞争问题的非常完美的解决方法。

sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些。那么,今天笔者就结合sync包的源码谈谈在高并发编程中遇到的常见问题及其经典应用。

一、锁接口Locker

Locker接口代表一个可以加锁和解锁的对象。 其封装的接口比较简单,如下定义

type Locker interface {
    Lock()
    Unlock()
}

二、互斥锁Mutex

Mutex是一个互斥锁,可以创建为其他结构体的字段,零值为解锁状态,一般使用后不要复制。Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。其定义及成员方法如下:

type Mutex struct {
    // 包含隐藏或非导出字段
}

// Lock锁住m,如果m已经被加锁,则Lock将被阻塞,直到m被解锁。 
func (m *Mutex) Lock() 

// Unlock解锁m,如果m未加锁,则该操作会引发panic。锁和线程无关,可以由不同的线程加锁和解锁。
func (m *Mutex) Unlock()

常见的应用场景就是在涉及到并发竞争性操作的地方先加锁,执行完后再解锁,示例代码如下:

package main

import (
	"fmt"
	"sync"
)

type SafeInt struct {
	sync.Mutex
	Num int
}

func main() {
	waitNum := 10 // 设置等待的个数(继续往下看)

	count := SafeInt{}

	done := make(chan bool)

	for i := 0; i < waitNum; i++ {
		go func(i int) {
			count.Lock() // 加锁,防止其它例程修改 count
			count.Num = count.Num + i
			fmt.Print(count.Num, " ")
			count.Unlock()

			done <- true
		}(i)
	}

	for i := 0; i < waitNum; i++ {
		<-done
	}
}

关于互斥锁的Golang实现可以结合其源码和参考这篇文章《Golang 互斥锁内部实现》,这里不再深入讨论。

三、读写互斥锁RWMutex

RWMutex是一个读写互斥锁。该锁可以被同时多个读取者持有或唯一个写入者持有。RWMutex可以创建为其他结构体的字段;零值为解锁状态。RWMutex类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁。其定义及成员方法如下:

type RWMutex struct {
    // 包含隐藏或非导出字段
}

// Lock 将 rw 设置为写锁定状态,禁止其他例程读取或写入。
func (rw *RWMutex) Lock()

// Unlock 解除 rw 的写锁定状态,如果 rw 未被写锁定,则该操作会引发 panic。
func (rw *RWMutex) Unlock()

// RLock 将 rw 设置为读锁定状态,禁止其他例程写入,但可以读取。
func (rw *RWMutex) RLock()

// Runlock 解除 rw 的读锁定状态,如果 rw 未被读锁顶,则该操作会引发 panic。
func (rw *RWMutex) RUnlock()

// RLocker 返回一个互斥锁,将 rw.RLock 和 rw.RUnlock 封装成了一个 Locker 接口。
func (rw *RWMutex) RLocker() Locker

这个类似于操作系统的文件锁(当然操作系统的文件锁相对更加复杂,详细可参考:Linux 2.6 中的文件锁),当然Golang实际上只是实现了这种读写锁的思考,并满足了多数的项目开发场景。

package main

import (
	"fmt"
	"sync"
	"time"
)

var m *sync.RWMutex
var wg sync.WaitGroup

func main() {
	m = new(sync.RWMutex)
	wg.Add(2)
	go write(1)
	time.Sleep(1 * time.Second)
	go read(2)
	wg.Wait()
}
func write(i int) {
	fmt.Println(i, "写开始.")
	m.Lock()
	fmt.Println(i, "正在写入中......")
	time.Sleep(3 * time.Second)
	m.Unlock()
	fmt.Println(i, "写入结束.")
	wg.Done()
}
func read(i int) {
	fmt.Println(i, "读开始.")
	m.RLock()
	fmt.Println(i, "正在读取中......")
	time.Sleep(1 * time.Second)
	m.RUnlock()
	fmt.Println(i, "读取结束.")
	wg.Done()
}

上面的代码中,我们在read里使用读锁,也就是RLockRUnlock,写锁的方法名和我们平时使用的一样LockUnlock,这样,我们就使用了读写锁,可以并发的读,但是同时只能有一个写,并且写的时候不能进行读操作。

四、单次执行Once

Once是只执行一次动作的对象。其使用也比较简单,定义及方法如下:

type Once struct {
    // 包含隐藏或非导出字段
}

// 多次调用仅执行一次指定的函数 f
func (o *Once) Do(f func())

如果once.Do(f)被多次调用,只会在第一次调用才会执行f,即使f每次调用Do 提供的f值不同。如果要执行其它函数,则需要再创建一个新的 Once 对象。使用这种特性很容易实现一个线程安全的单利模式,示例代码如下:

package singleton

import (
    "sync"
)

type singleton struct {
}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

继续继续跟进Do的源码实现的话,它底层就是原子操作相关的东西了,所以直接使用sync包的原子操作也可以单例模式,具体分析可以参考:《Singleton Pattern in Go》 ,下面给个项目中实际应用的单例封装库:

package msingleton

import (
	"sync"
	"sync/atomic"
)

type SingletonInitFunc func() (interface{}, error)

// Interface for accessing singleton objects.
//
// Example use:
// var configSelectorSingleton = NewSingleton(init)
// func configSelector() (configSelector, error) {
//     s, err := configSelectorSingleton.Get()
//     if err != nil {
//         return nil, err
//     }
//     return s.(configSelector), nil
// }
type Singleton interface {
	// Return the encapsulated singleton
	Get() (interface{}, error)
}

// Call to create a new singleton that is instantiated with the given init function.
// init is not called until the first invocation of Get().  If init errors, it will be called again
// on the next invocation of Get().
func NewSingleton(init SingletonInitFunc) Singleton {
	return &singletonImpl{init: init}
}

type singletonImpl struct {
	sync.Mutex

	// The actual singleton object
	data interface{}
	// Constructor for the singleton object
	init SingletonInitFunc
	// Non-zero if init was run without error
	initialized int32
}

func (s *singletonImpl) Get() (interface{}, error) {
	// Don't lock in the common case
	if atomic.LoadInt32(&s.initialized) > 0 {
		return s.data, nil
	}

	s.Lock()
	defer s.Unlock()

	if atomic.LoadInt32(&s.initialized) > 0 {
		return s.data, nil
	}

	var err error
	s.data, err = s.init()
	if err != nil {
		return nil, err
	}

	atomic.StoreInt32(&s.initialized, 1)
	return s.data, nil
}

五、条件等待Cond

type Cond struct {
    // 在观测或更改条件时L会冻结
    L Locker
    // 包含隐藏或非导出字段
}

Cond实现了一个条件变量,一个线程集合地,供线程等待或者宣布某事件的发生。每个Cond实例都有一个相关的锁(一般是*Mutex或*RWMutex类型的值),它必须在改变条件时或者调用Wait方法时保持锁定。Cond可以创建为其他结构体的字段,开始使用后也不能被拷贝。

条件等待通过Wait让例程等待,通过Signal让一个等待的例程继续,通过Broadcase让所有等待的继续。

在Wait之前需要手动为c.L上锁, Wait结束了手动解锁。为避免虚假唤醒, 需要将Wait放到一个条件判断的循环中,官方要求写法:

c.L.Lock()
for !condition() {
    c.Wait()
}
// 执行条件满足之后的动作...
c.L.Unlock()

Cond的方法集定义如下:

// 创建一个条件等待
func NewCond(l Locker) *Cond

// Broadcast 唤醒所有等待的 Wait,建议在“更改条件”时锁定 c.L,更改完毕再解锁。
func (c *Cond) Broadcast()

// Signal 唤醒一个等待的 Wait,建议在“更改条件”时锁定 c.L,更改完毕再解锁。
func (c *Cond) Signal()

// Wait 会解锁 c.L 并进入等待状态,在被唤醒时,会重新锁定 c.L
func (c *Cond) Wait()

简单的示例代码如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	condition := false // 条件不满足

	var mu sync.Mutex
	cond := sync.NewCond(&mu) // 创建一个Cond

	//让协程去创造条件
	go func() {
		mu.Lock()
		condition = true // 改写条件
		time.Sleep(3 * time.Second)
		cond.Signal() // 发送通知:条件ok
		mu.Unlock()
	}()

	mu.Lock()

	// 检查条件是否满足,避免虚假通知,同时避免 Signal 提前于 Wait 执行。
	for !condition { // 如果Signal提前执行了,那么此处就是false了

		// 等待条件满足的通知,如果虚假通知,则继续循环等待
		cond.Wait() // 等待时 mu 处于解锁状态,唤醒时重新锁定。 (阻塞当前线程)

	}
	fmt.Println("条件满足,开始后续动作...")
	mu.Unlock()

}

六、组等待WaitGroup

WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量。每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束(计数器归零)。这在并发编程中使用的比较常见,接口和方法集定义如下:

type WaitGroup struct {
    // 包含隐藏或非导出字段
}

// 计数器增加 delta,delta 可以是负数。如果计数器为0,Wait阻塞等待的所有线程都会释放,如果计数器小于0,则会panic。
func (wg *WaitGroup) Add(delta int)

// 计数器减少 1
func (wg *WaitGroup) Done()

// 等待直到计数器归零。如果计数器小于 0,则该操作会引发 panic。
func (wg *WaitGroup) Wait()

示例代码如下:

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(i int) {
			defer wg.Done()
			fmt.Print(i, " ")
		}(i)
	}
	wg.Wait()
}

七、临时对象池Pool

最后终于讲到文章开头提到的sync.Pool了。Pool是一个可以分别存取的临时对象集,可以安全的被多个线程同时使用。

Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。

Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。Echo框架中的路由也正是使用了sync.Pool来重复利用内存以达到几乎零内存占用。

在从 Pool 中取出对象时,如果 Pool 中没有对象,将返回 nil,但是如果给 Pool.New 字段指定了一个函数的话,Pool 将使用该函数创建一个新对象返回。其定义及方法集定义如下:

type Pool struct {
   noCopy noCopy // 锁

   local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
   localSize uintptr        // size of the local array

   // 可选参数New指定一个函数在Get方法可能返回nil时来生成一个值。
   // 该参数不能在调用Get方法时被修改
   New func() interface{}
}

// Get方法从池中选择任意一个item,删除其在池中的引用计数,并提供给调用者。
// 假使Get方法没有取得item:如p.New非nil,Get返回调用p.New的结果;否则返回nil。
func (p *Pool) Get() interface{}

// Put方法将x放入池中。
func (p *Pool) Put(x interface{})

关于Pool内部实现,可以参考源码和这篇文章:《Golang标准库深入锁、信号量(sync中的部分章节,暂不深入分析。这里结合echo源码,讲讲Pool使用方式,及其需要注意的地方:

在echo.go源码的New()方法中,对pool的示例化如下(它新建了echo的context接口):

// New creates an instance of Echo.
func New() (e *Echo) {
	e = &Echo{...}
        ...
	e.pool.New = func() interface{} {
		return e.NewContext(nil, nil)
	}
	...
	return
}

在echo中,对pool对象的使用进行了一层封装,定义了两个方法用于过去和释放上面NewContext实例化出来的echo context对象,具体定义如下:

// 从Pool池中获取echo context对象
// 使用完之后需要ReleaseContext()释放掉
func (e *Echo) AcquireContext() Context {
	return e.pool.Get().(Context)
}

// 释放echo Context实例以返回到pool中
// 这个在获取AcquireContext()之后调用
func (e *Echo) ReleaseContext(c Context) {
	e.pool.Put(c)
}

但是,有点奇怪的是,在echo源码中定义的这两个方法,在ServeHTTP实例运行时却并没有调用,搜索源码只是看到echo_test.go 中才用到,真正的ServeHTTP中的使用确实如下的原始调用形式:

// Acquire context
c := e.pool.Get().(*context)
c.Reset(r, w)

难道仅仅是为了测试才封装这两个方法么?看起来echo的作者有点嗨皮了😅。

参考资料:

Golang Package Sync:https://studygolang.com/pkgdoc

Golang标准库深入 – 锁、信号量(sync):https://my.oschina.net/90design/blog/1814499

Go语言实战笔记(十七)| Go 读写锁:https://www.flysnow.org/2017/05/03/go-in-action-go-read-write-lock.html



这篇博文由 s0nnet 于2018年09月17日发表在 Go语言, 代码艺术 分类下, 通告目前不可用,你可以至底部留下评论。
如无特别说明,计算机技术分享发表的文章均为原创,欢迎大家转载,转载请注明: 深入学习Go语言标准库sync | 计算机技术分享
关键字:

深入学习Go语言标准库sync:等您坐沙发呢!

发表评论

快捷键:Ctrl+Enter