在这篇文章中,我们将使我们的可观察对象从 AAPL 和 GOOG 接收实时数据,这不是什么新鲜事,但在 observeOn 运算符的帮助下,我们可以毫不费力地在其自己的线程上聚合每个工具的报价数据。多核编程变得简单:
- 使用 IB API 订阅 APPL 和 GOOG 报价
- 修改 marketDataObservable 以在其自己的线程上进行 1 分钟柱聚合。
1. 使用 IB API 订阅 APPL 和 GOOG Ticks
InteractiveBrokersFeed.getInstance().connect();
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());
现在我们将在 GOOG 和 AAPL 上线时收到他们的消息。
2. 修改 marketDataObservable 以在其自己的线程上进行 1 分钟柱聚合。
InteractiveBrokersFeed.getInstance().connect();
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());
2 .每次来自不同 Instrument 的数据到达时,我们使用 defer 创建一个新的可观察对象。
4 .和 5 。我们还返回 Observables 以遵守延迟签名(它需要返回一个 observable)。
6 .这就是所有魔法发生的地方 subscribeOn(Schedulers.computation()) 告诉 RxJava 在每次有人有效地订阅前一个 observable 时使用线程池计算,这使得所有代码来自 2 。到 5 。在新线程上运行。
3. 针对 IB 演示提要运行它
只需按照 github 上的说明进行操作
InteractiveBrokersFeed.getInstance().connect();
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());
你应该看到类似这样的东西,注意线程的名称,你可以看到每次聚合一些数据时它都是不同的,这里是 RxComputationThreadPool-4 和 RxComputationThreadPool-5。
InteractiveBrokersFeed.getInstance().connect();
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());
待续...(第 3 部分使用 RxJava 和 IB 编写简单的算法交易策略)