我阅读了golang中channel的实现

最近,为了迎接未来的CPU 10000核时代,我在C++中实现了一种名为”channel”的并发玩具,这在近年来非常流行。同时,我也参考了Golang中的channel实现,并对我自己的实现进行了解释。

「Channel是什么?

在功能上,它与被称为”blocking_queue”的thread基本相同。

具有以下特点的:
– 在goroutine上运行(而不是基于thread+mutex)。
– 使用有限的队列。
– 队列的元素数量允许为0。
– 具有关闭功能。

什么是goroutine?

我找不到对于goroutine这个词的明确定义,但我认为goroutine这个词大致上有以下两种意思。

    Go语言的轻量级线程可以很好地执行、暂停和恢复操作。在Go语言调度器中,类似于异步任务的”可执行任务”。

另外,goroutine在源代码中被简写为g。

请参考

请参照

请查阅

请借鉴

请参考以上

https://golang.org/doc/faq#goroutines

goroutineについて

https://morsmachine.dk/go-scheduler

golangのスケジューラについて

https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/runtime2.go#L404

goroutineの構造体について

阅读

我們將立即查看早速實施channel的內容。請至https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go。

“channel的结构体定义” (channel de jiegou ti dingyi)

阅读下去就会明白,大致上具有以下结构。

struct chan{
  std::deque<value_type> value_queue; // 本当はRingBufferだけどまぁ大体同じでしょ
  std::list<RecvSudog> recv_queue;
  std::list<SendSudog> send_queue;
  bool closed = false;
  std::mutex mutex;
};

channel主要由三个队列组成。
value_queue是用于存储值的队列。
send/recv_queue是用于保存发送/接收协程的队列。

创造出 chan

在这里进行了channel的生成
当队列元素或元素类型的大小为0时,进行了优化以减小value_queue的大小,但除此之外没有其他有趣的地方,可以跳过阅读

实现邮件发送处理。

送信处理的行为如下所示。

    1. 如果channel是空通道或者已关闭的情况下,会无操作或者中断

 

    1. 当recv_queue中存在等待接收的goroutine时,会向该goroutine发送元素

 

    1. 当value_queue有空闲位置时,会向value_queue中推送元素

 

    将自身的goroutine入队send_queue,并中断该goroutine

掌握了这点后再阅读源代码,会变得更容易理解。
我觉得已经理解了,所以马上开始阅读。

在这个链接上:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L158-L200

通过仅使用原子指令,而无需使用互斥锁(OS的锁),实现了处理和优化。

如果recv_queue中有元素:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L293
从recv_queue中取出一个goroutine,将值传递给该goroutine,解锁通道,并使取出的goroutine可以重新开始(≒在thread_pool中发布)

值得注意的是在解锁之后,实现了goroutine可以重新启动的功能。这是由于“在更改goroutine堆栈大小时,试图获取通道锁而导致死锁”这个问题引起的。请查看以下问题的详细信息:https://github.com/golang/go/issues/12967

如果value_queue有空位的话,请阅读每个人实现的解决方案。在这里提供的解决方案没有太多有趣的解释。

如果要将消息放入等待发送队列,可以查看以下链接:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L258

gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

这里通过gopark()函数 (https://github.com/golang/go/blob/fd6ba1c8a23d8a3fffb6c475b21f78510152ef5c/src/runtime/proc.go#L319) 来解释中断和解锁。

当调用gopark时,
1. 当前的协程会被暂停。
2. 在协程之外(请参考机器:协程是什么?的章节)执行锁的释放操作。

变成这种形势。

为什么要进行这样复杂的处理,即在机器的外部(oroutine)进行锁的释放操作?原因是,如果先释放通道的锁,那么在其他处理使用通道前,存在goroutine在中断之前被重新启动的可能性。

为了防止这种情况发生,我们只需在goroutine中添加互斥锁来确保它们不会同时执行。但是这样做会增加额外的互斥锁成本。
目前的实现似乎是为了避免这种情况发生。

实现信号处理和关闭功能

由于信件处理与以前大致相同,请各位自行阅读。

选择功能的实现

选择(不知为何)被分割为不同的文件。

选择行为如下:
1. 如果存在可以立即执行的操作(包括空通道和默认操作),则执行该操作。
2. 对于所有作为select目标的通道,将希望执行的操作入队到发送队列或接收队列,并暂停执行。
3. 当任意一个通道被执行时,协程将被恢复。
4. 取消未执行的入队操作。

记住这个,然后再读源代码会变得更容易。我想我已经记住了,所以我立即开始阅读。

以下是对于pollorder的决策以及每个channel的锁定获取过程的地址:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L121-L230

pollorder是一个决定搜索顺序的列表,“如果有可立即执行的操作,则执行该操作”。从实现中可以看出,这个顺序是随机的。这是模拟异步处理。

为了获得lock,我们按照channel结构体的内存地址顺序来获取lock。这是因为如果存在多个channel的lock需要获取的情况,如果不以相同的顺序获取lock,就可能导致死锁。

寻找可执行项:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L243

// pass 1 - look for something already waiting

根据评论中的建议,找到可行的操作并立即执行。

将所有channel入队:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L288-L327

// pass 2 - enqueue on all chans

将所有的频道添加到队列中并暂停。

实行和恢复:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L327
当任一信道恢复时,将执行pass3。
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L819
作为执行时的处理,通过对dequeue时的g.selectDone执行CAS操作,防止多个信道执行select处理。

在选择操作之后的处理步骤:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L336

// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.

在pass3中,删除未使用的在pass2中入队的项目。

最后

Go语言中的通道(channel)在仔细观察时会进行相当复杂的处理。

广告
将在 10 秒后关闭
bannerAds