在我最近的一个项目中,我使用 Iteractive Brokers Java API 自动化了一个交易策略, RxJava 是处理大量实时和历史数据的完美配套框架。阅读 RxJava 的文档和示例可能会令人生畏且非常抽象,这里是一个如何使用它以及它可以做什么的实际示例:
- 将市场数据提供给 RxJava marketDataObservable 类
- 使用 groupBy、平面图和缓冲区运算符将报价数据聚合到 1 分钟柱
1. 将市场数据提供给 RxJava marketDataObservable 类
public void subscribeRealTimeData(Instrument instrument) {
controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler()
{
@Override
public void tickPrice(TickType tickType, double price, int canAutoExecute) {
if (tickType == TickType.ASK)
{
log.info("IB tick " + new Date() + " price " + price);
LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
marketDataObservable.push(priceEvent);
}
}
现在每次报价从 IB 到达时,它都会被推送到我们的 Obseravble。现在我们可以根据需要使用 RxJava Observable 的不同运算符折叠数据
2. 将报价数据汇总到 1 分钟柱
public void subscribeRealTimeData(Instrument instrument) {
controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler()
{
@Override
public void tickPrice(TickType tickType, double price, int canAutoExecute) {
if (tickType == TickType.ASK)
{
log.info("IB tick " + new Date() + " price " + price);
LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
marketDataObservable.push(priceEvent);
}
}
3. 针对 IB 演示提要运行分钟柱聚合器
只需按照 github 上的说明进行操作
public void subscribeRealTimeData(Instrument instrument) {
controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler()
{
@Override
public void tickPrice(TickType tickType, double price, int canAutoExecute) {
if (tickType == TickType.ASK)
{
log.info("IB tick " + new Date() + " price " + price);
LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
marketDataObservable.push(priceEvent);
}
}
你应该看到这样的东西
public void subscribeRealTimeData(Instrument instrument) {
controller.reqTopMktData(instrument.ibContract, "232", false, new ApiController.ITopMktDataHandler()
{
@Override
public void tickPrice(TickType tickType, double price, int canAutoExecute) {
if (tickType == TickType.ASK)
{
log.info("IB tick " + new Date() + " price " + price);
LivePriceEvent priceEvent = new LivePriceEvent(System.currentTimeMillis(), instrument, new BigDecimal(price).setScale(3, RoundingMode.UP));
marketDataObservable.push(priceEvent);
}
}