createTimeSeriesEngine
语法
createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable,
outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize],
[updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir],
[snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup],
[keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false],
[subWindow], [parallelism=1], [acceptedDelay=0])
别名:createTimeSeriesAggregator
详情
创建流数据时间序列引擎,以实现基于时间的滑动窗口或滚动窗口进行实时计算。返回一个表对象,通过向该表对象写入,将数据注入时间序列引擎进行计算。
时序引擎的聚合计算的算子分为增量计算和全量计算两种。增量计算算子不会保留历史数据,每次有数据进来就会进行增量计算;而全量计算算子(例如自定义的聚合函数,或未经优化的内置聚合函数,亦或是嵌套了状态函数的函数)会保留窗口内完整的数据,待触发输出时进行全量计算。
时序引擎对以下聚合计算算子进行了优化,实现了增量计算,显著提升了性能:corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK, beta, avg。
更多流数据引擎的应用场景说明可以参考 内置流式计算引擎。
窗口
-
窗口边界规整:对起始窗口左边界进行规整。(详情请参考 step 和 roundTime 的参数说明以及
规整规则表
) -
窗口确定:
-
windowSize 确定窗口长度。
-
closed 确定窗口左闭右开或左开右闭。
-
step 确定滑动的步长。
其中,windowSize 与 step 的单位取决于 useSystemTime 参数。若 useSystemTime = true, windowSize 与 step 的单位是毫秒。若 useSystemTime = false,windowSize 与 step 的单位同 timeColumn 列的精度一致。
-
-
窗口截取:useSystemTime 参数决定了如何截取窗口,可以基于数据的时间列,亦可以基于数据注入系统的时间进行截取。
计算
-
计算前提:若指定了 timeColumn,则 timeColumn 的时间必须递增;若同时指定了 keyColumn,则按照分组分别进行滑动窗口计算,timeColumn 在每组内的时间必须递增。否则,时间乱序的数据在计算中会被直接丢弃。
-
触发规则:若指定了 timeColumn,当前窗口的计算将由该窗口结束后收到的第一条数据触发;若设置 useSystemTime = true,窗口结束后会自动触发计算。
-
强制触发规则:对长时间未触发计算的窗口数据,可以通过设置 updateTime 或 forceTriggerTime 强制触发计算。具体规则请参考参数说明。
-
窗口填充规则:未指定 fill 或指定 fill = "none" 时,只会输出计算结果不为空的窗口;若指定了 fill,则会输出所有窗口,且根据 fill 规则对结果为空的窗口进行填充。
其它功能
-
支持数据/状态清理:清理引擎中堆积的数据和不再需要的状态信息。(详情请参考 garbageSize 和 keyPurgeFreqInSec 的参数说明)
-
快照机制:启用快照机制之后,系统若出现异常,可及时将流数据引擎恢复到最新的快照状态。(详情请参考 snapshotDir 和 snapshotIntervalInMsgCount 的参数说明)
-
流数据引擎高可用:若要启用引擎高可用,需在订阅端 raft 组的 leader 节点创建引擎并通过 raftGroup 参数开启高可用。开启高可用后,当 leader 节点宕机时,会自动切换新 leader 节点重新订阅流数据表。(详情请参考 raftGroup 的参数说明)
参数
name 字符串标量,表示时间序列引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。
windowSize 正整数标量或向量,表示滑动窗口的长度。
-
若 useSystemTime 为 true,则 windowSize 和 step 的单位为毫秒。
-
若 useSystemTime 为 false,则 windowSize 和 step 的单位与 timeColumn 的时间精度一致。
-
计算指标可以是一个或多个系统内置或用户自定义的聚合函数(使用 defg 关键字定义),如 <[sum(volume), avg(price)]>;可以对聚合结果使用表达式,如 <[avg(price1)-avg(price2)]>;也可对列与列的计算结果进行聚合计算,如 <[std(price1-price2)]>。
-
metrics 内支持调用具有多个返回值的函数,例如 <func(price) as `col1`col2>(可不指定列名)。
-
若 windowSize 为向量, windowSize 每个值可对应 metrics 中多个计算指标。例如,windowSize 为[10,20]时,metrics可为 (<[min(volume), max(volume)]>, <sum(volume)>)。 metrics 也可以嵌套输入元组向量。例如:[[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]]
注:- metrics 中使用的列名大小写不敏感,不需要与输入表的列名大小写保持一致。
- metrics 中不可使用嵌套聚合函数。
createTimeSeriesEngine
函数之前,需要将输出表预先设立为一个空表,并指定各列列名以及数据类型。时间序列引擎会将计算结果插入该表。-
输出表的列顺序如下:
-
时间列。其中:
-
若 useSystemTime = true,为 TIMESTAMP 类型;反之,该列数据类型与 timeColumn 列一致。
-
若 useWindowStartTime = true,显示时间为数据窗口起始时间;反之,显示时间为数据窗口终止时间。
-
-
分组列。如果 keyColumn 不为空,则其后几列和 keyColumn 设置的列及其顺序保持一致。
-
耗时列。如果指定 outputElapsedMicroseconds = true,则指定一个 LONG 类型的列用于存储耗时(单位:微秒)。
-
计算结果列。可为多列。
注: 自 2.00.10 版本开始,引擎支持通过自定义聚合函数,将多个计算结果以 array vector 的形式输出,此时必须在 outputTable 中指定对应列类型为 array vector,详见例2。 -
-
当 useSystemTime = true时,时间序列引擎会按照数据注入时间序列引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行计算。只要一个数据窗口中含有数据,数据窗口结束后就会自动进行计算。结果中的第一列为计算发生的时间戳,与数据中的时间无关。
-
当 useSystemTime = false(缺省值)时,时间序列引擎根据流数据中的 timeColumn 列来截取数据窗口。一个数据窗口结束后的第一条新数据才会触发该数据窗口的计算。请注意,触发计算的数据并不会参与该次计算。
例如,一个数据窗口从 10:10:10 到 10:10:19。若 useSystemTime = true,则只要该窗口中至少有一条数据,该窗口的计算会在窗口结束后的 10:10:20 触发。若 useSystemTime = false,且 10:10:19 后的第一条数据为 10:10:25,则该窗口的计算会在 10:10:25 触发。
updateTime 可选参数,非负整数,单位与 timeColumn 的时间精度一致。用于指定比 step 更短的计算时间间隔。step 必须是 updateTime 的整数倍。要设置 updateTime, useSystemTime 必须设为 false。
如果没有指定 updateTime,一个数据窗口结束前,不会发生对该数据窗口数据的计算。若一个窗口长时间未触发计算,可以指定 updateTime,分多次触发当前窗口数据的计算。计算触发的规则为:
-
updateTime 指定为正整数值时:
-
从当前窗口的左边界开始,每隔 updateTime 时间,若有新的数据到来,则对当前窗口内该数据之前的所有数据进行计算。
-
如果系统经过 2 * updateTime (至少2秒)后仍有未被处理的数据,则触发对当前窗口内所有数据的计算。
-
若分组计算,则每组内进行上述操作。
-
-
updateTime 指定为 0 时:在新的数据到来后,立即对当前窗口的最新数据计算并输出。
指定 updateTime 后,建议使用键值内存表作为输出表。因为每次计算均会增加一条记录,输出表若使用普通内存表或流数据表,则会产生大量带有相同时间戳的结果。因为键值流数据表不可更新记录,输出表亦不推荐使用键值流数据表。
useWindowStartTime 可选参数,布尔值,表示输出表中的时间是否为数据窗口起始时间。默认值为 false,表示输出表中的时间为数据窗口起始时间 + windowSize 。若 windowSize 是向量, useWindowStartTime 必须为 false。
roundTime 可选参数,布尔值,表示若数据时间精度为毫秒或者秒且 step > 一分钟,如何对窗口边界值进行规整处理。默认值为 true,表示按照既定的多分钟规则进行规整。若为 false,则按一分钟规则进行窗口规整。
若要开启快照机制(snapshot),必须指定 snapshotDir 与 snapshotIntervalInMsgCount。
-
指定的目录必须存在,否则系统会提示异常。
-
创建流数据引擎时,如果指定了 snapshotDir,会检查该目录下是否存在快照。如果存在,会加载该快照,恢复引擎的状态。
-
多个引擎可以指定同一个目录存储快照,用引擎的名称来区分快照文件。
-
一个引擎的快照可能会使用三个文件名:
-
临时存储快照信息:文件名为 <engineName>.tmp;
-
快照生成并刷到磁盘:文件保存为 <engineName>.snapshot;
-
存在同名快照:旧快照自动重命名为 <engineName>.old。
-
snapshotIntervalInMsgCount 可选参数,为整数类型,表示每隔多少条数据保存一次流数据引擎快照。
fill 可选参数,一个标量或向量,指定某个分组的某个窗口无数据时的处理方法。可取以下值:
- 'none': 不输出结果。
- 'null': 输出结果为 NULL。
- 'ffill': 输出上一个有数据的窗口的结果。
- '具体数值':该值的数据类型需要和对应的 metrics 计算结果的类型保持一致。
fill 可以输入向量,长度与 metrics 元素个数保持一致,表示为每个 metrics 指定不同的 fill 方式。若为向量,向量中各项只能是 'null', 'ffill' 或一个数值,不能是 'none'。
-
未被触发计算的窗口结束后(窗口结束时刻为 t),若收到了其他分组的数据(时间戳为 t1),且满足 t1-t ≥ forceTriggerTime,则该窗口将被触发计算。
-
如果某个分组在最后一个窗口被强制触发计算后,没有再收到新的数据,但其他分组仍然收到了新的数据,那么通过 fill 来填充该分组的所有缺失窗口,可以确保在最新时间截面上仍然输出该分组的窗口。如果不指定 fill,则最后一个窗口被触发计算后,该分组不会产生新的窗口。
-
设置 updateTime,计算发生后仍有属于当前窗口的数据到达时,当前窗口计算结果会再次更新;
-
设置 forceTriggerTime,则触发计算之后收到的数据会被丢弃,建议不要设置太小的 forceTriggerTime。
raftGroup 可选参数,表示流数据高可用订阅端 raft 组的 ID (大于1的整数,由流数据高可用相关的配置项 streamingRaftGroups 指定)。设置该参数表示开启计算引擎高可用。在 leader 节点创建流数据引擎后,会同步在 follower 节点创建该引擎。每次保存的 snapshot 也会同步到 follower。当 raft 组的 leader 节点宕机时,会自动切换新 leader 节点重新订阅流数据表。请注意,若要指定 raftGroup,必须同时指定 snapshotDir。
若需要查看清理前后的状态,可以通过调用 getStreamEngineStat 函数查看 TimeSeriesEngine 引擎状态的 numGroups 列,来对比响应式状态引擎清理前后分组数的变化。
-
closed = 'left': 窗口左闭右开。
-
closed = 'right': 窗口左开右闭。
outputElapsedMicroseconds 布尔值,表示是否输出每个窗口从触发计算到计算完成输出结果的耗时(若指定了 keyColumn 则包含数据分组的耗时),默认为 false。指定参数 outputElapsedMicroseconds 后,在定义 outputTable 时需要在时间列和分组列后增加一个 LONG 类型的列,详见 outputTable 参数说明。
subWindow 整型或者 DURATION 数据对。在滑动窗口内指定子窗口,仅计算子窗口内的数据。子窗口边界的开闭情况由参数 closed 决定。子窗口结束后收到第一条数据触发对子窗口内数据的计算(参考例4)。当 subWindow 为整型数据对时,其单位与 timeColumn 的时间精度一致。若指定 subWindow,则:
-
windowSize 和 step 必须相等。
-
不可设置 updateTime>0和 useSystemTime=true。
parallelism 为不超过 63 的正整数,可选参数,表示并行计算的工作线程数,默认值为 1。在计算量较大时,合理地调整该参数能够有效利用计算资源,降低计算耗时。建议小于机器核数,推荐值为 4 到 8 。
acceptedDelay 为小于 windowSize 的正整数,可选参数。当 useSystemTime= true 时,指定每个窗口接收数据的最大延迟,默认值为 0。窗口结束后 acceptedDelay 时间内接收到的数据,仍然属于此窗口并参与计算,而不会参与下一个窗口的计算。
规整规则
为了便于观察和对比计算结果,系统会对第一个数据窗口的起始时间进行规整,根据 step 参数、数据的时间精度,以及 roundTime 参数来确定整数类型的规整尺度 alignmentSize。当时间序列引擎使用分组计算时,所有分组的窗口均进行统一的规整。相同时刻的数据窗口在各组均有相同的边界。
-
若数据的时间精度为分钟,如 MINUTE(HH:mm) 类型,alignmentSize 取值如下:
若 roundTime = false:
step | alignmentSize |
---|---|
0~2 | 2 |
3 | 3 |
4~5 | 5 |
6~10 | 10 |
11~15 | 15 |
16~20 | 20 |
21~30 | 30 |
>30 | 60 (1小时) |
若 roundTime = true:
当 step <= 30 时,alignmentSize 取值同上表。当 step > 30 时,alignmentSize 取值见下表:
step | alignmentSize |
---|---|
31~60 | 60 (1小时) |
61~120 | 120 (2小时) |
121~180 | 180 (3小时) |
181~300 | 300 (5小时) |
301~600 | 600 (10小时) |
601~900 | 900 (15小时) |
901~1200 | 1200 (20小时) |
1201~1800 | 1800 (30小时) |
>1800 | 3600 (60小时) |
-
若数据的时间精度为秒,如 DATETIME(yyyy-MM-dd HH:mm:ss) 与 SECOND(HH:mm:ss) 类型,alignmentSize 的 取值如下:
若 roundTime = false:
step | alignmentSize |
---|---|
0~2 | 2 |
3 | 3 |
4~5 | 5 |
6~10 | 10 |
11~15 | 15 |
16~20 | 20 |
21~30 | 30 |
>30 | 60 (1分钟) |
若 roundTime = true:
当 step <= 30 时,alignmentSize 取值同上表。当 step > 30 时,alignmentSize 取值见下表:
step | alignmentSize |
---|---|
31~60 | 60 (1分钟) |
61~120 | 120 (2分钟) |
121~180 | 180 (3分钟) |
181~300 | 300 (5分钟) |
301~600 | 600 (10分钟) |
601~900 | 900 (15分钟) |
901~1200 | 1200 (20分钟) |
1201~1800 | 1800 (30分钟) |
>1800 | 3600 (1小时) |
-
若数据的时间精度为毫秒,如 TIMESTAMP(yyyy-MM-dd HH:mm:ss.mmm) 与 TIME(HH:mm:ss.mmm) 类型,alignmentSize 的取值如下:
若 roundTime = false:
step | alignmentSize |
---|---|
0~2 | 2 |
3~5 | 5 |
6~10 | 10 |
11~20 | 20 |
21~25 | 25 |
26~50 | 50 |
51~100 | 100 |
101~200 | 200 |
201~250 | 250 |
251~500 | 500 |
501~1000 | 1000(1秒) |
1001~2000 | 2000(2秒) |
2001~5000 | 5000(5秒) |
5001~10000 | 10000(10秒) |
10001~15000 | 15000(15秒) |
15001~20000 | 20000(20秒) |
20001~30000 | 30000(30秒) |
>30000 | 60000(1分钟) |
若 roundTime = true:
若 step <= 30000,alignmentSize 取值同上表;若 step > 30000,alignmentSize 取值见下表:
step | alignmentSize |
---|---|
30001~60000 | 60000(1分钟) |
60001~120000 | 120000(2分钟) |
120001~300000 | 300000(5分钟) |
300001~600000 | 600000(10分钟) |
600001~900000 | 900000(15分钟) |
900001~1200000 | 1200000(20分钟) |
1200001~1800000 | 1800000(30分钟) |
>1800000 | 3600000(1小时) |
-
若数据的时间精度为纳秒,如 NANOTIMESTAMP(yyyy-MM-dd HH:mm:ss.nnnnnnnnn) 与 NANOTIME(HH:mm:ss.nnnnnnnnn) 类型,alignmentSize 的取值如下:
若 roundTime = false:
step | alignmentSize |
---|---|
0~2ns | 2ns |
3ns~5ns | 5ns |
6ns~10ns | 10ns |
11ns~20ns | 20ns |
21ns~25ns | 25ns |
26ns~50ns | 50ns |
51ns~100ns | 100ns |
101ns~200ns | 200ns |
201ns~250ns | 250ns |
251ns~500ns | 500ns |
>500ns | 1000ns |
若 roundTime = true:
step | alignmentSize |
---|---|
1000ns~1ms | 1ms |
1ms~10ms | 10ms |
10ms~100ms | 100ms |
100ms~1s | 1s |
1s~2s | 2s |
2s~3s | 3s |
3s~5s | 5s |
5s~10s | 10s |
10s~15s | 15s |
15s~20s | 20s |
20s~30s | 30s |
>30s | 1min |
假设第一条数据的时间为 x,
那么根据其类型,第一个数据窗口的左边界的计算规则为:timeType_cast(x/alignmentSize*alignmentSize+step-windowSize)
。其中,timeType_cast
表示依据时间精度,需要强制转换的时间类型;'/' 表示整除。例如,第一条数据的时间为 2018.10.08T01:01:01.365,windowSize 为
120000,step 为 60000,那么 alignmentSize 为 60000,第一个数据窗口的左边界为
timestamp(2018.10.08T01:01:01.365/60000*60000+60000-120000)
,即
2018.10.08T01:00:00.000。
例子
- getStreamingStat:用于查看当前的流数据表订阅者。例如:
getStreamingStat().pubTables
- unsubscribeTable:用于取消订阅流数据表。例如,在例子 1 代码运行后取消对
trades
的订阅:
unsubscribeTable(tableName="trades", actionName="engine1")
- dropStreamTable:用于删除流数据表。例如,删除例子 1 中的流数据表
trades:
dropStreamTable(tableName="trades")
- dropStreamEngine:用于流数据引擎。例如,删除例子 1 中的流数据引擎
engine1:
dropStreamEngine("engine1")
例1. 时间序列引擎 engine1 订阅流数据表 trades,实时计算表 trades 中过去1分钟内每只股票交易量之和。
share streamTable(1000:0, ["time","sym","volume"], [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, ["time","sym","sumVolume"], [TIMESTAMP, SYMBOL, INT]) as output1
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn="time", useSystemTime=false, keyColumn="sym", garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)
sleep(10)
select * from output1;
time | sym | sumVolume |
---|---|---|
2018.10.08T01:02:00.000 | A | 38 |
2018.10.08T01:02:00.000 | B | 40 |
2018.10.08T01:03:00.000 | A | 25 |
2018.10.08T01:03:00.000 | B | 9 |
下面详细解释时间序列引擎的计算过程。为简便起见,以下提到时间时,省略相同的日期部分,只列出(小时:分钟:秒.毫秒)部分。
首先,时间序列引擎对第一个数据窗口的起始时间进行规整。第一个数据窗口的时间范围是 01:01:00.000 到
01:02:00.000,只包含左边界,不包含右边界。当(01:02:10.789,A,15)到达时,触发第一个窗口 A
组计算;当(01:02:12.005,
B,9)到达时,触发第一个窗口 B 组计算。
第二个数据窗口的时间范围是 01:02:00.000 到
01:03:00.000。当(01:04:02.236,A,29)到达时,触发第二个窗口 A
组计算;当(01:04:04.412,
B,32)到达时,触发第二个窗口 B 组计算。
由于 01:05:00.000 及之后没有数据,因此没有对 01:04:00.000 到 01:05:00.000 之间的数据进行计算。
输出表 output1 保存了时间序列引擎的计算结果。由于 useWindowStartTime 为 false,因此输出表 output1 中的时间为窗口的结束时间。若将 useWindowStartTime 设为 true,则输出表中的时间为窗口的起始时间。例如:
output2 = table(10000:0, ["time","sym","sumVolume"], [TIMESTAMP, SYMBOL, INT])
engine2 = createTimeSeriesEngine(name="engine2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn="time", useSystemTime=false, keyColumn="sym", garbageSize=50, useWindowStartTime=true)
subscribeTable(tableName="trades", actionName="engine2", offset=0, handler=append!{engine2}, msgAsTable=true)
sleep(10)
select * from output2;
time | sym | sumVolume |
---|---|---|
2018.10.08T01:01:00.000 | A | 38 |
2018.10.08T01:01:00.000 | B | 40 |
2018.10.08T01:02:00.000 | A | 25 |
2018.10.08T01:02:00.000 | B | 9 |
下例中指定 updateTime 为 1000(毫秒):
share keyedTable(["time","sym"],10000:0, ["time","sym","sumVolume"], [TIMESTAMP, SYMBOL, INT]) as output3
engine3 = createTimeSeriesEngine(name="engine3", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output3, timeColumn="time", useSystemTime=false, keyColumn="sym", garbageSize=50, updateTime=1000, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine3", offset=0, handler=append!{engine3}, msgAsTable=true)
sleep(2001)
select * from output3;
time | sym | sumVolume |
---|---|---|
2018.10.08T01:02:00.000 | A | 38 |
2018.10.08T01:02:00.000 | B | 40 |
2018.10.08T01:03:00.000 | A | 25 |
2018.10.08T01:03:00.000 | B | 9 |
2018.10.08T01:05:00.000 | B | 55 |
2018.10.08T01:05:00.000 | A | 29 |
例2. 下例中指定 updateTime 为 0(毫秒):
share streamTable(1000:0, ["time","sym","volume"], [TIMESTAMP, SYMBOL, INT]) as trades
share table(1000:0, ["time","sym","sumVolume"], [TIMESTAMP, SYMBOL, INT]) as output4
engine4 = createTimeSeriesEngine(name="engine4", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output4, timeColumn="time", useSystemTime=false, keyColumn="sym", garbageSize=50, updateTime=0, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine4", offset=0, handler=append!{engine4}, msgAsTable=true);
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
sleep(100)
insert into trades values(2018.10.08T01:01:12.557,`B,21)
insert into trades values(2018.10.08T01:01:12.557,`A,28)
select * from output4;
updateTime=0 时,每收到一批数据就触发一次计算输出:
-
01:01:12.457 时,触发了第一批数据中 A、B 两组数据的计算输出。
-
01:01:12.557 时,再次触发第二批数据中 A、B 两组数据的计算输出。
最终得到:
time | sym | sumVolume | |
---|---|---|---|
0 | 2018.10.08 01:02:00.000 | A | 38 |
1 | 2018.10.08 01:02:00.000 | B | 40 |
2 | 2018.10.08 01:02:00.000 | B | 61 |
3 | 2018.10.08 01:02:00.000 | A | 66 |
下面以最后一个数据窗口为例,介绍时间序列引擎指定 updateTime 时如何进行计算。假设 time 列时间亦为数据进入时间序列引擎的时刻。
-
在 01:04:04.236 时,A 分组的第一条记录到达后已经过 2000 毫秒,触发一次 A 组计算,输出表增加一条记录(01:05:00.000, `A, 29)。
-
在 01:04:05.152 时的 B 组记录为 01:04:04.412 所在小窗口[01:04:04.000, 01:04:05.000)之后第一条记录,触发一次 B 组计算,输出表增加一条记录(01:05:00,"B",32)。
-
2 秒(2*updateTime个时间单位)后,在 01:04:07.152 时,由于 01:04:05.152 时的 B 组记录仍未参与计算,触发一次B组计算,输出一条记录(01:05:00,"B",55)。由于输出表的主键为 time 和 sym,并且输出表中已有(01:05:00,"B",32)这条记录,因此将该记录更新为(01:05:00,"B",55)。
下例中,共享流数据表 "pubT" 包含两个时间列,类型分别时 DATE 和 SECOND,创建时间序列引擎时,通过设置 timeColumn 来将原来流数据表的两个时间列整合为输出表 streamMinuteBar_1min 中的一个类型为 DATETIME 的时间列。
colNames=["symbol","date","minute","price","type","volume"]
colTypes=[SYMBOL, DATE, SECOND, DOUBLE, STRING, INT]
pubTable = streamTable(10000:0,colNames,colTypes)
share pubTable as pubT
colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
colTypes = [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
share streamTable(10000:0,colNames, colTypes) as streamMinuteBar_1min
tsAggrOHLC = createTimeSeriesEngine(name="subT", windowSize=60, step=60, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,(last(price)-first(price))/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=pubTable, outputTable=streamMinuteBar_1min, timeColumn=["date","minute"], useSystemTime=false, keyColumn="symbol", fill="none")
subscribeTable(tableName="pubT", actionName="subT", offset=-1, handler=append!{tsAggrOHLC}, msgAsTable=true)
insert into pubT values("000001", 2021.04.05, 09:25:01, 1, 'B', 1)
insert into pubT values("000001", 2021.04.05, 09:30:05, 2, 'B', 1)
insert into pubT values("000001", 2021.04.05, 09:31:06, 3, 'B', 1)
insert into pubT values("000001", 2021.04.05, 09:35:05, 4, 'S', 4)
insert into pubT values("000001", 2021.04.05, 09:40:05, 5, 'S', 5)
insert into pubT values("000001", 2021.04.06, 09:25:05, 6, 'S', 6)
pubT
symbol | date | minute | price | type | volume |
---|---|---|---|---|---|
000001 | 2021.04.05 | 09:25:01 | 1 | B | 1 |
000001 | 2021.04.05 | 09:30:05 | 2 | B | 1 |
000001 | 2021.04.05 | 09:31:06 | 3 | B | 1 |
000001 | 2021.04.05 | 09:35:05 | 4 | S | 4 |
000001 | 2021.04.05 | 09:40:05 | 5 | S | 5 |
000001 | 2021.04.06 | 09:25:05 | 6 | S | 6 |
select * from streamMinuteBar_1min
time | symbol | open | max | min | close | volume | amount | ret | vwap |
---|---|---|---|---|---|---|---|---|---|
2021.04.05T09:26:00 | 000001 | 1 | 1 | 1 | 1 | 1 | 1 | 0 | 1 |
2021.04.05T09:31:00 | 000001 | 2 | 2 | 2 | 2 | 1 | 2 | 1 | 2 |
2021.04.05T09:32:00 | 000001 | 3 | 3 | 3 | 3 | 1 | 3 | 2 | 3 |
2021.04.05T09:36:00 | 000001 | 4 | 4 | 4 | 4 | 4 | 16 | 3 | 4 |
2021.04.05T09:41:00 | 000001 | 5 | 5 | 5 | 5 | 5 | 25 | 4 | 5 |
share streamTable(1000:0, ["time","sym","qty"], [DATETIME, SYMBOL, INT]) as trades
share table(10000:0, ["time","sym","sumQty"], [DATETIME, SYMBOL, INT]) as output5
engine = createTimeSeriesEngine(name="engine", windowSize=6, step=6, metrics=<sum(qty)>, dummyTable=trades, outputTable=output5, timeColumn="time",keyColumn="sym", forceTriggerTime=7,fill=1000)
subscribeTable(tableName="trades", actionName="engine", offset=0, handler=append!{engine}, msgAsTable=true)
sleep(1000)
insert into engine values(2018.08.01T14:05:43,`A,1)
insert into engine values(2018.08.01T14:05:43,`C,3)
sleep(10)
insert into engine values(2018.08.01T14:05:44,`B,1)
sleep(80)
insert into engine values(2018.08.01T14:05:52,`B,3)
sleep(20)
insert into engine values(2018.08.01T14:05:54,`A,3)
sleep(10)
insert into engine values(2018.08.01T14:05:55,`A,5)
sleep(20)
insert into engine values(2018.08.01T14:05:57,`B,5)
sleep(50)
insert into engine values(2018.08.01T14:06:12,`A,1)
sleep(50)
select * from output5 order by sym
time | sum | Qty |
---|---|---|
2018.08.01T14:05:46 | A | 1 |
2018.08.01T14:05:52 | A | 1,000 |
2018.08.01T14:05:58 | A | 8 |
2018.08.01T14:06:04 | A | 1,000 |
2018.08.01T14:06:10 | A | 1,000 |
2018.08.01T14:05:46 | B | 1 |
2018.08.01T14:05:52 | B | 1,000 |
2018.08.01T14:05:58 | B | 8 |
2018.08.01T14:05:46 | C | 3 |
2018.08.01T14:05:52 | C | 1,000 |
例3. 下例计算每个分组窗口的第一个和最后一个 volume,组合后输出为一个数组向量。
//通过 defg 自定义一个函数,引擎在调用该函数时会将返回值转换为数组向量
defg toVector(x){
return x
}
share streamTable(1000:0, ["time","sym","volume","price"], [TIMESTAMP, SYMBOL, DOUBLE,DOUBLE]) as trades
//定义输出表,对应 toVector 指标输出的列需要定义为数组向量类型,此例中将其定义为 DOUBLE[]
share table(10000:0, ["time","sym","sumVolume","avg"], [TIMESTAMP,STRING,DOUBLE[],DOUBLE]) as output6
//metrics 中调用了 toVector 函数,将1分钟窗口内的第一个 volume 和最后一个 volume 组合到一个向量中。
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[toVector([first(volume),last(volume)]),avg(volume+price)]>, dummyTable=trades, outputTable=output6, timeColumn="time", keyColumn="sym" , useSystemTime=false, garbageSize=50, useWindowStartTime=false)
times = sort(2023.10.08T00:00:00.000 + rand(1..(1+3000*200), 30))
syms = rand("A"+string(1..10), 30)
volumes = rand(rand(100.0, 10) join 2.3 NULL NULL NULL NULL, 30)
prices = rand(rand(100.0, 10) join 2.3 NULL NULL NULL NULL, 30)
t=table(times as time, syms as sym, volumes as volume,prices as price)
engine1.append!(t)
select * from output6 where time between 2023.10.08T00:01:00.000 and 2023.10.08T00:05:00.000 order by time,sym
time | sym | sumVolume | avg |
---|---|---|---|
2023.10.08T00:01:00.000 | A1 | [57.66,] | 80.428 |
2023.10.08T00:01:00.000 | A2 | [,] | |
2023.10.08T00:02:00.000 | A7 | [,] | |
2023.10.08T00:02:00.000 | A8 | [,] | |
2023.10.08T00:03:00.000 | A2 | [,41.25] | 101.7832 |
2023.10.08T00:03:00.000 | A3 | [41.25,41.25] | 68.4897 |
2023.10.08T00:03:00.000 | A6 | [,] | |
2023.10.08T00:03:00.000 | A8 | [19.17,19.17] | 93.9934 |
2023.10.08T00:04:00.000 | A7 | [,] | |
2023.10.08T00:05:00.000 | A3 | [2.05,2.05] | 95.433 |
2023.10.08T00:05:00.000 | A5 | [55.93,55.93] | 116.4669 |
2023.10.08T00:05:00.000 | A9 | [42.66,42.66] |
例4. 本例将说明 subWindow 参数的作用。
share streamTable(1000:0, ["time","sym","volume"], [TIMESTAMP, SYMBOL, INT]) as trades share table(10000:0, ["time","sym","sumVolume"], [TIMESTAMP, SYMBOL, INT]) as output7 //在长度为 1 分钟的窗口内指定一个子窗口,未指定 closed 时,子窗口左闭右开。这里子窗口为每分钟的[0s, 10s) engine4 = createTimeSeriesEngine(name="engine4", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output7, timeColumn="time", useSystemTime=false, keyColumn="sym", garbageSize=50, useWindowStartTime=true, subWindow=0s:10s) subscribeTable(tableName="trades", actionName="engine4", offset=0, handler=append!{engine4}, msgAsTable=true); insert into trades values(2018.10.08T01:01:01.785,`A,10) insert into trades values(2018.10.08T01:01:02.125,`B,26) insert into trades values(2018.10.08T01:01:10.000,`A,14) insert into trades values(2018.10.08T01:01:12.457,`A,28) sleep(10) select * from output7;
2018.10.08T01:01:10.000 时刻的数据触发 [2018.10.08T01:01:00.000, 2018.10.08T01:01:10.000) 区间内 A 组数据对应的 volume 的总和。在子窗口结束后没有再收到 B 组的数据,因此 B 组子窗口内数据
time | sym | sumVolume |
---|---|---|
2018.10.08T01:01:10.000 |
A | 10 |