时序聚合引擎

时序聚合引擎由 createTimeSeriesEngine 函数创建。

createTimeSeriesEngine 函数的语法如下:

createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup], [keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false], [subWindow], [parallelism=1], [acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false])

重要参数

  • name:表示引擎的名称,是引擎在一个数据节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。

  • windowSize:表示计算窗口计算的时间长度

  • step:表示窗口每次移动的时间间隔

  • metrics:以元代码的格式表示计算公式,支持内置聚合函数和 UDF(自定义聚合函数)

  • dummyTable:表示输入数据的表结构

  • outputTable:表示结果的输出表

  • timeColumn:当 useSystemTime=false 时,指定 dummyTable 中的时间列作为窗口计算的触发信号,即按照事件时间进行流处理

  • useSystemTime:当参数值为 true 时,引擎按照系统时间进行流处理,与事件时间无关

  • keyColumn:表示分组字段名,例如以每支股票为一组进行计算

  • fill:表示若(某个 key 的)某个窗口无数据时填充的方法

    • 'none': 不输出结果

    • 'null': 输出结果为 NULL

    • 'ffill': 输出上一个有数据的窗口的结果

    • '具体数值':该值的数据类型需要和对应的 metrics 计算结果的类型保持一致

其他参数的详细含义可以参考:createTimeSeriesEngine

应用例子 1

下例说明数据窗口如何规整以及流数据时序引擎如何进行计算。以下代码建立流数据表 trades,包含 time 和 volume 两列。创建时序引擎 streamAggr1,每 3 毫秒对过去 6 毫秒的数据计算 sum(volume)。time 列的精度为毫秒,模拟插入的数据流频率也设为每毫秒一条数据。

share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume, [TIMESTAMP, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)    

向流数据表 trades 中写入 10 条数据,并查看流数据表 trades 内容:

def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}
writeData(trades, 10)

select * from trades;
time volume
2018.10.08T01:01:01.002 1
2018.10.08T01:01:01.003 1
2018.10.08T01:01:01.004 1
2018.10.08T01:01:01.005 1
2018.10.08T01:01:01.006 1
2018.10.08T01:01:01.007 1
2018.10.08T01:01:01.008 1
2018.10.08T01:01:01.009 1
2018.10.08T01:01:01.010 1
2018.10.08T01:01:01.011 1

再查看结果表 outputTable:

select * from outputTable;
time sumVolume
2018.10.08T01:01:01.003 1
2018.10.08T01:01:01.006 4
2018.10.08T01:01:01.009 6

根据第一条数据时刻规整第一个窗口的起始时间后,窗口以 step 为步长移动。下面详细解释时序引擎的计算过程。为简便起见,以下提到时间时,省略相同的 2018.10.08T01:01:01 部分,只列出毫秒部分。基于第一行数据的时间 002,第一个窗口的起始时间规整为 000,到 002 结束,只包含 002 一条记录,计算被 003 记录触发,sum(volume) 的结果是 1;第二个窗口从 000 到 005,包含了 4 条数据,计算被 006 记录触发,计算结果为 4;第三个窗口从 003 到 008,包含 6 条数据,计算被 009 记录触发,计算结果为 6。虽然第四个窗口从 006 到 011 且含有 6 条数据,但是由于该窗口结束之后没有数据,所以该窗口的计算没有被触发。

若需要重复执行以上程序,应首先解除订阅,并将流数据表 trades 与时序引擎 streamAggr1 二者删除:

unsubscribeTable(tableName="trades", actionName="append_tradesAggregator")
undef(`trades, SHARED)
dropStreamEngine("streamAggr1")

应用例子 2

DolphinDB 时序聚合引擎支持多个窗口。

下例说如何对相同的 metrics 按不同的 windowSize 聚合。以下代码建立流数据表 trades,包含 time 和 volume 两列。创建时序引擎 streamAggr1,每 3 毫秒对过去 6 毫秒和过去 12 毫秒的数据计算 sum(volume)。time 列的精度为毫秒,模拟插入的数据流频率也设为每毫秒一条数据。

share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume1`sumVolume2, [TIMESTAMP, INT,INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=[6,12], step=3, metrics=[<sum(volume)>,<sum(volume)>], dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)    
def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}
writeData(trades, 20)

select * from trades;

再查看结果表 outputTable:

select * from outputTable;
time sumVolume1 sumVolume2
2018.10.08T01:01:01.003 1 1
2018.10.08T01:01:01.006 4 4
2018.10.08T01:01:01.009 6 7
2018.10.08T01:01:01.012 6 10
2018.10.08T01:01:01.015 6 12
2018.10.08T01:01:01.018 6 12
2018.10.08T01:01:01.021 6 12

应用例子 3

DolphinDB 时序聚合引擎支持使用多种表达式进行实时计算。

  • 一个或多个聚合函数

    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • 使用聚合结果进行计算

    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask)-min(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • 对列与列的操作结果进行聚合计算

    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask-bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • 输出多个聚合结果

    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[max((ask-bid)/(ask+bid)*2), min((ask-bid)/(ask+bid)*2)]>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • 使用多参数聚合函数

    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<corr(ask,bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
    
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<percentile(ask-bid,99)/sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • 使用自定义函数

    defg diff(x,y){
    	return sum(x)-sum(y)
    }
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<diff(ask, bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • 使用多个返回结果的函数

    defg sums(x){
    	return [sum(x),sum2(x)]
    }
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sums(ask) as `sumAsk`sum2Ask>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
注: 不支持聚合函数嵌套调用,例如 sum(spread(ask,bid))

应用例子 4

系统利用 dummyTable 的 schema 来决定订阅的流数据中每一列的数据类型。dummyTable 有无数据对结果没有任何影响。

share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
modelTable = table(1000:0, `time`volume, [TIMESTAMP, INT])
outputTable = table(10000:0, `time`sumVolume, [TIMESTAMP, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=5, step=5, metrics=<[sum(volume)]>, dummyTable=modelTable, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)    

def writeData(t,n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}

writeData(trades, 6)

sleep(1)
select * from outputTable

应用例子 5

计算结果可以输出到内存表或流数据表。输出到内存表的数据可以更新或删除,而输出到流数据表的数据无法更新或删除,但是可以通过流数据表将结果作为另一个引擎的数据源再次发布。

下例中,时序引擎 electricityAggregator1 订阅流数据表 electricity,进行移动均值计算,并将结果输出到流数据表 outputTable1。时序引擎 electricityAggregator2 订阅 outputTable1 表,并对移动均值计算结果求移动峰值。

share streamTable(1000:0,`time`voltage`current,[TIMESTAMP,DOUBLE,DOUBLE]) as electricity

//将第一个时序引擎的输出表定义为流数据表,可以再次订阅
share streamTable(10000:0,`time`avgVoltage`avgCurrent,[TIMESTAMP,DOUBLE,DOUBLE]) as outputTable1 

electricityAggregator1 = createTimeSeriesEngine(name="electricityAggregator1", windowSize=10, step=10, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable1, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append!{electricityAggregator1}, msgAsTable=true)

//订阅计算结果,再次进行聚合计算
outputTable2 =table(10000:0, `time`maxVoltage`maxCurrent, [TIMESTAMP,DOUBLE,DOUBLE])
electricityAggregator2 = createTimeSeriesEngine(name="electricityAggregator2", windowSize=100, step=100, metrics=<[max(avgVoltage), max(avgCurrent)]>, dummyTable=outputTable1, outputTable=outputTable2, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="outputTable1", actionName="maxElectricity", offset=0, handler=append!{electricityAggregator2}, msgAsTable=true);

//向electricity表中插入500条数据
def writeData(t, n){
        timev = 2018.10.08T01:01:01.000 + timestamp(1..n)
        voltage = 1..n * 0.1
        current = 1..n * 0.05
        insert into t values(timev, voltage, current)
}
writeData(electricity, 500);

聚合计算结果:

select * from outputTable2;
time maxVoltage maxCurrent
2018.10.08T01:01:01.100 8.45 4.225
2018.10.08T01:01:01.200 18.45 9.225
2018.10.08T01:01:01.300 28.45 14.225
2018.10.08T01:01:01.400 38.45 19.225
2018.10.08T01:01:01.500 48.45 24.225

若要对上述脚本进行重复使用,需先执行以下脚本以清除共享表、订阅以及流数据引擎:

unsubscribeTable(tableName="electricity", actionName="avgElectricity")
undef(`electricity, SHARED)
unsubscribeTable(tableName="outputTable1", actionName="maxElectricity")
undef(`outputTable1, SHARED)
dropStreamEngine("electricityAggregator1")
dropStreamEngine("electricityAggregator2")

应用例子6

下例中,设定 keyColumn 参数为 sym。

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
outputTable = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=3, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)    

def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    symv =take(`A`B, n)
    volumev = take(1, n)
    insert into t values(timev, symv, volumev)
}

writeData(trades, 6)

为了方便观察,对 trades 表的 sym 列排序输出:

select * from trades order by sym
time sym volume
2018.10.08T01:01:01.002 A 1
2018.10.08T01:01:01.004 A 1
2018.10.08T01:01:01.006 A 1
2018.10.08T01:01:01.003 B 1
2018.10.08T01:01:01.005 B 1
2018.10.08T01:01:01.007 B 1

分组计算结果:

select * from outputTable
time sym sumVolume
2018.10.08T01:01:01.003 A 1
2018.10.08T01:01:01.006 A 1
2018.10.08T01:01:01.006 B 2

各组窗口规整后统一从 000 时间点开始,根据 windowSize=3 以及 step=3,每个组的窗口会按照 000-003-006 划分。

  • (1) 在 003,B组有一条数据,但是由于B组在第一个窗口没有任何数据,不会进行计算也不会产生结果,所以B组第一个窗口没有结果输出。

  • (2) 004 的A组数据触发A组第一个窗口的计算。

  • (3) 006 的A组数据触发A组第二个窗口的计算。

  • (4) 007 的B组数据触发B组第二个窗口的计算。

如果进行分组聚合计算,流数据源中的每个分组中的 'timeColumn' 必须是递增的,但是整个数据源的 'timeColumn' 可以不是递增的;如果没有进行分组聚合,那么整个数据源的 'timeColumn' 必须是递增的,否则时序引擎的输出结果会与预期不符。

应用例子 7

通过以下两个例子,可以理解 updateTime 的作用。

首先创建流数据表并写入数据:

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23);
  • 不指定 updateTime

    output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
    agg1 = createTimeSeriesEngine(name="agg1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
    subscribeTable(tableName="trades", actionName="agg1", offset=0, handler=append!{agg1}, msgAsTable=true)
    
    sleep(10)
    
    select * from output1;
    time sym sumVolume
    2018.10.08T01:02:00.000 A 38
    2018.10.08T01:03:00.000 A 25
    2018.10.08T01:02:00.000 B 40
    2018.10.08T01:03:00.000 B 9
  • updateTime 设为 1000:

    output2 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
    agg2 = createTimeSeriesEngine(name="agg2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
    subscribeTable(tableName="trades", actionName="agg2", offset=0, handler=append!{agg2}, msgAsTable=true)
    
    sleep(2010)
    
    select * from output2;
    time sym sumVolume
    2018.10.08T01:02:00.000 A 38
    2018.10.08T01:03:00.000 A 25
    2018.10.08T01:02:00.000 B 40
    2018.10.08T01:03:00.000 B 9
    2018.10.08T01:05:00.000 B 55
    2018.10.08T01:05:00.000 A 29

下面我们介绍以上两个例子在最后一个数据窗口(01:04:00.000 到 01:05:00.000)的区别。为简便起见,我们省略日期部分,只列出(小时:分钟:秒.毫秒)部分。假设 time 列时间亦为数据进入时序引擎的时刻。

(1) 在 01:04:04.236 时,A 分组的第一条记录到达后已经过 2000 毫秒,触发一次 A 组计算,输出表增加一条记录(01:05:00.000, `A, 29)。

(2) 在 01:04:05.152 时的 B 组记录为 01:04:04.412 所在小窗口 [01:04:04.000, 01:04:05.000) 之后第一条记录,触发一次 B 组计算,输出表增加一条记录 (01:05:00.000,"B",32)。

(3) 2000 毫秒后,在 01:04:07.152 时,由于 01:04:05.152 时的 B 组记录仍未参与计算,触发一次 B 组计算,输出一条记录 (01:05:00.000,"B",55)。由于输出表的主键为 time 和 sym,并且输出表中已有 (01:05:00.000,"B",32) 这条记录,因此将该记录更新为 (01:05:00.000,"B",55)。

应用例子 8

通过以下这个例子,可以理解 snapshotDirsnapshotIntervalInMsgCount 的作用。如果启用 snapshot,引擎订阅流表时,handler 需是 appendMsg 函数,需指定 handlerNeedMsgId=true,用来记录快照的消息位置。

share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)

n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

select * from output1
time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
getSnapshotMsgId(Agg1) >499

取消订阅并删除引擎来模拟系统异常

unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1=NULL

此时发布端仍在写入数据

n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

再次创建 aggr, 加载 snapshot,从上次处理最后一条消息开始重新订阅

Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)

ofst=getSnapshotMsgId(Agg1)
print(ofst)
>499

subscribeTable(server="", tableName="trades", actionName="Agg1",offset=ofst+1, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)

select * from output1
time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
2021.03.12T15:00:00.550 25450
2021.03.12T15:00:00.600 5450
2021.03.12T15:00:00.650 9950
2021.03.12T15:00:00.700 14950
2021.03.12T15:00:00.750 19950
2021.03.12T15:00:00.800 24950
2021.03.12T15:00:00.850 29950
2021.03.12T15:00:00.900 34950
2021.03.12T15:00:00.950 39950
2021.03.12T15:00:01.000 44950

结果和订阅不中断一样。

share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=append!{Agg1}, msgAsTable=true)

n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

select * from output1
time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
2021.03.12T15:00:00.550 25450
2021.03.12T15:00:00.600 5450
2021.03.12T15:00:00.650 9950
2021.03.12T15:00:00.700 14950
2021.03.12T15:00:00.750 19950
2021.03.12T15:00:00.800 24950
2021.03.12T15:00:00.850 29950
2021.03.12T15:00:00.900 34950
2021.03.12T15:00:00.950 39950
2021.03.12T15:00:01.000 44950

如果不开启 snapshot,即使从上次中断的地方开始订阅,得到的结果也与订阅不中断不一样。

share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=append!{Agg1}, msgAsTable=true)

n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1=NULL

n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 500, handler=append!{Agg1}, msgAsTable=true)

select * from output1
time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
2021.03.12T15:00:00.550 1225
2021.03.12T15:00:00.600 4950
2021.03.12T15:00:00.650 9950
2021.03.12T15:00:00.700 14950
2021.03.12T15:00:00.750 19950
2021.03.12T15:00:00.800 24950
2021.03.12T15:00:00.850 29950
2021.03.12T15:00:00.900 34950
2021.03.12T15:00:00.950 39950
2021.03.12T15:00:01.000 44950

应用例子 9

使用 subscribeTable 函数时,可利用 handler 参数过滤订阅的流数据。

在下例中,传感器采集电压和电流数据并实时上传作为流数据源,其中电压 voltage<=122 或电流 current=NULL 的数据需要在进入时序引擎之前过滤掉。

share streamTable(1000:0, `time`voltage`current, [TIMESTAMP, DOUBLE, DOUBLE]) as electricity
outputTable = table(10000:0, `time`avgVoltage`avgCurrent, [TIMESTAMP, DOUBLE, DOUBLE])

//自定义数据处理过程,过滤 voltage<=122 或 current=NULL的无效数据。
def append_after_filtering(inputTable, msg){
	t = select * from msg where voltage>122, isValid(current)
	if(size(t)>0){
		insert into inputTable values(t.time,t.voltage,t.current)		
	}
}
electricityAggregator = createTimeSeriesEngine(name="electricityAggregator", windowSize=6, step=3, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append_after_filtering{electricityAggregator}, msgAsTable=true)

//模拟产生数据
def writeData(t, n){
        timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
        voltage = 120+1..n * 1.0
        current = take([1,NULL,2]*0.1, n)
        insert into t values(timev, voltage, current);
}
writeData(electricity, 10)

流数据表:

select * from electricity
time voltage current
2018.10.08T01:01:01.002 121 0.1
2018.10.08T01:01:01.003 122
2018.10.08T01:01:01.004 123 0.2
2018.10.08T01:01:01.005 124 0.1
2018.10.08T01:01:01.006 125
2018.10.08T01:01:01.007 126 0.2
2018.10.08T01:01:01.008 127 0.1
2018.10.08T01:01:01.009 128
2018.10.08T01:01:01.010 129 0.2
2018.10.08T01:01:01.011 130 0.1

聚合计算结果:

select * from outputTable
time avgVoltage avgCurrent
2018.10.08T01:01:01.006 123.5 0.15
2018.10.08T01:01:01.009 125 0.15

由于 voltage<=122 或 current=NULL 的数据已经在进入时序引擎时被过滤了,所以第一个窗口 [000,003) 里没有数据,也就没有发生计算。

应用例子 10

outputTable 参数除了可以是表之外还可以是其他流数据计算引擎:

share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades

share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline

outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])

Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")

Tengine=createTimeSeriesEngine(name="timeseries", windowSize=6000, step=6000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=Rengine, timeColumn=`time, useSystemTime=false, keyColumn=`sym)
//时间序列引擎的结果输入响应式状态引擎

subscribeTable(server="", tableName="trades", actionName="timeseries", offset=0, handler=append!{Tengine}, msgAsTable=true)   

运维相关函数

DolphinDB 提供流数据引擎的管理函数,方便查询和管理系统中已经存在的流数据引擎。

  • 获取已定义的流数据引擎清单,可使用函数getStreamEngineStat (deprecated name: getAggregatorStat)

  • 获取流数据引擎的句柄,可使用函数getStreamEngine (deprecated name: getAggregator)

  • 删除流数据引擎,可使用函数dropStreamEngine (deprecated name: dropAggregator)