Hazelcast Ringbuffer 是添加到 Hazelcast 3.5 的 新数据结构,在某些情况下可以作为队列更实用的替代方案。将 Ringbuffer 视为具有固定容量的圆形数组。与数组一样,Ringbuffer 中的每个项目都使用序列 ID(长整型)唯一标识。
Ringbuffer 是一个仅附加的数据结构;所以不可能删除一个项目。尾部是附加项目的地方,头部是环形缓冲区中找到最旧项目的地方。创建 Ringbuffer 和添加项目非常简单:
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
很酷的是返回的序列也可以用来读出项目:
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
由于每个项目都由其 sequence-id 唯一标识,因此返回的 sequence-id 是独一无二的,如果您使用 Ringbuffer,则可以将其用作廉价的 id 生成器。
环形缓冲区与队列的比较
与队列相比,Ringbuffer 的好处在于,对于队列,take 是一种破坏性操作;所以只有一个线程能够从队列中取出一个特定的项目。一旦被拿走,它就消失了。这可能有问题,原因有二:
- 如果系统在物品已被拿走但尚未完全处理之前崩溃,会发生什么情况?
- 如果您希望多个读者阅读同一个项目会怎样?一种方法是为每个读者创建一个队列并在每个队列上执行一次放置。问题在于它使 put 变得非常昂贵,因为对于 N 个读者,您需要执行 N 个 put。
因为对 Ringbuffer 的读取不是破坏性操作,并且读取器控制它要读取哪些项目,读取器很容易通过存储 sequence-id 来实现交付保证。
- 至少一次 :在项目完全处理后存储序列 ID。如果系统在项目被完全处理之前崩溃,相同的项目将被再次读取,因为存储的 sequence-id 仍然包含旧值。
- At Most Once :在开始处理项目之前存储 sequence-id。如果系统在项目完全处理之前崩溃,我们可能无法处理的项目的序列 ID 将被加载,系统可以从下一个项目继续。
读取操作不是破坏性操作的另一大优势是它非常快,因为它不需要复制——与队列不同。
容量
每个 Ringbuffer 都被创建为具有一定的容量——默认为 10k 项。 Ringbuffer 的增长不能超过这个容量,因此,最旧的项目最终会被覆盖(更多内容见下文)。可以使用 XML 或使用我们的编程 API 来配置 Ringbuffer。如果我们要设置容量:
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
生存时间
默认情况下,Ringbuffer 中的项目会保留在 Ringbuffer 中,直到它们被覆盖。请注意,它们永远不会过期。这与使用常规数组的行为完全相同;一旦一个项目被写入数组,它就永远不会被自动删除。
实际上,您通常希望控制项目保持可用的时间(例如 30 秒)。对于 Ringbuffer,这可以通过在 RingbufferConfig 上设置生存时间来完成:
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
如果生存时间为 30 秒,则消费者有 30 秒的时间窗口来处理该项目。如果一个项目被写入并且已经过了 31 秒,则读取完成并且该项目将不再可用。
生存时间有助于防止过度使用内存并防止数据过时;但它的真正价值在于它与 OverflowPolicy 结合使用时。 OverflowPolicy 确定当 Ringbuffer 已满并且没有要过期的项目时要做什么。目前有两种选择:
-
OVERWRITE:
Ringbuffer 中最旧的项目被覆盖,即使它还没有老到过期。在这种情况下,您会偏爱生产者而不是消费者,因为如果消费者想要读取的数据不再存在,它可能会遇到
StaleSequenceException
。 - 失败: 没有任何内容被覆盖,调用者收到写入失败的信号。然后由调用者决定做什么。
以下代码显示了如何结合
OverflowPolicy.FAIL
设置指数退避:
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
配料
到目前为止显示的代码示例一次插入和读取一个项目。这种方式的问题在于操作调度、网络通信等会产生大量的开销,批量读写来分摊开销效率会高很多。
添加一批项目非常简单:
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
除了提供批处理功能外,您还可以决定是通过调用 get 进行同步调用,还是通过使用
andThen
方法并提供回调使其成为异步调用。
读取一批项目有点复杂:
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
在此示例中,我们要读取至少 1 个项目,最多 100 个项目。如果有 1000 个项目可用,这会非常有效,因为只需要执行 10 个操作。
您可能会在最后徘徊
null
参数。这是可以提供过滤器的地方。想象一下,有一个带有员工对象的 Ringbuffer,而您只想检索工程师;您可以提供一个选择工程师的过滤器。
Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");
过滤器的好处在于它是在源头完成的,因此,不相关的项目不会发送给调用者。
可以使用过滤器完成的事情之一是并行化工作负载(例如,一位读者使用工程师过滤器与所有工程师打交道,而一位读者使用销售过滤器与所有销售人员打交道)。