Time-Series Aggregation Engine
The time-series aggregation engine is created with the createTimeSeriesEngine function. Its syntax is as follows:
createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup], [keyPurgeFreqInSec], [closed='left'], [outputElapsedMicroseconds=false])
Key Parameters
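- name: the name of the engine, which uniquely identifies it on a data node. It may contain letters, digits, and underscores, and must start with a letter.
- windowSize: the length of time covered by each calculation window.
- step: the interval by which the window moves each time.
- metrics: the calculation formulas, given as metacode. Built-in aggregate functions and user-defined aggregate functions (UDFs) are supported.
- dummyTable: a table whose schema describes the input data.
- outputTable: the table to which results are written.
- timeColumn: when useSystemTime=false, the time column of dummyTable whose values trigger window calculations, i.e., the stream is processed by event time.
- useSystemTime: when set to true, the engine processes the stream by system time, independent of event time.
- keyColumn: the grouping column, e.g., to compute separately for each stock.
- fill: how to fill a window (of a given key) that contains no data:
  - 'none': no result is output.
  - 'null': the result is NULL.
  - 'ffill': the result of the most recent window that had data is output.
  - a specific value: that value is output; its data type must match the type of the corresponding metrics result.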
For the meaning of the other parameters, see createTimeSeriesEngine.
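The four fill options can be pictured with a small pure-Python model. This is a sketch under assumptions, not DolphinDB code: the helper `apply_fill` is hypothetical, a window that received no data is represented as `None`, and 'ffill' is assumed to output nothing when no earlier window had data.

```python
from typing import List, Optional, Tuple, Union

Window = Tuple[int, Optional[float]]  # (window end, result); None means the window had no data


def apply_fill(windows: List[Window], fill: Union[str, float] = "none") -> List[Window]:
    """Hypothetical model of how each fill option treats empty windows."""
    out: List[Window] = []
    last: Optional[float] = None  # result of the most recent window that had data
    for end, result in windows:
        if result is not None:
            out.append((end, result))
            last = result
        elif fill == "none":
            continue                      # empty window: no output row
        elif fill == "null":
            out.append((end, None))       # empty window: NULL result
        elif fill == "ffill":
            if last is not None:          # assumption: nothing to copy yet, so no row
                out.append((end, last))
        else:
            out.append((end, fill))       # empty window: the given value
    return out


windows = [(3, 1.0), (6, None), (9, 6.0)]
print(apply_fill(windows, "ffill"))  # [(3, 1.0), (6, 1.0), (9, 6.0)]
```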
Example 1
The following example shows how data windows are aligned and how the time-series engine performs its calculations. The code below creates the stream table trades with two columns, time and volume, and the time-series engine streamAggr1, which every 3 ms computes sum(volume) over the past 6 ms. The time column has millisecond precision, and the simulated stream also delivers one record per millisecond.
share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume, [TIMESTAMP, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)
Write 10 records into the stream table trades and view its contents:
def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}
writeData(trades, 10)
select * from trades;
time | volume |
---|---|
2018.10.08T01:01:01.002 | 1 |
2018.10.08T01:01:01.003 | 1 |
2018.10.08T01:01:01.004 | 1 |
2018.10.08T01:01:01.005 | 1 |
2018.10.08T01:01:01.006 | 1 |
2018.10.08T01:01:01.007 | 1 |
2018.10.08T01:01:01.008 | 1 |
2018.10.08T01:01:01.009 | 1 |
2018.10.08T01:01:01.010 | 1 |
2018.10.08T01:01:01.011 | 1 |
Then view the output table outputTable:
select * from outputTable;
time | sumVolume |
---|---|
2018.10.08T01:01:01.003 | 1 |
2018.10.08T01:01:01.006 | 4 |
2018.10.08T01:01:01.009 | 6 |
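The start of the first window is aligned based on the time of the first record, and the window then moves forward by step. The calculation proceeds as follows. For brevity, the common prefix 2018.10.08T01:01:01 is omitted and only the millisecond part is given. Based on the first record's time 002, the start of the first window is aligned to 000; the window ends at 002 and contains only the record at 002. Its calculation is triggered by the record at 003, and sum(volume) is 1. The second window runs from 000 to 005 and contains 4 records; its calculation is triggered by the record at 006, giving 4. The third window runs from 003 to 008 and contains 6 records; its calculation is triggered by the record at 009, giving 6. The fourth window, from 006 to 011, also contains 6 records, but because no record arrives after it ends, its calculation is never triggered. Each output row is labeled with the exclusive end of its window, e.g., 003 for the first window.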
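The alignment and triggering rules just described can be checked with a simplified pure-Python model. It is a sketch, not DolphinDB's implementation: the helper `window_sums` is hypothetical, and it assumes that window ends fall on multiples of step, that the window labeled `end` covers [end - windowSize, end), and that it is computed when the first record with time >= end arrives.

```python
from typing import List, Tuple


def window_sums(records: List[Tuple[int, int]], window_size: int, step: int) -> List[Tuple[int, int]]:
    """Model of event-time windows over (time, volume) records sorted by time.

    Returns (window end, sum of volume) for every window that is triggered.
    """
    out: List[Tuple[int, int]] = []
    if not records:
        return out
    # First window end: the first multiple of step after the first record.
    end = (records[0][0] // step + 1) * step
    for t, _ in records:
        # A record at or past the window end triggers that window (and any
        # later windows it has also passed).
        while t >= end:
            vols = [v for ts, v in records if end - window_size <= ts < end]
            if vols:  # an empty window produces no row (fill='none')
                out.append((end, sum(vols)))
            end += step
    return out


# Millisecond parts 002..011, volume 1 each, as in the trades table above.
trades = [(ms, 1) for ms in range(2, 12)]
print(window_sums(trades, window_size=6, step=3))  # [(3, 1), (6, 4), (9, 6)]
```

The window ending at 012 never fires in this model because no record with time >= 012 arrives, which matches the three rows in outputTable.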
To run the program above again, first cancel the subscription, then delete both the stream table trades and the time-series engine streamAggr1:
unsubscribeTable(tableName="trades", actionName="append_tradesAggregator")
undef(`trades, SHARED)
dropStreamEngine("streamAggr1")
Example 2
The DolphinDB time-series aggregation engine supports multiple windows.
The following example shows how to aggregate the same metrics over different windowSize values. The code creates the stream table trades with two columns, time and volume, and the time-series engine streamAggr1, which every 3 ms computes sum(volume) over the past 6 ms and over the past 12 ms. Here windowSize=[6,12] is paired with two metrics, producing the columns sumVolume1 and sumVolume2. The time column has millisecond precision, and the simulated stream also delivers one record per millisecond.
share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume1`sumVolume2, [TIMESTAMP, INT, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=[6,12], step=3, metrics=[<sum(volume)>,<sum(volume)>], dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)
def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}
writeData(trades, 20)
select * from trades;
Then view the output table outputTable:
select * from outputTable;
time | sumVolume1 | sumVolume2 |
---|---|---|
2018.10.08T01:01:01.003 | 1 | 1 |
2018.10.08T01:01:01.006 | 4 | 4 |
2018.10.08T01:01:01.009 | 6 | 7 |
2018.10.08T01:01:01.012 | 6 | 10 |
2018.10.08T01:01:01.015 | 6 | 12 |
2018.10.08T01:01:01.018 | 6 | 12 |
2018.10.08T01:01:01.021 | 6 | 12 |
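The multi-window output can be reproduced with a pure-Python model in which all window sizes share the same right boundary. This is a sketch under assumptions, not DolphinDB code: the helper `multi_window_sums` is hypothetical; each window of size w ending at `end` is taken to cover [end - w, end) and to fire when the first record with time >= end arrives; and a row is assumed to be emitted whenever the largest window contains data.

```python
from typing import List, Sequence, Tuple


def multi_window_sums(records: List[Tuple[int, int]], window_sizes: Sequence[int],
                      step: int) -> List[tuple]:
    """One row per fired window end: (end, sum for each window size)."""
    out: List[tuple] = []
    if not records:
        return out
    end = (records[0][0] // step + 1) * step  # first boundary after the first record
    for t, _ in records:
        while t >= end:
            # All window sizes share the right boundary `end`.
            sums = [sum(v for ts, v in records if end - w <= ts < end) for w in window_sizes]
            has_data = any(end - max(window_sizes) <= ts < end for ts, _ in records)
            if has_data:
                out.append((end, *sums))
            end += step
    return out


# Millisecond parts 002..021, volume 1 each (writeData(trades, 20)).
trades = [(ms, 1) for ms in range(2, 22)]
for row in multi_window_sums(trades, [6, 12], step=3):
    print(row)
```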
Example 3
The DolphinDB time-series aggregation engine supports many kinds of expressions in real-time calculations:
- One or more aggregate functions:
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
- Calculations on aggregate results:
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask)-min(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
- Aggregation of the results of operations between columns:
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask-bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
- Multiple aggregate results:
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[max((ask-bid)/(ask+bid)*2), min((ask-bid)/(ask+bid)*2)]>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
- Aggregate functions with multiple arguments:
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<corr(ask,bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<percentile(ask-bid,99)/sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
- User-defined functions:
  defg diff(x,y){
      return sum(x)-sum(y)
  }
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<diff(ask, bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
- Functions that return multiple results:
  defg sums(x){
      return [sum(x),sum2(x)]
  }
  tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sums(ask) as `sumAsk`sum2Ask>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
sum(spread(ask,bid))
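To make the metric shapes in Example 3 concrete, the sketch below evaluates plain-Python counterparts of several of them on one window of hypothetical (ask, bid) rows. It only illustrates what each expression computes per window; it is not how DolphinDB evaluates metacode. `diff` and `sums` mirror the user-defined functions listed above, with `sum2` taken as the sum of squares.

```python
# Hypothetical (ask, bid) rows that fall into one window.
window = [(10.0, 9.0), (12.0, 10.0), (11.0, 10.5)]
ask = [a for a, _ in window]
bid = [b for _, b in window]


def diff(x, y):
    """Counterpart of the user-defined aggregate `diff`: sum(x) - sum(y)."""
    return sum(x) - sum(y)


def sums(x):
    """Counterpart of `sums`: two results, the sum and the sum of squares (sum2)."""
    return sum(x), sum(v * v for v in x)


metrics = {
    "sum(ask)": sum(ask),                              # one aggregate
    "max(ask)-min(ask)": max(ask) - min(ask),          # arithmetic on aggregates
    "max(ask-bid)": max(a - b for a, b in window),     # aggregate of a column expression
    "diff(ask, bid)": diff(ask, bid),                  # user-defined aggregate
}
metrics["sumAsk"], metrics["sum2Ask"] = sums(ask)      # one call, two output columns

for name, value in metrics.items():
    print(name, value)
```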
Example 4
The system uses the schema of dummyTable to determine the data type of each column in the subscribed stream. Whether dummyTable contains data has no effect on the result. In the example below, dummyTable is the empty table modelTable, which has the same schema as trades.
share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
modelTable = table(1000:0, `time`volume, [TIMESTAMP, INT])
outputTable = table(10000:0, `time`sumVolume, [TIMESTAMP, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=5, step=5, metrics=<[sum(volume)]>, dummyTable=modelTable, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)
def writeData(t,n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}
writeData(trades, 6)
sleep(1)
select * from outputTable
Example 5
Results can be written to an in-memory table or to a stream table. Data written to an in-memory table can be updated or deleted; data written to a stream table cannot, but the stream table can republish the results as the data source of another engine.
In the example below, the time-series engine electricityAggregator1 subscribes to the stream table electricity, computes averages over 10 ms windows, and writes the results to the stream table outputTable1. The time-series engine electricityAggregator2 subscribes to outputTable1 and computes the maxima of those averages over 100 ms windows.
share streamTable(1000:0,`time`voltage`current,[TIMESTAMP,DOUBLE,DOUBLE]) as electricity
// Define the output table of the first engine as a stream table so that it can be subscribed to again
share streamTable(10000:0,`time`avgVoltage`avgCurrent,[TIMESTAMP,DOUBLE,DOUBLE]) as outputTable1
electricityAggregator1 = createTimeSeriesEngine(name="electricityAggregator1", windowSize=10, step=10, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable1, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append!{electricityAggregator1}, msgAsTable=true)
// Subscribe to the results and aggregate them again
outputTable2 = table(10000:0, `time`maxVoltage`maxCurrent, [TIMESTAMP,DOUBLE,DOUBLE])
electricityAggregator2 = createTimeSeriesEngine(name="electricityAggregator2", windowSize=100, step=100, metrics=<[max(avgVoltage), max(avgCurrent)]>, dummyTable=outputTable1, outputTable=outputTable2, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="outputTable1", actionName="maxElectricity", offset=0, handler=append!{electricityAggregator2}, msgAsTable=true);
// Insert 500 records into the electricity table
def writeData(t, n){
    timev = 2018.10.08T01:01:01.000 + timestamp(1..n)
    voltage = 1..n * 0.1
    current = 1..n * 0.05
    insert into t values(timev, voltage, current)
}
writeData(electricity, 500);
Aggregation results:
select * from outputTable2;
time | maxVoltage | maxCurrent |
---|---|---|
2018.10.08T01:01:01.100 | 8.45 | 4.225 |
2018.10.08T01:01:01.200 | 18.45 | 9.225 |
2018.10.08T01:01:01.300 | 28.45 | 14.225 |
2018.10.08T01:01:01.400 | 38.45 | 19.225 |
2018.10.08T01:01:01.500 | 48.45 | 24.225 |
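The two-stage cascade can be traced with a pure-Python model in which the output of the first stage is simply the input of the second. This is a sketch under assumptions, not DolphinDB code: `window_agg`, `col_mean` and `col_max` are hypothetical helpers, times are milliseconds after 01:01:01.000, and each window ending at `end` is modeled as [end - windowSize, end), fired by the first row with time >= end and labeled with `end`.

```python
from typing import Callable, List, Tuple

Row = Tuple[int, Tuple[float, ...]]  # (time in ms, column values)


def window_agg(rows: List[Row], window_size: int, step: int,
               agg: Callable[[List[Tuple[float, ...]]], Tuple[float, ...]]) -> List[Row]:
    """Event-time window model over rows sorted by time."""
    out: List[Row] = []
    end = (rows[0][0] // step + 1) * step
    for t, _ in rows:
        while t >= end:
            vals = [v for ts, v in rows if end - window_size <= ts < end]
            if vals:
                out.append((end, agg(vals)))
            end += step
    return out


def col_mean(vals):
    return tuple(sum(c) / len(c) for c in zip(*vals))


def col_max(vals):
    return tuple(max(c) for c in zip(*vals))


# electricity: time = i ms, voltage = i*0.1, current = i*0.05 (writeData(electricity, 500))
electricity = [(i, (i * 0.1, i * 0.05)) for i in range(1, 501)]
output1 = window_agg(electricity, 10, 10, col_mean)   # avgVoltage, avgCurrent
output2 = window_agg(output1, 100, 100, col_max)      # maxVoltage, maxCurrent
for t, (v, c) in output2:
    print(t, round(v, 3), round(c, 3))
```

In this model the second stage's window ending at 100 sees first-stage rows 010..090, so its maximum comes from the first-stage window [080, 090), whose average voltage is 8.45, in line with the first row of outputTable2.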
To run the script above again, first execute the following to clear the shared tables, subscriptions, and stream engines:
unsubscribeTable(tableName="electricity", actionName="avgElectricity")
undef(`electricity, SHARED)
unsubscribeTable(tableName="outputTable1", actionName="maxElectricity")
undef(`outputTable1, SHARED)
dropStreamEngine("electricityAggregator1")
dropStreamEngine("electricityAggregator2")
Example 6
In the following example, the keyColumn parameter is set to sym.
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
outputTable = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=3, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)
def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    symv = take(`A`B, n)
    volumev = take(1, n)
    insert into t values(timev, symv, volumev)
}
writeData(trades, 6)
For easier inspection, output the trades table sorted by sym:
select * from trades order by sym
time | sym | volume |
---|---|---|
2018.10.08T01:01:01.002 | A | 1 |
2018.10.08T01:01:01.004 | A | 1 |
2018.10.08T01:01:01.006 | A | 1 |
2018.10.08T01:01:01.003 | B | 1 |
2018.10.08T01:01:01.005 | B | 1 |
2018.10.08T01:01:01.007 | B | 1 |
Grouped results:
select * from outputTable
time | sym | sumVolume |
---|---|---|
2018.10.08T01:01:01.003 | A | 1 |
2018.10.08T01:01:01.006 | A | 1 |
2018.10.08T01:01:01.006 | B | 2 |
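After alignment, the windows of every group start from 000. With windowSize=3 and step=3, each group's windows are divided at 000, 003, and 006.
- (1) At 003, group B receives a record, but group B has no data in its first window [000, 003), so no calculation is performed and no result is output for that window.
- (2) The group A record at 004 triggers the calculation of A's first window [000, 003).
- (3) The group A record at 006 triggers the calculation of A's second window [003, 006).
- (4) The group B record at 007 triggers the calculation of B's second window [003, 006).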
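Steps (1) to (4) can be replayed with a pure-Python model that keeps a separate window boundary per key and lets only a key's own records trigger its windows. This is a sketch under assumptions, not DolphinDB code: the helper `keyed_window_sum` is hypothetical, windows are modeled as [end - windowSize, end) with ends on multiples of step, and records are processed in arrival order.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def keyed_window_sum(records: List[Tuple[int, str, int]], window_size: int,
                     step: int) -> List[Tuple[int, str, int]]:
    """records: (time, key, volume) in arrival order. Returns (end, key, sum)."""
    history: Dict[str, List[Tuple[int, int]]] = defaultdict(list)  # key -> rows seen so far
    next_end: Dict[str, int] = {}  # key -> right boundary of its next window
    out: List[Tuple[int, str, int]] = []
    for t, k, v in records:
        if k not in next_end:
            next_end[k] = (t // step + 1) * step  # first boundary after the key's first record
        while t >= next_end[k]:  # only this key's own record can trigger its windows
            end = next_end[k]
            vols = [x for ts, x in history[k] if end - window_size <= ts < end]
            if vols:
                out.append((end, k, sum(vols)))
            next_end[k] = end + step
        history[k].append((t, v))
    return out


# Millisecond parts and groups as in the trades table above.
records = [(2, "A", 1), (3, "B", 1), (4, "A", 1), (5, "B", 1), (6, "A", 1), (7, "B", 1)]
print(keyed_window_sum(records, window_size=3, step=3))  # [(3, 'A', 1), (6, 'A', 1), (6, 'B', 2)]
```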
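With grouped aggregation, timeColumn must be increasing within each group of the stream, although it need not be increasing across the whole stream. Without grouping, timeColumn must be increasing across the whole stream; otherwise, the output of the time-series engine will not be as expected.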
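A stream can be checked against this ordering requirement before it reaches the engine. The helper `is_time_ordered` below is a hypothetical pure-Python sketch, not a DolphinDB function; it reads "increasing" as non-decreasing, which is an assumption.

```python
from typing import Hashable, Iterable, Tuple


def is_time_ordered(records: Iterable[Tuple[int, Hashable]], grouped: bool) -> bool:
    """records: (time, key) in arrival order."""
    last_seen = {}
    for t, key in records:
        k = key if grouped else None  # without grouping, the whole stream is one sequence
        if k in last_seen and t < last_seen[k]:
            return False
        last_seen[k] = t
    return True


arrivals = [(2, "A"), (5, "B"), (3, "A"), (6, "B")]
print(is_time_ordered(arrivals, grouped=True))   # True
print(is_time_ordered(arrivals, grouped=False))  # False
```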
Example 7
The following two examples illustrate what updateTime does.
First, create a stream table and write data into it:
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23);
- Without updateTime:
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
agg1 = createTimeSeriesEngine(name="agg1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="agg1", offset=0, handler=append!{agg1}, msgAsTable=true)
sleep(10)
select * from output1;
time | sym | sumVolume |
---|---|---|
2018.10.08T01:02:00.000 | A | 38 |
2018.10.08T01:03:00.000 | A | 25 |
2018.10.08T01:02:00.000 | B | 40 |
2018.10.08T01:03:00.000 | B | 9 |
- With updateTime set to 1000:
output2 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
agg2 = createTimeSeriesEngine(name="agg2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="agg2", offset=0, handler=append!{agg2}, msgAsTable=true)
sleep(2010)
select * from output2;
time | sym | sumVolume |
---|---|---|
2018.10.08T01:02:00.000 | A | 38 |
2018.10.08T01:03:00.000 | A | 25 |
2018.10.08T01:02:00.000 | B | 40 |
2018.10.08T01:03:00.000 | B | 9 |
2018.10.08T01:05:00.000 | B | 55 |
2018.10.08T01:05:00.000 | A | 29 |
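The following explains how the two examples differ in the last window (01:04:00.000 to 01:05:00.000). For brevity, the date is omitted and only hour:minute:second.millisecond is shown. Assume that the value in the time column is also the moment the record enters the time-series engine.
(1) At 01:04:04.236, 2000 ms have passed since the first record of group A arrived (at 01:04:02.236), so a calculation for group A is triggered and the record (01:05:00.000, "A", 29) is added to the output table.
(2) The group B record at 01:04:05.152 is the first record after the 1000 ms sub-window [01:04:04.000, 01:04:05.000) that contains the record at 01:04:04.412, so it triggers a calculation for group B, and the record (01:05:00.000, "B", 32) is added to the output table.
(3) 2000 ms later, at 01:04:07.152, the group B record from 01:04:05.152 has still not been included in any calculation, so another calculation for group B is triggered, producing (01:05:00.000, "B", 55). Because the primary key of the output table is (time, sym) and the table already contains (01:05:00.000, "B", 32), that record is updated to (01:05:00.000, "B", 55).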
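Steps (1) to (3) can be replayed with a small pure-Python model of one group's still-open window. This is a hypothetical sketch of the rules as described above, not DolphinDB's implementation: `early_outputs` and `keyed` are illustrative names, times are milliseconds after 01:04:00.000, arrival time is assumed to equal event time, and the 2000 ms timeout is taken from this example rather than derived from updateTime.

```python
from typing import List, Tuple


def early_outputs(records: List[Tuple[int, int]], update_ms: int = 1000,
                  timeout_ms: int = 2000) -> List[Tuple[int, int]]:
    """records: (arrival ms, volume) of ONE group inside ONE still-open window.

    Returns (ms at which a calculation happens, window sum at that moment).
    """
    out: List[Tuple[int, int]] = []
    total = 0                 # sum over all records of the window received so far
    pending: List[int] = []   # arrival times of records not yet used in a calculation
    for t, v in records:
        # A record left unprocessed for timeout_ms forces a calculation.
        if pending and t >= pending[0] + timeout_ms:
            out.append((pending[0] + timeout_ms, total))
            pending = []
        # The first record of a later updateTime sub-window triggers a
        # calculation over the records that arrived before it.
        if pending and t // update_ms != pending[-1] // update_ms:
            out.append((t, total))
            pending = []
        total += v
        pending.append(t)
    if pending:  # no further records arrive, so the timeout fires
        out.append((pending[0] + timeout_ms, total))
    return out


group_a = [(2236, 29)]
group_b = [(4412, 32), (5152, 23)]

# Keyed output table: a later result for the same (time, sym) overwrites the earlier one.
keyed = {}
for sym, recs in (("A", group_a), ("B", group_b)):
    for _, s in early_outputs(recs):
        keyed[("01:05:00.000", sym)] = s

print(early_outputs(group_b))  # [(5152, 32), (7152, 55)]
print(keyed)
```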
Example 8
The following example shows what snapshotDir and snapshotIntervalInMsgCount do. When snapshots are enabled, the subscription that feeds the engine must use appendMsg as its handler and must set handlerNeedMsgId=true, so that the message position of each snapshot can be recorded.
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 = table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(server="", tableName="trades", actionName="Agg1", offset=0, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)
n = 500
timev = timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
select * from output1
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
getSnapshotMsgId(Agg1)
// output: 499
Cancel the subscription and drop the engine to simulate a system failure:
unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1 = NULL
Meanwhile, the publisher keeps writing data:
n = 500
timev = timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
Re-create the engine Agg1, which loads the snapshot, and resubscribe starting from the message after the last one processed before the failure:
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)
ofst = getSnapshotMsgId(Agg1)
print(ofst)
// output: 499
subscribeTable(server="", tableName="trades", actionName="Agg1", offset=ofst+1, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)
select * from output1
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
2021.03.12T15:00:00.550 | 25450 |
2021.03.12T15:00:00.600 | 5450 |
2021.03.12T15:00:00.650 | 9950 |
2021.03.12T15:00:00.700 | 14950 |
2021.03.12T15:00:00.750 | 19950 |
2021.03.12T15:00:00.800 | 24950 |
2021.03.12T15:00:00.850 | 29950 |
2021.03.12T15:00:00.900 | 34950 |
2021.03.12T15:00:00.950 | 39950 |
2021.03.12T15:00:01.000 | 44950 |
The result is the same as with an uninterrupted subscription, as the following run without any interruption shows:
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 = table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1", offset=0, handler=append!{Agg1}, msgAsTable=true)
n = 500
timev = timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
n = 500
timev = timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
select * from output1
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
2021.03.12T15:00:00.550 | 25450 |
2021.03.12T15:00:00.600 | 5450 |
2021.03.12T15:00:00.650 | 9950 |
2021.03.12T15:00:00.700 | 14950 |
2021.03.12T15:00:00.750 | 19950 |
2021.03.12T15:00:00.800 | 24950 |
2021.03.12T15:00:00.850 | 29950 |
2021.03.12T15:00:00.900 | 34950 |
2021.03.12T15:00:00.950 | 39950 |
2021.03.12T15:00:01.000 | 44950 |
Without snapshots, the result differs from that of an uninterrupted subscription even if the new subscription resumes where the old one stopped: the re-created engine has lost the state of the earlier windows, so the windows ending at 15:00:00.550 and 15:00:00.600 contain only the newly arrived records (1225 and 4950 instead of 25450 and 5450).
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 = table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1", offset=0, handler=append!{Agg1}, msgAsTable=true)
n = 500
timev = timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1 = NULL
n = 500
timev = timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1", offset=500, handler=append!{Agg1}, msgAsTable=true)
select * from output1
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
2021.03.12T15:00:00.550 | 1225 |
2021.03.12T15:00:00.600 | 4950 |
2021.03.12T15:00:00.650 | 9950 |
2021.03.12T15:00:00.700 | 14950 |
2021.03.12T15:00:00.750 | 19950 |
2021.03.12T15:00:00.800 | 24950 |
2021.03.12T15:00:00.850 | 29950 |
2021.03.12T15:00:00.900 | 34950 |
2021.03.12T15:00:00.950 | 39950 |
2021.03.12T15:00:01.000 | 44950 |
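Why the snapshot matters can be shown with a pure-Python model of the three runs in Example 8. This is a sketch under assumptions, not DolphinDB code: `WindowSumEngine` is a hypothetical class, the snapshot is modeled as a copy of the engine's buffered rows, its next window boundary, and the ID of the last message processed, and it is taken right after message 499 (matching getSnapshotMsgId returning 499). Times are milliseconds after 15:00:00.000.

```python
import copy
from typing import List, Optional, Tuple


class WindowSumEngine:
    """Event-time sum(price) over [end - window_size, end), fired by the first
    row with time >= end. Keeps every row it has seen (no garbage collection)."""

    def __init__(self, window_size: int, step: int):
        self.window_size = window_size
        self.step = step
        self.rows: List[Tuple[int, int]] = []    # (time, price) received so far
        self.next_end: Optional[int] = None      # right boundary of the next window
        self.last_msg_id = -1                    # ID of the last message processed
        self.output: List[Tuple[int, int]] = []  # (window end, sum(price))

    def append_msg(self, msg_id: int, t: int, price: int) -> None:
        if self.next_end is None:
            self.next_end = (t // self.step + 1) * self.step
        while t >= self.next_end:
            end = self.next_end
            vals = [p for ts, p in self.rows if end - self.window_size <= ts < end]
            if vals:
                self.output.append((end, sum(vals)))
            self.next_end += self.step
        self.rows.append((t, price))
        self.last_msg_id = msg_id

    def snapshot(self):
        """Engine state plus the ID of the last message it processed."""
        return copy.deepcopy((self.rows, self.next_end, self.last_msg_id))

    @classmethod
    def restore(cls, window_size: int, step: int, snap) -> "WindowSumEngine":
        engine = cls(window_size, step)
        engine.rows, engine.next_end, engine.last_msg_id = copy.deepcopy(snap)
        return engine


# Message i (0-based): time i+1 ms, price 1..500 within each batch of 500.
stream = [(i, i + 1, i % 500 + 1) for i in range(1000)]

full = WindowSumEngine(100, 50)                  # 1) uninterrupted run
for msg in stream:
    full.append_msg(*msg)

first = WindowSumEngine(100, 50)                 # 2) failure after the first batch,
for msg in stream[:500]:
    first.append_msg(*msg)
snap = first.snapshot()                          #    restore, resume at msgId + 1
restored = WindowSumEngine.restore(100, 50, snap)
for msg in stream[restored.last_msg_id + 1:]:
    restored.append_msg(*msg)

fresh = WindowSumEngine(100, 50)                 # 3) new engine without state, offset 500
for msg in stream[500:]:
    fresh.append_msg(*msg)

print(dict(full.output)[550], dict(first.output + restored.output)[550], dict(fresh.output)[550])
# 25450 25450 1225
```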
Example 9
When calling subscribeTable, the handler parameter can be used to filter the subscribed stream data.
In the following example, a sensor collects voltage and current data and uploads it in real time as the stream data source. Records with voltage<=122 or current=NULL must be filtered out before they enter the time-series engine.
share streamTable(1000:0, `time`voltage`current, [TIMESTAMP, DOUBLE, DOUBLE]) as electricity
outputTable = table(10000:0, `time`avgVoltage`avgCurrent, [TIMESTAMP, DOUBLE, DOUBLE])
// Custom processing: filter out invalid records with voltage<=122 or current=NULL
def append_after_filtering(inputTable, msg){
    t = select * from msg where voltage>122, isValid(current)
    if(size(t)>0){
        insert into inputTable values(t.time,t.voltage,t.current)
    }
}
electricityAggregator = createTimeSeriesEngine(name="electricityAggregator", windowSize=6, step=3, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append_after_filtering{electricityAggregator}, msgAsTable=true)
// Simulate the data stream
def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    voltage = 120+1..n * 1.0
    current = take([1,NULL,2]*0.1, n)
    insert into t values(timev, voltage, current);
}
writeData(electricity, 10)
Stream table:
select * from electricity
time | voltage | current |
---|---|---|
2018.10.08T01:01:01.002 | 121 | 0.1 |
2018.10.08T01:01:01.003 | 122 | |
2018.10.08T01:01:01.004 | 123 | 0.2 |
2018.10.08T01:01:01.005 | 124 | 0.1 |
2018.10.08T01:01:01.006 | 125 | |
2018.10.08T01:01:01.007 | 126 | 0.2 |
2018.10.08T01:01:01.008 | 127 | 0.1 |
2018.10.08T01:01:01.009 | 128 | |
2018.10.08T01:01:01.010 | 129 | 0.2 |
2018.10.08T01:01:01.011 | 130 | 0.1 |
Aggregation results:
select * from outputTable
time | avgVoltage | avgCurrent |
---|---|---|
2018.10.08T01:01:01.006 | 123.5 | 0.15 |
2018.10.08T01:01:01.009 | 125 | 0.15 |
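Because records with voltage<=122 or current=NULL are filtered out before they enter the time-series engine, the first window [000, 003) contains no data, so no calculation takes place for it.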
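The filter-then-aggregate pipeline can be reproduced with a pure-Python model. This is a sketch under assumptions, not DolphinDB code: `filter_then_average` is a hypothetical helper, None stands for NULL, windows are modeled as [end - windowSize, end) with ends on multiples of step, and they are aligned on the first record that reaches the engine (i.e., after filtering).

```python
from typing import List, Optional, Tuple

Row = Tuple[int, float, Optional[float]]  # (ms, voltage, current); None stands for NULL


def filter_then_average(rows: List[Row], window_size: int, step: int):
    # Same condition as append_after_filtering: voltage>122 and current not NULL.
    kept = [(t, v, c) for t, v, c in rows if v > 122 and c is not None]
    out = []
    if not kept:
        return out
    end = (kept[0][0] // step + 1) * step  # aligned on the first record the engine sees
    for t, _, _ in kept:
        while t >= end:
            win = [(v, c) for ts, v, c in kept if end - window_size <= ts < end]
            if win:
                out.append((end,
                            sum(v for v, _ in win) / len(win),
                            sum(c for _, c in win) / len(win)))
            end += step
    return out


# Same data as writeData(electricity, 10): ms 002..011, voltage 121..130,
# current cycling through 0.1, NULL, 0.2.
cycle = [0.1, None, 0.2]
electricity = [(1 + i, 120 + i * 1.0, cycle[(i - 1) % 3]) for i in range(1, 11)]
for end, avg_v, avg_c in filter_then_average(electricity, 6, 3):
    print(end, avg_v, round(avg_c, 6))
```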
Example 10
Besides a table, the outputTable parameter can also be another stream engine:
share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline
outputTable = table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])
Rengine = createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")
// The results of the time-series engine are fed into the reactive state engine
Tengine = createTimeSeriesEngine(name="timeseries", windowSize=6000, step=6000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=Rengine, timeColumn=`time, useSystemTime=false, keyColumn=`sym)
subscribeTable(server="", tableName="trades", actionName="timeseries", offset=0, handler=append!{Tengine}, msgAsTable=true)
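The chaining idea can be sketched in plain Python: the upstream stage hands each finished bar to a downstream object instead of appending it to a table. Everything here is hypothetical: `kline_stage` and `ReactiveMavg` only mimic the two engines, the trades are made up, and reporting None until three bars have arrived is an assumption about how an incomplete moving window is shown.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional, Tuple


class ReactiveMavg:
    """Downstream stage: per-sym moving average of the last `window` open prices."""

    def __init__(self, window: int = 3):
        self.window = window
        self.opens: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=window))
        self.output: List[Tuple[str, Optional[float]]] = []  # (sym, factor1)

    def append(self, bar: dict) -> None:
        q = self.opens[bar["sym"]]
        q.append(bar["open"])
        # Assumption: no value (None) until `window` bars have been seen.
        factor = sum(q) / len(q) if len(q) == self.window else None
        self.output.append((bar["sym"], factor))


def kline_stage(trades, window_ms: int, sink) -> None:
    """Upstream stage: tumbling windows (windowSize == step) per sym, each closed
    by that sym's first trade in a later window; finished bars go to sink."""
    state: Dict[str, Tuple[int, list]] = {}  # sym -> (window end, [(price, volume)])
    for t, sym, price, vol in trades:
        end = (t // window_ms + 1) * window_ms
        if sym in state and state[sym][0] != end:
            prev_end, rows = state.pop(sym)
            prices = [p for p, _ in rows]
            sink.append({"time": prev_end, "sym": sym, "open": prices[0],
                         "close": prices[-1], "high": max(prices),
                         "low": min(prices), "volume": sum(v for _, v in rows)})
        state.setdefault(sym, (end, []))[1].append((price, vol))


# Hypothetical trades for one symbol; time in ms.
trades = [(1000, "A", 10.0, 100), (2000, "A", 10.5, 200), (7000, "A", 11.0, 100),
          (13000, "A", 12.0, 100), (19000, "A", 13.0, 100), (25000, "A", 14.0, 100)]
reactive = ReactiveMavg()
kline_stage(trades, 6000, reactive)  # the "output table" is the downstream stage
print(reactive.output)  # [('A', None), ('A', None), ('A', 11.0), ('A', 12.0)]
```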
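Management Functions
DolphinDB provides functions for querying and managing the stream engines that already exist in the system.
- To list the defined stream engines, use getStreamEngineStat (deprecated name: getAggregatorStat).
- To get the handle of a stream engine, use getStreamEngine (deprecated name: getAggregator).
- To delete a stream engine, use dropStreamEngine (deprecated name: dropAggregator).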