Ringbuffer:Hazelcast 中的新数据结构

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡/ 赠书活动

目前,正在 星球 内带小伙伴们做第一个项目:全栈前后端分离博客项目,采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 204 小节,累计 32w+ 字,讲解图:1416 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 870+ 小伙伴加入,欢迎点击围观

Hazelcast Ringbuffer 是添加到 Hazelcast 3.5 的 新数据结构,在某些情况下可以作为队列更实用的替代方案。将 Ringbuffer 视为具有固定容量的圆形数组。与数组一样,Ringbuffer 中的每个项目都使用序列 ID(长整型)唯一标识。

Ringbuffer 是一个仅附加的数据结构;所以不可能删除一个项目。尾部是附加项目的地方,头部是环形缓冲区中找到最旧项目的地方。创建 Ringbuffer 和添加项目非常简单:


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

很酷的是返回的序列也可以用来读出项目:


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

由于每个项目都由其 sequence-id 唯一标识,因此返回的 sequence-id 是独一无二的,如果您使用 Ringbuffer,则可以将其用作廉价的 id 生成器。

环形缓冲区与队列的比较

与队列相比,Ringbuffer 的好处在于,对于队列,take 是一种破坏性操作;所以只有一个线程能够从队列中取出一个特定的项目。一旦被拿走,它就消失了。这可能有问题,原因有二:

  1. 如果系统在物品已被拿走但尚未完全处理之前崩溃,会发生什么情况?
  2. 如果您希望多个读者阅读同一个项目会怎样?一种方法是为每个读者创建一个队列并在每个队列上执行一次放置。问题在于它使 put 变得非常昂贵,因为对于 N 个读者,您需要执行 N 个 put。

因为对 Ringbuffer 的读取不是破坏性操作,并且读取器控制它要读取哪些项目,读取器很容易通过存储 sequence-id 来实现交付保证。

  • 至少一次 :在项目完全处理后存储序列 ID。如果系统在项目被完全处理之前崩溃,相同的项目将被再次读取,因为存储的 sequence-id 仍然包含旧值。
  • At Most Once :在开始处理项目之前存储 sequence-id。如果系统在项目完全处理之前崩溃,我们可能无法处理的项目的序列 ID 将被加载,系统可以从下一个项目继续。

读取操作不是破坏性操作的另一大优势是它非常快,因为它不需要复制——与队列不同。

容量

每个 Ringbuffer 都被创建为具有一定的容量——默认为 10k 项。 Ringbuffer 的增长不能超过这个容量,因此,最旧的项目最终会被覆盖(更多内容见下文)。可以使用 XML 或使用我们的编程 API 来配置 Ringbuffer。如果我们要设置容量:


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

生存时间

默认情况下,Ringbuffer 中的项目会保留在 Ringbuffer 中,直到它们被覆盖。请注意,它们永远不会过期。这与使用常规数组的行为完全相同;一旦一个项目被写入数组,它就永远不会被自动删除。

实际上,您通常希望控制项目保持可用的时间(例如 30 秒)。对于 Ringbuffer,这可以通过在 RingbufferConfig 上设置生存时间来完成:


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

如果生存时间为 30 秒,则消费者有 30 秒的时间窗口来处理该项目。如果一个项目被写入并且已经过了 31 秒,则读取完成并且该项目将不再可用。

生存时间有助于防止过度使用内存并防止数据过时;但它的真正价值在于它与 OverflowPolicy 结合使用时。 OverflowPolicy 确定当 Ringbuffer 已满并且没有要过期的项目时要做什么。目前有两种选择:

  • OVERWRITE: Ringbuffer 中最旧的项目被覆盖,即使它还没有老到过期。在这种情况下,您会偏爱生产者而不是消费者,因为如果消费者想要读取的数据不再存在,它可能会遇到 StaleSequenceException
  • 失败: 没有任何内容被覆盖,调用者收到写入失败的信号。然后由调用者决定做什么。

以下代码显示了如何结合 OverflowPolicy.FAIL 设置指数退避:


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

配料

到目前为止显示的代码示例一次插入和读取一个项目。这种方式的问题在于操作调度、网络通信等会产生大量的开销,批量读写来分摊开销效率会高很多。

添加一批项目非常简单:


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

除了提供批处理功能外,您还可以决定是通过调用 get 进行同步调用,还是通过使用 andThen 方法并提供回调使其成为异步调用。

读取一批项目有点复杂:


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

在此示例中,我们要读取至少 1 个项目,最多 100 个项目。如果有 1000 个项目可用,这会非常有效,因为只需要执行 10 个操作。

您可能会在最后徘徊 null 参数。这是可以提供过滤器的地方。想象一下,有一个带有员工对象的 Ringbuffer,而您只想检索工程师;您可以提供一个选择工程师的过滤器。


 Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

过滤器的好处在于它是在源头完成的,因此,不相关的项目不会发送给调用者。

可以使用过滤器完成的事情之一是并行化工作负载(例如,一位读者使用工程师过滤器与所有工程师打交道,而一位读者使用销售过滤器与所有销售人员打交道)。

查看 Ringbuffer 文档 »
准备好自己试试了吗?下载 Hazelcast 并立即开始!