RxJava 处理金融市场数据:第 1 部分

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡/ 赠书活动

目前,正在 星球 内带小伙伴们做第一个项目:全栈前后端分离博客项目,采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 204 小节,累计 32w+ 字,讲解图:1416 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 870+ 小伙伴加入,欢迎点击围观

在我最近的一个项目中,我使用 Iteractive Brokers Java API 自动化了一个交易策略, RxJava 是处理大量实时和历史数据的完美配套框架。阅读 RxJava 的文档和示例可能会令人生畏且非常抽象,这里是一个如何使用它以及它可以做什么的实际示例:

  • 将市场数据提供给 RxJava marketDataObservable 类
  • 使用 groupBy、平面图和缓冲区运算符将报价数据聚合到 1 分钟柱

1. 将市场数据提供给 RxJava marketDataObservable 类


 public void subscribeRealTimeData(Instrument instrument) {
    controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler() 
    {
        @Override
        public void tickPrice(TickType tickType, double price, int canAutoExecute) {
        if (tickType == TickType.ASK) 
        {
            log.info("IB tick " + new Date() + " price " + price);
            LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
            marketDataObservable.push(priceEvent);
        }

    }

现在每次报价从 IB 到达时,它都会被推送到我们的 Obseravble。现在我们可以根据需要使用 RxJava Observable 的不同运算符折叠数据

2. 将报价数据汇总到 1 分钟柱


 public void subscribeRealTimeData(Instrument instrument) {
    controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler() 
    {
        @Override
        public void tickPrice(TickType tickType, double price, int canAutoExecute) {
        if (tickType == TickType.ASK) 
        {
            log.info("IB tick " + new Date() + " price " + price);
            LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
            marketDataObservable.push(priceEvent);
        }

    }

3. 针对 IB 演示提要运行分钟柱聚合器

只需按照 github 上的说明进行操作


 public void subscribeRealTimeData(Instrument instrument) {
    controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler() 
    {
        @Override
        public void tickPrice(TickType tickType, double price, int canAutoExecute) {
        if (tickType == TickType.ASK) 
        {
            log.info("IB tick " + new Date() + " price " + price);
            LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
            marketDataObservable.push(priceEvent);
        }

    }

你应该看到这样的东西


 public void subscribeRealTimeData(Instrument instrument) {
    controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler() 
    {
        @Override
        public void tickPrice(TickType tickType, double price, int canAutoExecute) {
        if (tickType == TickType.ASK) 
        {
            log.info("IB tick " + new Date() + " price " + price);
            LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
            marketDataObservable.push(priceEvent);
        }

    }

待续(第 2 部分通过 observeOn RxJava 运算符利用您的多核 cpu)