我阅读了golang中channel的实现
最近,为了迎接未来的CPU 10000核时代,我在C++中实现了一种名为”channel”的并发玩具,这在近年来非常流行。同时,我也参考了Golang中的channel实现,并对我自己的实现进行了解释。
「Channel是什么?
在功能上,它与被称为”blocking_queue”的thread基本相同。
具有以下特点的:
– 在goroutine上运行(而不是基于thread+mutex)。
– 使用有限的队列。
– 队列的元素数量允许为0。
– 具有关闭功能。
什么是goroutine?
我找不到对于goroutine这个词的明确定义,但我认为goroutine这个词大致上有以下两种意思。
- Go语言的轻量级线程可以很好地执行、暂停和恢复操作。在Go语言调度器中,类似于异步任务的”可执行任务”。
另外,goroutine在源代码中被简写为g。
请参考
请参照
请查阅
请借鉴
请参考以上
https://golang.org/doc/faq#goroutines
goroutineについて
https://morsmachine.dk/go-scheduler
golangのスケジューラについて
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/runtime2.go#L404
goroutineの構造体について
阅读
我們將立即查看早速實施channel的內容。請至https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go。
“channel的结构体定义” (channel de jiegou ti dingyi)
阅读下去就会明白,大致上具有以下结构。
struct chan{
std::deque<value_type> value_queue; // 本当はRingBufferだけどまぁ大体同じでしょ
std::list<RecvSudog> recv_queue;
std::list<SendSudog> send_queue;
bool closed = false;
std::mutex mutex;
};
channel主要由三个队列组成。
value_queue是用于存储值的队列。
send/recv_queue是用于保存发送/接收协程的队列。
创造出 chan
在这里进行了channel的生成
当队列元素或元素类型的大小为0时,进行了优化以减小value_queue的大小,但除此之外没有其他有趣的地方,可以跳过阅读
实现邮件发送处理。
送信处理的行为如下所示。
-
- 如果channel是空通道或者已关闭的情况下,会无操作或者中断
-
- 当recv_queue中存在等待接收的goroutine时,会向该goroutine发送元素
-
- 当value_queue有空闲位置时,会向value_queue中推送元素
- 将自身的goroutine入队send_queue,并中断该goroutine
掌握了这点后再阅读源代码,会变得更容易理解。
我觉得已经理解了,所以马上开始阅读。
在这个链接上:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L158-L200
通过仅使用原子指令,而无需使用互斥锁(OS的锁),实现了处理和优化。
如果recv_queue中有元素:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L293
从recv_queue中取出一个goroutine,将值传递给该goroutine,解锁通道,并使取出的goroutine可以重新开始(≒在thread_pool中发布)
值得注意的是在解锁之后,实现了goroutine可以重新启动的功能。这是由于“在更改goroutine堆栈大小时,试图获取通道锁而导致死锁”这个问题引起的。请查看以下问题的详细信息:https://github.com/golang/go/issues/12967
如果value_queue有空位的话,请阅读每个人实现的解决方案。在这里提供的解决方案没有太多有趣的解释。
如果要将消息放入等待发送队列,可以查看以下链接:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L258
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
这里通过gopark()函数 (https://github.com/golang/go/blob/fd6ba1c8a23d8a3fffb6c475b21f78510152ef5c/src/runtime/proc.go#L319) 来解释中断和解锁。
当调用gopark时,
1. 当前的协程会被暂停。
2. 在协程之外(请参考机器:协程是什么?的章节)执行锁的释放操作。
变成这种形势。
为什么要进行这样复杂的处理,即在机器的外部(oroutine)进行锁的释放操作?原因是,如果先释放通道的锁,那么在其他处理使用通道前,存在goroutine在中断之前被重新启动的可能性。
为了防止这种情况发生,我们只需在goroutine中添加互斥锁来确保它们不会同时执行。但是这样做会增加额外的互斥锁成本。
目前的实现似乎是为了避免这种情况发生。
实现信号处理和关闭功能
由于信件处理与以前大致相同,请各位自行阅读。
选择功能的实现
选择(不知为何)被分割为不同的文件。
选择行为如下:
1. 如果存在可以立即执行的操作(包括空通道和默认操作),则执行该操作。
2. 对于所有作为select目标的通道,将希望执行的操作入队到发送队列或接收队列,并暂停执行。
3. 当任意一个通道被执行时,协程将被恢复。
4. 取消未执行的入队操作。
记住这个,然后再读源代码会变得更容易。我想我已经记住了,所以我立即开始阅读。
以下是对于pollorder的决策以及每个channel的锁定获取过程的地址:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L121-L230
pollorder是一个决定搜索顺序的列表,“如果有可立即执行的操作,则执行该操作”。从实现中可以看出,这个顺序是随机的。这是模拟异步处理。
为了获得lock,我们按照channel结构体的内存地址顺序来获取lock。这是因为如果存在多个channel的lock需要获取的情况,如果不以相同的顺序获取lock,就可能导致死锁。
寻找可执行项:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L243
// pass 1 - look for something already waiting
根据评论中的建议,找到可行的操作并立即执行。
将所有channel入队:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L288-L327
// pass 2 - enqueue on all chans
将所有的频道添加到队列中并暂停。
实行和恢复:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L327
当任一信道恢复时,将执行pass3。
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L819
作为执行时的处理,通过对dequeue时的g.selectDone执行CAS操作,防止多个信道执行select处理。
在选择操作之后的处理步骤:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L336
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
在pass3中,删除未使用的在pass2中入队的项目。
最后
Go语言中的通道(channel)在仔细观察时会进行相当复杂的处理。