横截面引擎

横截面引擎由 createCrossSectionalEngine 函数创建。

横截面引擎对实时数据中每组的最新数据进行运算,比如金融里对所有股票的最新交易量求聚合值,工业物联网里对一批设备的最新温度求聚合值等,都需要用到横截面引擎。

横截面引擎包含两个部分:

  • 横截面数据表(键值表):保存所有分组的最新记录

  • 计算引擎:是一组聚合计算表达式以及触发器,系统会按照指定的规则触发对横截面数据表进行计算,计算结果会输出到指定的输出表中

createCrossSectionalEngine 函数的语法如下:

createCrossSectionalEngine(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern='perBatch'], [triggeringInterval=1000], [useSystemTime=true], [timeColumn], [lastBatchOnly=false], [contextByColumn], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [outputElapsedMicroseconds=false], [roundTime=true])

其参数的详细含义可以参考:createCrossSectionalEngine

应用例子 1

股市的交易数据会实时以流数据的形式写入数据表 trades。该表结构如下所示:

股票代码(sym) | 时间(time)| 成交价(price)| 成交数(qty)

trades 表会随着时间推进不断积累各个股票从开盘到当前为止的交易数据。在交易数据持续写入的过程中,用户需要实时计算所有股票的最新成交量之最大值、最新成交金额之最大值,以及最新交易金额之和。

在以下步骤中,使用横截面引擎结合流数据订阅,以实现上述场景:

定义流数据表,以写入模拟交易数据。

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

定义结果表,以保存横截面引擎计算的结果。

outputTable = table(10:0, `time`maxQty`maxDollarVolume`sumDollarVolume, [TIMESTAMP,INT,DOUBLE,DOUBLE])

创建横截面引擎,指定表达式、输入表、结果表、分组列、计算频率。返回的对象 tradesCrossAggregator 为保存横截面数据的表。

tradesCrossAggregator=createCrossSectionalEngine(name="CrossSectionalDemo", metrics=<[max(qty), max(price*qty), sum(price*qty)]>, dummyTable=trades, outputTable=outputTable, keyColumn=`sym, triggeringPattern=`perRow, useSystemTime=false, timeColumn=`time)

订阅流数据表,将新写入的流数据追加到横截面引擎中。

subscribeTable(tableName="trades", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)

最后模拟生成实时交易流数据。

def writeData(n){
   timev  = 2000.10.08T01:01:01.001 + timestamp(1..n)
   symv   = take(`A`B, n)
   pricev = take(102.1 33.4 73.6 223,n)
   qtyv   = take(60 74 82 59, n)
   insert into trades values(timev, symv, pricev, qtyv)
}
writeData(4)

完整脚本:

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(10:0, `time`maxQty`maxDollarVolume`sumDollarVolume, [TIMESTAMP,INT,DOUBLE,DOUBLE])
tradesCrossAggregator=createCrossSectionalEngine(name="CrossSectionalDemo", metrics=<[max(qty), max(price*qty), sum(price*qty)]>, dummyTable=trades, outputTable=outputTable, keyColumn=`sym, triggeringPattern=`perRow, useSystemTime=false, timeColumn=`time)
subscribeTable(tableName="trades", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)
def writeData(n){
   timev  = 2000.10.08T01:01:01.001 + timestamp(1..n)
   symv   = take(`A`B, n)
   pricev = take(102.1 33.4 73.6 223, n)
   qtyv   = take(60 74 82 59, n)
   insert into trades values(timev, symv, pricev,qtyv)
}
writeData(4)

执行完成后,查询流数据表,共有 A 与 B 两只股票的 4 笔交易数据:

select * from trades

time

sym

price

qty

2000.10.08T01:01:01.002 A 102.1 60
2000.10.08T01:01:01.003 B 33.4 74
2000.10.08T01:01:01.004 A 73.6 82
2000.10.08T01:01:01.005 B 223 59

截面数据表保存了 A 与 B 两只股票最新的交易数据:

select * from tradesCrossAggregator

time

sym

price

qty

2000.10.08T01:01:01.004 A 73.6 82
2000.10.08T01:01:01.005 B 223 59

由于横截面引擎采用了 "perRow" 每行触发计算的频率,所以每向横截面表写入一行数据,横截面引擎都会进行一次计算,向结果表插入一条结果数据:

select * from outputTable

time

maxQty

maxDollarVolume

sumDollarVolume

2019.04.08T04:26:01.634 60 6126 6126
2019.04.08T04:26:01.634 74 6126 8597.6
2019.04.08T04:26:01.634 82 6035.2 8506.8
2019.04.08T04:26:01.634 82 13157 19192.2

在进行后续操作之前,我们首先取消以上订阅,并取消横截面引擎的调用,并删除数据表 trades

unsubscribeTable(,`trades, "tradesCrossAggregator")
dropAggregator("CrossSectionalDemo")
undef(`trades, SHARED)

应用例子 2

triggeringPattern 支持三种模式,若取值 "perBatch" 时表示每追加一批数据触发一次写入。以下按 "perBatch" 模式启用横截面引擎,脚本共生成 12 条记录,分三批写入,预期产生 3 次输出:

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`maxQty`maxDollarVolume`sumDollarVolume, [TIMESTAMP,INT,DOUBLE,DOUBLE])
tradesCrossAggregator=createCrossSectionalEngine("CrossSectionalDemo", <[max(qty), max(price*qty), sum(price*qty)]>, trades, outputTable, `sym, `perBatch, useSystemTime=false, timeColumn=`time)
subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
def writeData1(){
  timev  = 2000.10.08T01:01:01.001 + timestamp(1..4)
  symv   = take(`A`B, 4)
  pricev = 102.1 33.4 102.3 33.2
  qtyv   = 10 20 40 30
  insert into trades values(timev, symv, pricev,qtyv)
}
def writeData2(){
  timev  = 2000.10.08T01:01:01.005 + timestamp(1..2)
  symv   = `A`B
  pricev = 102.4 33.1
  qtyv   = 120 60
  insert into trades values(timev, symv, pricev,qtyv)
}
//写入2批数据,预期会触发2次计算,输出2次聚合结果。
writeData1();
sleep(100)
writeData2();
dropAggregator(`CrossSectionalDemo)
unsubscribeTable(, `trades, `tradesCrossAggregator)

trades 表中,共写入了 6 条记录:

select * from trades

time

sym

price

qty

2000.10.08T01:01:01.002 A 102.1 10
2000.10.08T01:01:01.003 B 33.4 20
2000.10.08T01:01:01.004 A 102.3 40
2000.10.08T01:01:01.005 B 33.2 30
2000.10.08T01:01:01.006 A 102.4 120
2000.10.08T01:01:01.007 B 33.1 60

横截面表包含每组最新记录:

select * from tradesCrossAggregator

time

sym

price

qty

2000.10.08T01:01:01.006 A 102.4 120
2000.10.08T01:01:01.007 B 33.1 60

由于分 2 次写入,在 perBatch 模式下,横截面引擎输出了 2 条记录:

select * from outputTable

time

maxQty

maxDollarVolume

sumDollarVolume

2019.04.08T04:52:50.255 40 4092 5088
2019.04.08T04:52:50.355 120 12288 14274

应用例子 3

triggeringPattern 取值 "interval" 时,必须配合 triggeringInterval 参数一起使用,表示每隔 triggeringInterval 毫秒触发一次计算。本例分 6 次写入数据,每 500 毫秒触发一次计算,每次 1 条数据,间隔 500 或 1000 毫秒。请注意,这里没有指定 useSystemTime 参数为 false,会返回计算发生的时刻。

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
tradesCrossAggregator=createCrossSectionalEngine(name="tradesCrossAggregator", metrics=<[avg(price), sum(qty), sum(price*qty), count(price)]>, dummyTable=trades, outputTable=outputTable, keyColumn=`sym, triggeringPattern="interval", triggeringInterval=500)
subscribeTable(tableName="trades", actionName="tradesStats", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)

insert into trades values(2020.08.12T09:30:00.000, `A, 10, 20)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 500, `B, 20, 10)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 1000, `A, 10.1, 20)
sleep(1000)
insert into trades values(2020.08.12T09:30:00.000 + 2000, `B, 20.1, 30)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 2500, `B, 20.2, 40)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 3000, `A, 10.2, 20)

select * from outputTable;

time

avgPrice

volume

dollarVolume

count

2021.07.27T10:54:00.303 10 20 200 1
2021.07.27T10:54:00.818 15 30 400 2
2021.07.27T10:54:01.331 15.05 30 402 2
2021.07.27T10:54:02.358 15.1 50 805 2
2021.07.27T10:54:02.871 15.15 60 1010 2
2021.07.27T10:54:03.386 15.2 60 1012 2

输出表的记录数会不断增长。这是因为 triggeringPattern="interval" 时,计算是按照系统时间定时触发,与是否有新数据进入无关。

  • 横截面表作为最终结果

在以上的例子中,createCrossSectionalEngine 的返回结果(以下称为横截面表)是为聚合计算提供的一个中间结果,但横截面表亦可为最终结果。例如若需要定时刷新某只股票的最新交易价格,按照常规思路是从实时交易表中按代码筛选股票并取出最后一条记录,而交易表的数据量是随着时间快速增长的,如果频繁做这样的查询,无论从系统的资源消耗还是从查询的效能来看都不是最优的做法。而横截面表永远只保存所有股票的最近一次交易数据,数据量是稳定的,对于这种定时轮询的场景非常合适。

要将横截面表作为最终结果,需要在创建横截面时,对 metricsoutputTable 这两个参数置空。

tradesCrossAggregator=createCrossSectionalEngine("CrossSectionalDemo", , trades, , `sym, `perRow)