|
|
@@ -0,0 +1,73 @@
|
|
|
+# 14.7 新旧模型对比:任务和worker
|
|
|
+
|
|
|
+设想我们需要处理很多任务;一个worker处理一项任务。任务可以被定义为一个结构体(具体的细节在这里并不重要):
|
|
|
+
|
|
|
+```go
|
|
|
+type Task struct {
|
|
|
+ // some state
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+旧模式:使用共享内存进行同步
|
|
|
+
|
|
|
+由各个任务组成的任务池共享内存;为了同步各个worker以及避免资源竞争,我们需要对任务池进行加锁保护:
|
|
|
+
|
|
|
+```go
|
|
|
+ type Pool struct {
|
|
|
+ Mu sync.Mutex
|
|
|
+ Tasks []Task
|
|
|
+ }
|
|
|
+```
|
|
|
+sync.Mutex(参见9.3)是互斥锁:它用来在代码中保护临界区资源:同一时间只有一个协程(goroutine)可以进入该临界区。如果出现了同一时间多个协程都进入了该临界区,则会产生竞争:Pool结构就不能保证被正确更新。在传统的模式(在经典面向对象的语言中应用得比较多,比如C++,JAVA,C#)中,工作线程代码可能这样写:
|
|
|
+
|
|
|
+```go
|
|
|
+func Worker(pool *Pool) {
|
|
|
+ for {
|
|
|
+ pool.Mu.lock()
|
|
|
+ // begin critical section:
|
|
|
+ task := pool.Task[0] // take the first task
|
|
|
+ pool.Tasks = pool.Task[1:] // update the pool of tasks
|
|
|
+ // end critical section
|
|
|
+ pool.Mu.Unlock()
|
|
|
+ process(task)
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+这些worker有许多都可以并发执行;他们当然可以在协程中启动。一个worker先将pool锁定,从pool获取第一项任务,再解锁和处理任务。加锁保证了同一时间只有个协程可以进入到pool中:一项任务有且只能被赋予一个work。如果不加锁,则工作协程可能会在`task:=pool.Task[0]`发生中断,导致`pool.Tasks=pool.Task[1:]`产出异常:一些工作协程获取不到任务,而一些任务可能被多个工作协程得到。加锁实现同步的方式在工作协程比较少时可以工作的很好,但是当工作协程池数量很大,任务量也很多时,处理效率将会因为频繁的加锁/解锁开销而降低。当工作协程数增加到一个阈值时,程序效率会急剧下降,这就成为了瓶颈。
|
|
|
+
|
|
|
+新模式:使用通道
|
|
|
+
|
|
|
+使用通道进行同步:使用一个通道接受需要处理的任务,一个通道接受处理完成的任务(及其结果)。工作处理者在协程中启动,其数量N应该根据任务数量进行调整。
|
|
|
+
|
|
|
+主线程扮演着Master节点角色,可能写成如下形式:
|
|
|
+
|
|
|
+```go
|
|
|
+ func main() {
|
|
|
+ pending, done := make(chan *Task), make(chan *Task)
|
|
|
+ go sendWork(pending) // put tasks with work on the channel
|
|
|
+ for i := 0; i < N; i++ { // start N goroutines to do work
|
|
|
+ go Worker(pending, done)
|
|
|
+ }
|
|
|
+ consumeWork(done) // continue with the processed tasks
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+worker的逻辑比较简单:从pending通道拿任务,处理后将其放到done通道中:
|
|
|
+
|
|
|
+```go
|
|
|
+ func Worker(in, out chan *Task) {
|
|
|
+ for {
|
|
|
+ t := <-in
|
|
|
+ process(t)
|
|
|
+ out <- t
|
|
|
+ }
|
|
|
+ }
|
|
|
+```
|
|
|
+todo
|
|
|
+
|
|
|
+## 链接
|
|
|
+
|
|
|
+[目录](directory.md)
|
|
|
+上一节:[协程和恢复(recover)](14.6.md)
|
|
|
+下一节:[惰性生成器实现](14.8.md)
|