createAsofJoinEngine
语法
createAsofJoinEngine(name, leftTable, rightTable,
outputTable, metrics, matchingColumn, [timeColumn], [useSystemTime=false],
[delayedTime], [garbageSize], [sortByTime],
[outputElapsedMicroseconds=false])
详情
matchingColumn
+ timeColumn
(或系统时间)
作为连接列,在右表中选取与连接列匹配的,在给定的左表时间戳前且最接近的记录。该引擎适用于两个数据源没有完全匹配的记录,需要按时间段作连接,以获取最新信息的场景。- 如果没有指定
delayedTime
,则当右表数据的最新时间大于左表数据的最新时间,才会触发 join, 计算输出。 - 如果指定了
delayedTime
,则当左表中数据最新时间与上一条未计算的时间差大于 delayedTime 设置值,或者左表中数据在经过 2 * delayedTime (小于2秒按2秒计算)后还没有 join 输出,就直接触发左右两表 join。
更多流数据引擎的应用场景说明可以参考 内置流式计算引擎。
参数
name 字符串标量,表示 asof join 引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。
leftTable 和 rightTable 表对象。结构必须与订阅的流数据表相同。从 2.00.11 版本开始左、右表支持 array vector 类型。
createAsofJoinEngine
函数之前,需要将输出表预先设立为一个空表,并指定各列列名以及数据类型。asof join 引擎会将计算结果注入该表。输出表的各列的顺序如下:-
时间列。其中:
- 若 useSystemTime = true,为 TIMESTAMP 类型;
- 若 useSystemTime = false,数据类型与 timeColumn 列一致。
- 连接列。与 matchingColumn 中的列以及其顺序一致,可为多列。
- 计算结果列。可为多列。
- 耗时列。如果指定 outputElapsedMicroseconds = true,则指定一个 LONG 类型的列用于记录单次响应计算耗时(单位:微秒)。
- batchSize 列。如果指定 outputElapsedMicroseconds = true, 则指定一个 INT 类型的列,记录单次响应的数据条数。
metrics 以元代码的格式表示计算指标,支持输入元组。有关元代码的更多信息可参考 Metaprogramming。
- 计算指标可以是一个或多个表达式、系统内置或用户自定义函数、一个常量标量/向量,但不能是聚合函数。当指定为常量向量时,对应的输出列必须设置为数组向量类型,例子参见 createReactiveStateEngine 中的例4。
- metrics 内支持调用具有多个返回值的函数,且必须指定列名,例如 <func(price) as `col1`col2>。若在
metrics 指定了 leftTable 和 rightTable
中具有相同名称的列,默认取左表的列,可以通过 "tableName.colName" 指定该列来自哪个表。注: metrics 中使用的列名大小写不敏感,不需要与输入表的列名大小写保持一致。
matchingColumn 表示连接列的字符串标量/向量/字符串组成的 tuple,支持 Integral, Temporal 或 Literal(UUID 除外)类型。matchingColumn 指定规则为:
- 只有一个连接列:当左表和右表的连接列名相同时,matchingColumn 是一个字符串标量,否则是一个长度为 2 的 tuple,例如:左表连接列名为 sym,右表连接列名为 sym1,则 matchingColumn = [[`sym],[`sym1]]。
- 有多个连接列:当左表和右表的连接列名相同时,matchingColumn 是一个字符串向量,否则是一个长度为 2 的 tuple,例如:左表连接列名为 orderNo, sym,右表连接列名为 orderNo, sym1,则 matchingColumn = [[`orderNo, `sym], [`orderNo,`sym1]]。
timeColumn 可选参数,字符串标量或向量。当 useSystemTime = false 时,指定要连接的两个表中时间列的名称。 leftTable 和 rightTable 时间列名称可以不同,但数据类型需保持一致。当 leftTable 和 rightTable 时间列名称不同时,timeColumn 为一个长度为2的字符串向量。
useSystemTime 可选参数,布尔值。表示是否使用数据注入引擎时的系统时间作为时间列进行计算。
- 当 useSystemTime = true 时,按照数据进入引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关)进行计算。
- 当 useSystemTime = false(缺省值)时,按照数据中的时间列进行计算。
delayedTime 可选参数,正整数,单位同 timeColumn 精度一致。表示左表中未联结数据被触发联结并计算输出的最大等待时间。要设置 delayedTime, 必须指定 timeColumn,更多介绍见详情。
garbageSize 可选参数,正整数,默认值是 5,000(单位为行)。随着订阅的流数据不断注入 asof join 引擎,存放在内存中的数据会越来越多,当各分组对应的缓存表(左表或右表)中数据行数超过 garbageSize 值时,系统会清理该表中已经触发计算的历史数据。
outputElapsedMicroseconds 布尔值,表示是否输出数据的单次响应计算耗时(从触发计算的数据注入引擎到计算完成的耗时),默认为 false。指定参数 outputElapsedMicroseconds 后,在定义 outputTable 时需要在计算结果列后增加一个 LONG 类型的列和 INT 类型的列,详见 outputTable 参数说明。
例子
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as quotes
share table(100:0, `time`sym`price`bid`ask`spread, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as prevailingQuotes
ajEngine=createAsofJoinEngine(name="aj1", leftTable=trades, rightTable=quotes, outputTable=prevailingQuotes, metrics=<[price, bid, ask, abs(price-(bid+ask)/2)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false)
tmp1=table(2020.08.27T09:30:00.000+2 8 20 22 23 24 as time, take(`A`B, 6) as sym, 20.01 20.04 20.07 20.08 20.4 20.5 as price)
tmp2=table(2020.08.27T09:30:00.000+1 5 6 11 19 20 21 as time, take(`A`B, 7) as sym, 20 20.02 20.03 20.05 20.06 20.6 20.4 as bid, 20.01 20.03 20.04 20.06 20.07 20.5 20.6 as ask)
tmp1.sortBy!(`time)
tmp2.sortBy!(`time)
subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{ajEngine, true}, msgAsTable=true)
subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=appendForJoin{ajEngine, false}, msgAsTable=true)
trades.append!(tmp1)
quotes.append!(tmp2)
sleep(100)
select time, sym, bid from prevailingQuotes
time | sym | bid |
---|---|---|
2020.08.27T09:30:00.002 | A | 20 |
2020.08.27T09:30:00.020 | A | 20.06 |
2020.08.27T09:30:00.008 | B | 20.02 |
// 清理引擎及变量
unsubscribeTable(tableName="trades", actionName="joinLeft")
unsubscribeTable(tableName="quotes", actionName="joinRight")
undef(`trades,SHARED)
undef(`quotes,SHARED)
dropAggregator(name="aj1")
// 定义引擎,设置 sortByTime=true
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as quotes
share table(100:0, `time`sym`price`bid`ask`spread, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as prevailingQuotes
ajEngine=createAsofJoinEngine(name="aj1", leftTable=trades, rightTable=quotes, outputTable=prevailingQuotes, metrics=<[price, bid, ask, abs(price-(bid+ask)/2)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false, sortByTime=true)
tmp1=table(2020.08.27T09:30:00.000+2 8 20 22 23 24 as time, take(`A`B, 6) as sym, 20.01 20.04 20.07 20.08 20.4 20.5 as price)
tmp2=table(2020.08.27T09:30:00.000+1 5 6 11 19 20 21 as time, take(`A`B, 7) as sym, 20 20.02 20.03 20.05 20.06 20.6 20.4 as bid, 20.01 20.03 20.04 20.06 20.07 20.5 20.6 as ask)
tmp1.sortBy!(`time)
tmp2.sortBy!(`time)
// 只能使用 appendForJoin 插入数据
subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{ajEngine, true}, msgAsTable=true)
subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=appendForJoin{ajEngine, false}, msgAsTable=true)
trades.append!(tmp1)
quotes.append!(tmp2)
sleep(100)
// 查看结果表,数据按照全局时间顺序输出
time | sym | bid |
---|---|---|
2020.08.27T09:30:00.002 | A | 20 |
2020.08.27T09:30:00.008 | B | 20.02 |
2020.08.27T09:30:00.020 | A | 20.06 |