百木园-与人分享,
就是让自己快乐。

我的Go并发之旅、02 基本并发原语

注:本文所有函数名为中文名,并不符合代码规范,仅供读者理解参考。

Goroutine

Go程不是OS线程,也不是绿色线程(语言运行时管理的线程),而是更高级别的抽象,一种特殊的协程。是一种非抢占式的简单并发子goroutine(函数、闭包、方法)。不能被中断,但有多个point可以暂停或重新进入。

goroutine 在它们所创建的相同地址空间内执行,特别是在循环创建go程的时候,推荐将变量显式映射到闭包(引用外部作用域变量的函数)中。

fork-join 并发模型

image-20220918185937609

Fork 在程序中的任意节点,子节支可以与父节点同时运行。join 在将来某个时候这些并发分支会合并在一起,这是保持程序正确性和消除竞争条件的关键Go语言遵循 fork-join并发模型。

使用 go func 其实就是在创建 fork point,为了创建 join point,我们需要解决竞争条件

sync.WaitGroup

func 竞争条件_解决() {
	var wg sync.WaitGroup
	var data int
	wg.Add(1)
	go func() {
		defer wg.Done()
		data++
	}()
	wg.Wait()
	if data == 0 {
		fmt.Println(\"Value\", data)
	} else {
		fmt.Println(\"Value 不是 0\")
	}
}

通过 sync.WaitGroup 我们阻塞 main 直到 go 程退出后再让 main 继续执行,实现了 join point。可以理解为并发-安全计数器,经常配合循环使用。

这是一个同步访问共享内存的例子。使用前提是你不关心并发操作的结果,或者你有其他方法来收集它们的结果。

wg.Add(1) 是在帮助跟踪的goroutine之外完成的,如果放在匿名函数内部,会产生竞争条件。因为你不知道go程什么时候被调度。

sync.Mutex 互斥锁

type state struct {
	lock  sync.Mutex
	count int
}

func 结构体修改状态_互斥锁() {
	s := state{}
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			// s.lock.Lock()
			defer wg.Done()
			// defer s.lock.Unlock()
			s.count++
		}()
	}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			// s.lock.Lock()
			defer wg.Done()
			// defer s.lock.Unlock()
			s.count--
		}()
	}
	wg.Wait()
	fmt.Println(s.count)
}

没有互斥锁的时候,会导致发生竞争现象,取消互斥锁的注释,最终结果为理想的0。

进入和退出一个临界区是有消耗的,所以一般人会尽量减少在临界区的时间。

sync.RWMutex 读写锁

本质和普通的互斥锁相同,但是可以保证在未锁的情况允许多个读消费者持有一个读锁,在读消费者非常多的情况下可以提高性能。

在多个读消费者的情况下,通常使用 RWMutex ,读消费者较少时,Mutex和RWMutex两者都可用。

Cond 同步多个go程

cond : 一个goroutine的集合点,等待或发布一个event。

多个go程暂停在某个point上,等待一个事件信号再继续执行。没有cond的时候是怎么做的,当然是for循环,但是这有个大问题。

func 无cond() {
	isOK := false
	go func() {
		for isOK == false {
			// time.Sleep(time.Microsecond) // bad method
			// do something
		}
		fmt.Println(\"OK I finished\")
	}()
	go func() {
		for isOK == false {
			// time.Sleep(time.Microsecond) // bad method
			// do something
		}
		fmt.Println(\"OK I finished\")
	}()
	time.Sleep(time.Second * 5)
	isOK = true
	select {}
}

image-20220918193642390

这会消耗一整个CPU核心的所有周期,有些人会引入 time.Sleep 实际上这会让算法低效,这时候我们可以使用 cond。

func 有cond() {
	var wg sync.WaitGroup
	cond := sync.NewCond(&sync.Mutex{})
	test := func() {
		defer wg.Done()
		defer cond.L.Unlock()
		cond.L.Lock()
		cond.Wait()
		fmt.Println(\"something work...OK finished\")
	}
	wg.Add(2)
	go test()
	go test()
	time.Sleep(time.Second * 5)
	cond.Broadcast() // 通知所有go程
	// cond.Signal() // 通知等待时间最久的一个go程
	wg.Wait()
}

cond运行时内部维护一个FIFO列表。与利用channel相比,cond类型性能要高很多。

Once 只允许一次

可以配合单例模式使用,将判断对象是否为null改为sync.Once用于创建唯一对象。

sync.Once只计算调用Do方法的次数,而不是多少次唯一调用Do方法。所以在必要情况下声明多个sync.Once变量而不是用一个。下面的例子输出 1

func 只调用一次() {
	var once sync.Once
	count := 0
	once.Do(func() {
		count++
	})
	once.Do(func() {
		count--
	})
	fmt.Println(count)
}

Pool 池子

对象池模式是一种创建和提供可供使用的固定数量实例或Pool实例的方法。通常用于约束创建昂贵的场景,比如数据库连接,以便只创建固定数量的实例,但不确定数量的操作仍然可以请求访问这些场景。

使用pool的另一个原因是实例化的对象会被GC自动清理,而pool不会

  • 可以通过限制创建的对象数量来节省主机内存。
  • 提前加载获取引用到另一个对象所需的时间,比如建立服务器连接。

你的并发进程需要请求一个对象,但是在实例化之后很快地处理它们,或者在这些对象的构造可能会对内存产生负面影响,这时最好使用Pool设计模式。但是必须确保pool中对象是同质的,否则性能大打折扣。

注意事项

  • 实例化 sync.Pool ,调用 New 方法创建成员变量是线程安全的。
  • 收到来自Get的实例,不要对所接受的对象的状态做出任何假设。(同质,不需要做if判断)
  • 当你用完了一个从Pool取出的对象时,一定要调用put,否则无法复用这个实例。通常情况下用defer完成。
  • Pool内的分布必须大致均匀
type conn struct{}

func 对象池() {
	pool := &sync.Pool{New: func() any {
		time.Sleep(time.Millisecond * 250)
		fmt.Println(\"创建连接对象\")
		return &conn{}
	}}
	for i := 0; i < 10; i++ {
		pool.Put(pool.New())
	}
	fmt.Println(\"初始化结束\")
	c1 := pool.Get()
	c2 := pool.Get()
	pool.Put(c1)
	pool.Put(c2)
}

Channel 通道

channel也可以用来同步内存访问,但最好用于在goroutine之间传递消息(channel是将goroutine绑定在一起的粘合剂)。双向 chan 变量名后缀加 Stream

带缓存的channel和不带缓存的channel声明是一样的

var dataStream chan interface{}

双向channel可以隐式转换成单向channel,这对函数返回单向通道很有用

var receiveChan <-chan interface{}
var sendChan chan<- interface{}
dataStream := make(chan interface{})

receiveChan = dataStream
sendChan = datraStream

go语言中channel是阻塞的,意味着channel内的数据被消费后,新的数据才可以写入。通过 <- 操作符的接受形式可以选择返回两个值。

salutation,ok := <-dataStream

当channel未关闭时,ok返回true,关闭后返回false。即使channel关闭了,也能读取到默认值,为了支持一个channel有单个上游写入,有多个下游读取。


模拟之前WaitGroup的例子

func 竞争条件_通道() {
	var data int
	var Stream chan interface{} = make(chan interface{})
	go func() {
		data++
		Stream <- struct{}{}
	}()
	<-Stream
	if data == 0 {
		fmt.Println(\"Value\", data)
	} else {
		fmt.Println(\"Value 不是 0\")
	}
}

模拟之前cond同步多个go程的例子

func channel代替cond() {
	var wg sync.WaitGroup
	Stream := make(chan interface{})
	test := func() {
		defer wg.Done()
		<-Stream
		fmt.Println(\"something work...OK finished\")
	}
	wg.Add(1)
	go test()
	go test()
	time.Sleep(time.Second * 5)
	close(Stream)
	wg.Wait()
}

在同一时间打开或关闭多个goroutine可以考虑用channel。


channel操作结果

操作 Channel状态 结果
Read nil 阻塞
打开且非空 输出值
打开但空 阻塞
关闭的 默认值,false
只写 编译错误
Write nil 阻塞
打开但填满 阻塞
打开但不满 写入
关闭的 panic
只读 编译错误
close nil panic
打开且非空 关闭Channel;仍然能读取通道数据,直到读取完毕返回默认值
打开但空 关闭Channel;返回默认值
关闭的 panic
只读 编译错误

Channel 使用哲学

在正确的环境中配置Channel,分配channel的所有权这里的所有权被定义为 实例化、写入和关闭channel的goroutine。重要的是弄清楚哪个goroutine拥有channel。

单向channel声明的是一种工具,允许我们区分所有者和使用者。一旦我们将channel所有者和非channel所有者区分开来,前面的表的结果会非常清晰。可以开始讲责任分配给哪些拥有channel的goroutine和不拥有channel的goroutine。

拥有channel的goroutine

  • 实例化channel
  • 执行写操作,或将所有权传递个另一个goroutine
  • 关闭channel
  • 执行这三件事,并通过只读channel把它们暴露出来。

使用channel的goroutine

  • 知道channel是何时关闭的 => 检查第二个返回值
  • 正确处理阻塞 =>取决于你的算法

尽量保持channel的所有权很小,消费者函数只能执行channel的读取方法,因此只需要知道它应该如何处理阻塞和channel的关闭。

func 通道使用哲学() {
    // 所有权范围足够小,职责明确
	chanOwner := func() <-chan int {
		resultStream := make(chan int, 5)
		go func() { 
			defer close(resultStream)
			for i := 0; i < 5; i++ {
				resultStream <- i
			}
		}()
		return resultStream // 传递单向通道给另一个 goroutine
	}
	resultStream := chanOwner()
	for result := range resultStream {
		fmt.Println(result)
	}
	fmt.Println(\"Done\")
}

Select 选择语句

Go语言运行时将在一组case语句中执行伪随机选择。

var c<-chan int // 注意是 nil,永远阻塞
select{
	case <-c:
    case <- time.After(1 * time.Second):
    fmt.Println(\"Timed out.\")
}

time.After函数通过传入time.Duration参数返回一个数值并写入channel。select允许加default语句,通常配合for-select循环一起使用,允许go程在等待另一个go程结果的同时,自己干一些事情。

GOMAXPROCS

通过修改 runtime.GOMAXPROCS 允许你修改OS线程的数量。一般是为了调试,添加OS线程来更频繁触发竞争条件。

参考资料

  • 《Go语言并发之道》Katherine CoxBuday

  • 《Go语言核心编程》李文塔

  • 《Go语言高级编程》柴树彬、曹春辉


来源:https://www.cnblogs.com/linxiaoxu/p/16705872.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » 我的Go并发之旅、02 基本并发原语

相关推荐

  • 暂无文章