#key #channel #single-consumer #active #multi-producer #message-queue #recv

key-message-channel

Multi-producer single-consumer queue capable of queuing messages by message key

1 unstable release

0.1.0 Oct 25, 2022

#774 in Concurrency

MIT license

36KB
632 lines

思路

本题目要求实现一个带 Key 的 mpsc channel。因为只有当队列中的消息不与还处于 active 状态的消息相冲突才能够出队,所以:

  • 需要有一个结构来保存目前处于 active 状态的所有消息的 Key;
  • 当一个消息想要出队时,首先与该结构进行比较,没有交集的情况下才可以出队,否则继续等待;
  • 当处于 active 状态的消息被 Drop 时,能够将该结构中该消息的所有 Key 删除掉。

最简单的办法是用一个HashSet来保存所有的 active keys,当 receiver 调用recv方法时,依次检查队列中的每个消息的 Key 是否与 active keys 有交集,如果没有,将该消息出队;如果所有消息都有交集,则 receiver 进入阻塞状态,直到有 active 的消息被 Drop 了,或者是 sender 送来了新的消息时,再次重复如上步骤。

这种方法的缺点在于,当任意一个 active 的消息被 Drop 后,队列中所有的消息都会被检验一遍,很费时间。

因此想到,可以借鉴等待队列的思想,用一个单独的结构保存有哪些消息在等待哪一个 active key,当某一个 active 的消息被 Drop 后,将会通知它所对应的若干个等待队列,在通知的过程中,如果发现队列中的某一个消息不需要再等待任何 key 了,则说明这个消息不再与当前任何一个 active 的消息冲突,于是可以将其送到队列中,等待出队。

尽管只有当不与任何 active 的消息冲突时,一个消息才会入队,但这不代表这个消息就一定能够出队,因为队列具有先后顺序,排在该消息之前的消息将会早于该消息出队,一旦出队之后,这些消息就会变成 active 的,从而可能与当前的消息发生冲突。所以在出队之前,需要再次检查消息是否与当前的 active keys 有交集,如果有,则重新将该消息放回到等待队列中继续等待;如果没有,将这个消息的所有 key 添加到 active keys 中,成功出队。

有了这样一个“出队检查”机制后,对于 sender 新送过来的消息,可以让这些消息直接入队,当这些消息即将出队时再检查它们是否有冲突,并放置到相应的等待队列中。

数据结构

Message

消息结构体,持有多个 Key。

每个消息结构体还包含着两个引用:一个为当前 channel 的active_set引用,一个Condvar引用。当消息被 Drop 时,将会:

  1. active_set中把自己的所有 key 删除掉;
  2. 通知条件变量,唤醒 receiver。

MessageQueue

消息队列。这个队列是对无锁 mpsc 队列的简单封装,增强了enqueue的功能:当有消息入队时,该消息队列将会通知条件变量,唤醒 receiver。

在无锁队列的选择上,选择了一种名为jiffy的队列实现,这种队列性能较高,且能保证线性一致性。

Activator

该结构体负责管理上述的 active keys 以及等待队列,从而让消息在合适的时候进入到消息队列中。

active_set

一个保存着所有 active keys 的 HashSet

为了降低锁的粒度,在该结构中包含:

  • 一个Mutex<HashSet>,用于从多个来源修改 active keys;
  • 一个AtomicBool,名称为modified标识 active keys 是否被修改,且没有与wait_list进行同步

通过将锁的粒度缩小,receiver 能够直接读取modified来得知当前的active_set是否需要进行同步,从而避免不必要的同步。

这里的“多个来源”,指:

  1. 当处于 active 状态的消息被 Drop 时,需要在active_set中删除该消息的所有 Key;
  2. receiver成功recv一个消息后,需要将新的消息的 Key 添加到active_set中。

这两个更改来源来自于不同的结构,因此需要将HashSet上锁。

wait_list

一个保存着若干 key,以及等待着这个 key 的若干消息的引用的HashMap。同时该结构体中还保存着对消息队列的引用。

这个HashMap应该与active_set是同步的,同步的含义为:

  • 当某个 key 被移出了active_set,则wait_list也会相对应的删除这个 key,并通知所有等待在这个 key 上的消息;
  • 当某个 key 被新加入了active_set,则wait_list也会增加相应的空队列。

由上面可知,当某个 key 不再 active 时,wait_list需要有某种方式来通知等待在该 key 上的消息:该 key 已经被释放,不需要再等待这个 key 了。当某个消息没有任何需要等待的 key 时,就可以知道该消息现在不与任何 active 的消息相冲突,因此可以将该事件添加到消息队列中,等待出队

在实际实现上,这个“某种方式”非常简单:等待队列中保存的是Arc<Message<T>>,因此当队列被删除时,指针的引用计数就会相应减 1,达成了一种“通知”的效果。当某个消息的引用计数为 1:Arc::strong_count(&msg) == 1时,该消息现在不与任何 active 的消息相冲突,于是可以入队。

wait_list的改变只有一个来源,即 receiver:当 receiver 被唤醒时,执行wait_listactive_set的同步,从而改变wait_list中的内容。

Receiver

channel 的接收端。

接收端有recv函数,用来从队列中接收消息并返回。

recv的执行过程为:

  1. 首先执行synchronize,将不与 active 的消息相冲突的消息放入到队列中;
  2. 从队列中取消息,检查消息的 key 是否与当前的active_set有交集;
  3. 如果有交集,说明队列前面有其他消息先于该消息出队了,因此需要将该消息重新添加回等待队列进行等待,然后继续取下一个消息,重复上述过程;
  4. 如果没有交集,则将该消息的 key 添加到active_set中,然后将该消息出队;
  5. 如果没有可以返回的消息,则挂起,等待条件变量通知,重复上述过程。

recv挂起的原因可能有:

  • 当前队列为空;
  • 当前队列不为空,但队列中的所有消息都与 active 的消息相冲突;

而唤醒recv的原因可能有:

  • Sender在队列中添加了新的消息;
  • active 的消息被 Drop 了,导致active_set更新。

Sender

channel 的发送端。

发送端有send函数,将会在消息队列中添加新的消息。由于消息队列是无锁、无限容量的队列,所以多个 sender(在不上锁的前提下)同时向消息队列中添加消息不会导致并发冲突。

Dependencies

~0.4–29MB
~369K SLoC