RxJava 处理来自 IB 的实时金融市场数据(第 2 部分)

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡/ 赠书活动

目前,正在 星球 内带小伙伴们做第一个项目:全栈前后端分离博客项目,采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 204 小节,累计 32w+ 字,讲解图:1416 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 870+ 小伙伴加入,欢迎点击围观

在这篇文章中,我们将使我们的可观察对象从 AAPL 和 GOOG 接收实时数据,这不是什么新鲜事,但在 observeOn 运算符的帮助下,我们可以毫不费力地在其自己的线程上聚合每个工具的报价数据。多核编程变得简单:

  • 使用 IB API 订阅 APPL 和 GOOG 报价
  • 修改 marketDataObservable 以在其自己的线程上进行 1 分钟柱聚合。

1. 使用 IB API 订阅 APPL 和 GOOG Ticks


 InteractiveBrokersFeed.getInstance().connect();
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());

现在我们将在 GOOG 和 AAPL 上线时收到他们的消息。

2. 修改 marketDataObservable 以在其自己的线程上进行 1 分钟柱聚合。


 InteractiveBrokersFeed.getInstance().connect();
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());

2 .每次来自不同 Instrument 的数据到达时,我们使用 defer 创建一个新的可观察对象。

4 .和 5 。我们还返回 Observables 以遵守延迟签名(它需要返回一个 observable)。

6 .这就是所有魔法发生的地方 subscribeOn(Schedulers.computation()) 告诉 RxJava 在每次有人有效地订阅前一个 observable 时使用线程池计算,这使得所有代码来自 2 。到 5 。在新线程上运行。

3. 针对 IB 演示提要运行它

只需按照 github 上的说明进行操作


 InteractiveBrokersFeed.getInstance().connect();
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());

你应该看到类似这样的东西,注意线程的名称,你可以看到每次聚合一些数据时它都是不同的,这里是 RxComputationThreadPool-4 和 RxComputationThreadPool-5。


 InteractiveBrokersFeed.getInstance().connect();
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());

待续...(第 3 部分使用 RxJava 和 IB 编写简单的算法交易策略)