1 unstable release
0.1.0 | Oct 25, 2022 |
---|
#8 in #multi-producer
36KB
632 lines
思路
本题目要求实现一个带 Key 的 mpsc channel。因为只有当队列中的消息不与还处于 active 状态的消息相冲突才能够出队,所以:
- 需要有一个结构来保存目前处于 active 状态的所有消息的 Key;
- 当一个消息想要出队时,首先与该结构进行比较,没有交集的情况下才可以出队,否则继续等待;
- 当处于 active 状态的消息被 Drop 时,能够将该结构中该消息的所有 Key 删除掉。
最简单的办法是用一个HashSet
来保存所有的 active keys,当 receiver 调用recv
方法时,依次检查队列中的每个消息的 Key 是否与 active keys 有交集,如果没有,将该消息出队;如果所有消息都有交集,则 receiver 进入阻塞状态,直到有 active 的消息被 Drop 了,或者是 sender 送来了新的消息时,再次重复如上步骤。
这种方法的缺点在于,当任意一个 active 的消息被 Drop 后,队列中所有的消息都会被检验一遍,很费时间。
因此想到,可以借鉴等待队列的思想,用一个单独的结构保存有哪些消息在等待哪一个 active key,当某一个 active 的消息被 Drop 后,将会通知它所对应的若干个等待队列,在通知的过程中,如果发现队列中的某一个消息不需要再等待任何 key 了,则说明这个消息不再与当前任何一个 active 的消息冲突,于是可以将其送到队列中,等待出队。
尽管只有当不与任何 active 的消息冲突时,一个消息才会入队,但这不代表这个消息就一定能够出队,因为队列具有先后顺序,排在该消息之前的消息将会早于该消息出队,一旦出队之后,这些消息就会变成 active 的,从而可能与当前的消息发生冲突。所以在出队之前,需要再次检查消息是否与当前的 active keys 有交集,如果有,则重新将该消息放回到等待队列中继续等待;如果没有,将这个消息的所有 key 添加到 active keys 中,成功出队。
有了这样一个“出队检查”机制后,对于 sender 新送过来的消息,可以让这些消息直接入队,当这些消息即将出队时再检查它们是否有冲突,并放置到相应的等待队列中。
数据结构
Message
消息结构体,持有多个 Key。
每个消息结构体还包含着两个引用:一个为当前 channel 的active_set
引用,一个Condvar
引用。当消息被 Drop 时,将会:
- 从
active_set
中把自己的所有 key 删除掉; - 通知条件变量,唤醒 receiver。
MessageQueue
消息队列。这个队列是对无锁 mpsc 队列的简单封装,增强了enqueue
的功能:当有消息入队时,该消息队列将会通知条件变量,唤醒 receiver。
在无锁队列的选择上,选择了一种名为jiffy
的队列实现,这种队列性能较高,且能保证线性一致性。
Activator
该结构体负责管理上述的 active keys 以及等待队列,从而让消息在合适的时候进入到消息队列中。
active_set
一个保存着所有 active keys 的 HashSet
。
为了降低锁的粒度,在该结构中包含:
- 一个
Mutex<HashSet>
,用于从多个来源修改 active keys; - 一个
AtomicBool
,名称为modified
,标识 active keys 是否被修改,且没有与wait_list
进行同步。
通过将锁的粒度缩小,receiver 能够直接读取modified
来得知当前的active_set
是否需要进行同步,从而避免不必要的同步。
这里的“多个来源”,指:
- 当处于 active 状态的消息被 Drop 时,需要在
active_set
中删除该消息的所有 Key; - 当
receiver
成功recv
一个消息后,需要将新的消息的 Key 添加到active_set
中。
这两个更改来源来自于不同的结构,因此需要将HashSet
上锁。
wait_list
一个保存着若干 key,以及等待着这个 key 的若干消息的引用的HashMap
。同时该结构体中还保存着对消息队列的引用。
这个HashMap
应该与active_set
是同步的,同步的含义为:
- 当某个 key 被移出了
active_set
,则wait_list
也会相对应的删除这个 key,并通知所有等待在这个 key 上的消息; - 当某个 key 被新加入了
active_set
,则wait_list
也会增加相应的空队列。
由上面可知,当某个 key 不再 active 时,wait_list
需要有某种方式来通知等待在该 key 上的消息:该 key 已经被释放,不需要再等待这个 key 了。当某个消息没有任何需要等待的 key 时,就可以知道该消息现在不与任何 active 的消息相冲突,因此可以将该事件添加到消息队列中,等待出队。
在实际实现上,这个“某种方式”非常简单:等待队列中保存的是Arc<Message<T>>
,因此当队列被删除时,指针的引用计数就会相应减 1,达成了一种“通知”的效果。当某个消息的引用计数为 1:Arc::strong_count(&msg) == 1
时,该消息现在不与任何 active 的消息相冲突,于是可以入队。
wait_list
的改变只有一个来源,即 receiver:当 receiver 被唤醒时,执行wait_list
与active_set
的同步,从而改变wait_list
中的内容。
Receiver
channel 的接收端。
接收端有recv
函数,用来从队列中接收消息并返回。
recv
的执行过程为:
- 首先执行
synchronize
,将不与 active 的消息相冲突的消息放入到队列中; - 从队列中取消息,检查消息的 key 是否与当前的
active_set
有交集; - 如果有交集,说明队列前面有其他消息先于该消息出队了,因此需要将该消息重新添加回等待队列进行等待,然后继续取下一个消息,重复上述过程;
- 如果没有交集,则将该消息的 key 添加到
active_set
中,然后将该消息出队; - 如果没有可以返回的消息,则挂起,等待条件变量通知,重复上述过程。
recv
挂起的原因可能有:
- 当前队列为空;
- 当前队列不为空,但队列中的所有消息都与 active 的消息相冲突;
而唤醒recv
的原因可能有:
Sender
在队列中添加了新的消息;- active 的消息被 Drop 了,导致
active_set
更新。
Sender
channel 的发送端。
发送端有send
函数,将会在消息队列中添加新的消息。由于消息队列是无锁、无限容量的队列,所以多个 sender(在不上锁的前提下)同时向消息队列中添加消息不会导致并发冲突。
Dependencies
~0.4–27MB
~340K SLoC