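Snapshot Join Engine
The snapshot join engine can be viewed as a bidirectional lookup join engine: data ingested into either the left or the right table triggers a join. By default the engine performs an inner join, and whenever a record in the left (right) table matches records in the right (left) table, the engine outputs only the result for the latest matching record. Parameters let you choose between an inner and an outer join, and between joining all matching records or only the latest one.
Unlike the lookup join engine, both inputs of the snapshot join engine must be data streams; neither can be a static dataset. Compared with the equi join engine, the snapshot join engine keeps its cached records indefinitely and never purges expired records.
The figure below shows the effect, under default settings, of ingesting input data with the schema (join column, time column, metric) into the snapshot join engine.
The snapshot join engine is created with the createSnapshotJoinEngine function, whose syntax is: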
createSnapshotJoinEngine(name, leftTable, rightTable, outputTable, metrics,
matchingColumn, [timeColumn], [outputElapsedMicroseconds=false],
[keepLeftDuplicates=false], [keepRightDuplicates=false], [isInnerJoin=true],
[snapshotDir], [snapshotIntervalInMsgCount])
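For details on the parameters, see createSnapshotJoinEngine.
Example 1: Joining account trades with market snapshots to compute position PnL
When computing the profit and loss (PnL) of account positions, both the account trade data and the market snapshot data typically come from sources that update in real time. Joining the two tables yields each asset's latest market price together with the account's position information, from which the PnL monitoring metric is computed.
The defining feature of this scenario is that when a new trade for an asset arrives in the account trade data, it must immediately be joined with the latest market price of that asset from the snapshot data; likewise, when a new snapshot for an asset arrives, it must immediately be joined with the account's latest trade for that asset. The following script implements this scenario with the snapshot join engine.
// Create the stream tables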
colNames = `SecurityID`Time`LastPrice
colTypes = [SYMBOL, TIMESTAMP, DOUBLE]
share streamTable(1:0, colNames, colTypes) as snapshot
colNames = `ACCT_ID`ORDER_NO`TRADE_TIME`SecurityID`Net`LONG_AVG_PRICE`SHORT_AVG_PRICE
colTypes = `SYMBOL`STRING`TIMESTAMP`SYMBOL`INT`DOUBLE`DOUBLE
share streamTable(1:0, colNames, colTypes) as trades
output=table(100:0, ["SecurityID", "TRADE_TIME", "Time", "ACCT_ID", "ORDER_NO", "PNL"],
[SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, STRING, DOUBLE])
// Create the engine
metrics = [<ACCT_ID>, <ORDER_NO>, <round(Net*(LastPrice-iif((Net>0), LONG_AVG_PRICE, SHORT_AVG_PRICE))) as PNL>]
snapshot_engine = createSnapshotJoinEngine(name = "SJE", leftTable=trades, rightTable=snapshot, outputTable=output, metrics=metrics,
matchingColumn = `SecurityID, timeColumn = `TRADE_TIME`Time, isInnerJoin=true, keepLeftDuplicates=false,
keepRightDuplicates=false)
// Subscribe to the stream tables
subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{snapshot_engine, true}, msgAsTable=true)
subscribeTable(tableName="snapshot", actionName="joinRight", offset=0, handler=appendForJoin{snapshot_engine, false}, msgAsTable=true)
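The stream table snapshot stores market snapshot data. The stream table trades stores account trade data, where Net is the account's net position in the asset, LONG_AVG_PRICE is the average long execution price of the asset, and SHORT_AVG_PRICE is the average short execution price of the asset.
The account trade data trades is ingested as the left table of the engine, and the market snapshot data snapshot as the right table. Because isInnerJoin is set to true, the two tables are inner joined. Because keepLeftDuplicates and keepRightDuplicates are set to false, a record arriving from trades (snapshot) is joined only with the latest matching record in snapshot (trades).
In this example, position PnL is computed as follows: when the net position is positive, multiply the net position by the difference between the latest price and the average long price; when the net position is negative, multiply the net position by the difference between the latest price and the average short price. The metric rounds the result to the nearest integer.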
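As a quick check of this rule, the PNL expression from metrics can be evaluated by hand on two combinations taken from the sample data in this example. This is only a sketch: the local vectors below are introduced for illustration and are not part of the original script. The first element pairs trade 1 (Net 100) with the snapshot price 18.79, and the second pairs trade 3 (Net -200) with the snapshot price 17.71.

```
// Illustrative local vectors (assumed names, mirroring the column names in trades and snapshot)
Net = [100, -200]
LastPrice = [18.79, 17.71]
LONG_AVG_PRICE = [19.03, 13.2]
SHORT_AVG_PRICE = [17.71, 7.47]

// Same expression as the PNL metric: a long position uses LONG_AVG_PRICE, otherwise SHORT_AVG_PRICE
round(Net*(LastPrice-iif((Net>0), LONG_AVG_PRICE, SHORT_AVG_PRICE)))
// evaluates to -24 and -2048
```

The long position loses 0.24 per unit on 100 units, and the short position of 200 units loses 10.24 per unit because the latest price is above the average short price.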
The following code inserts sample data into trades and snapshot to simulate real-time updates of the account trade data and the market snapshot data.
insert into trades values(`a, `1, 2024.10.10T10:00:02.784,`111111, 100, 19.03, 17.71)
sleep(10)
insert into snapshot values(`111111, 2024.10.10T10:00:03.000, 18.79)
insert into snapshot values(`222222, 2024.10.10T10:00:03.000, 5.54)
sleep(10)
insert into trades values(`a, `2, 2024.10.10T10:00:04.447,`222222, 300, 5.43, 11.63)
sleep(10)
insert into snapshot values(`111111, 2024.10.10T10:00:06.000, 17.71)
insert into snapshot values(`222222, 2024.10.10T10:00:06.000, 14.99)
sleep(10)
insert into trades values(`a, `3, 2024.10.10T10:00:06.637,`111111, -200, 13.2, 7.47)
sleep(10)
insert into trades values(`a, `4, 2024.10.10T10:00:08.380,`222222, 200, 15.62, 13.19)
sleep(10)
insert into snapshot values(`111111, 2024.10.10T10:00:09.000, 19.81)
insert into snapshot values(`222222, 2024.10.10T10:00:09.000, 13.49)
sleep(10)
insert into trades values(`a, `5, 2024.10.10T10:00:10.680,`111111, -100, 11.09, 3.69)
The join result in output is as follows:
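The sketch below shows one way to inspect the result of this example and then release its resources. It reuses only functions and names that appear elsewhere in this section. The 100 ms wait is an assumption: subscriptions deliver messages asynchronously, so a short pause helps ensure that all inserted rows have reached the engine before the query runs.

```
// Give the asynchronous subscriptions time to deliver all messages (the delay is an assumption)
sleep(100)
// Inspect the join result
select * from output

// Clean up: cancel the subscriptions first, then drop the engine and the shared stream tables
unsubscribeTable(tableName="trades", actionName="joinLeft")
unsubscribeTable(tableName="snapshot", actionName="joinRight")
dropStreamEngine("SJE")
undef(`trades, SHARED)
undef(`snapshot, SHARED)
```

Cancelling the subscriptions before dropping the engine avoids a handler being called on an engine that no longer exists.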
Example 2
When isInnerJoin is set to false, the engine joins the left and right tables with an outer join.
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, INT]) as rightTable
output=table(100:0, `sym`time1`time2`price`val`total, [SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, INT, DOUBLE])
engine=createSnapshotJoinEngine(name = "engine1", leftTable=leftTable,
rightTable=rightTable, outputTable=output,
metrics=[<price>, <val>, <price*val>], matchingColumn=`sym,
timeColumn=`time, isInnerJoin=false)
subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{engine, true}, msgAsTable=true)
subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{engine, false}, msgAsTable=true)
n = 6
tem1 = table( (2018.10.08T01:01:01.001 + 1..n) as time,take(`A`B`C, n) as sym,take(1..4,n) as val)
rightTable.append!(tem1)
n = 5
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as time,take(`A`B`C, n) as sym,take(0.1+10..13,n) as price)
leftTable.append!(tem2)
| sym | time1 | time2 | price | val | total |
|---|---|---|---|---|---|
| A |  | 2018.10.08 01:01:01.002 |  | 1 |  |
| B |  | 2018.10.08 01:01:01.003 |  | 2 |  |
| C |  | 2018.10.08 01:01:01.004 |  | 3 |  |
| A |  | 2018.10.08 01:01:01.005 |  | 4 |  |
| B |  | 2018.10.08 01:01:01.006 |  | 1 |  |
| C |  | 2018.10.08 01:01:01.007 |  | 2 |  |
| A | 2019.10.08 01:01:01.002 | 2018.10.08 01:01:01.005 | 10.1 | 4 | 40.4 |
| B | 2019.10.08 01:01:01.003 | 2018.10.08 01:01:01.006 | 11.1 | 1 | 11.1 |
| C | 2019.10.08 01:01:01.004 | 2018.10.08 01:01:01.007 | 12.1 | 2 | 24.2 |
| A | 2019.10.08 01:01:01.005 | 2018.10.08 01:01:01.005 | 13.1 | 4 | 52.4 |
| B | 2019.10.08 01:01:01.006 | 2018.10.08 01:01:01.006 | 10.1 | 1 | 10.1 |
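In the outer-join output above, the first six rows come from right-table records that arrived while the left table was still empty, so their left-side columns (time1, price) and the derived total are null. As a hedged sketch, the two kinds of rows can be separated with ordinary queries on output. The row counts in the comments follow from the table above and assume that all messages have already been processed.

```
// Rows emitted for right-table records with no left-table match at arrival time
select * from output where isNull(time1)      // 6 rows in the table above

// Rows where a left-table record matched the latest right-table record
select * from output where isValid(time1) and isValid(time2)      // 5 rows in the table above
```

With the default isInnerJoin=true, only rows of the second kind would be produced.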
Use the following code to clean up the environment:
dropStreamEngine("engine1")
unsubscribeTable(tableName="leftTable", actionName="joinLeft")
unsubscribeTable(tableName="rightTable", actionName="joinRight")
undef(`leftTable, SHARED)
undef(`rightTable, SHARED)
When keepLeftDuplicates and keepRightDuplicates are set to true, each new record in the left (right) table is joined with all matching records in the right (left) table.
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, INT]) as rightTable
output=table(100:0, `sym`time1`time2`price`val`total, [SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, INT, DOUBLE])
engine=createSnapshotJoinEngine(name = "engine1", leftTable=leftTable,
rightTable=rightTable, outputTable=output,
metrics=[<price>, <val>, <price*val>], matchingColumn=`sym,
timeColumn=`time, keepLeftDuplicates=true,
keepRightDuplicates=true)
subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{engine, true}, msgAsTable=true)
subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{engine, false}, msgAsTable=true)
n = 6
tem1 = table( (2018.10.08T01:01:01.001 + 1..n) as time,take(`A`B`C, n) as sym,take(1..4,n) as val)
rightTable.append!(tem1)
n = 5
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as time,take(`A`B`C, n) as sym,take(0.1+10..13,n) as price)
leftTable.append!(tem2)
| sym | time1 | time2 | price | val | total |
|---|---|---|---|---|---|
| A | 2019.10.08 01:01:01.002 | 2018.10.08 01:01:01.002 | 10.1 | 1 | 10.1 |
| A | 2019.10.08 01:01:01.002 | 2018.10.08 01:01:01.005 | 10.1 | 4 | 40.4 |
| B | 2019.10.08 01:01:01.003 | 2018.10.08 01:01:01.003 | 11.1 | 2 | 22.2 |
| B | 2019.10.08 01:01:01.003 | 2018.10.08 01:01:01.006 | 11.1 | 1 | 11.1 |
| C | 2019.10.08 01:01:01.004 | 2018.10.08 01:01:01.004 | 12.1 | 3 | 36.3 |
| C | 2019.10.08 01:01:01.004 | 2018.10.08 01:01:01.007 | 12.1 | 2 | 24.2 |
| A | 2019.10.08 01:01:01.005 | 2018.10.08 01:01:01.002 | 13.1 | 1 | 13.1 |
| A | 2019.10.08 01:01:01.005 | 2018.10.08 01:01:01.005 | 13.1 | 4 | 52.4 |
| B | 2019.10.08 01:01:01.006 | 2018.10.08 01:01:01.003 | 10.1 | 2 | 20.2 |
| B | 2019.10.08 01:01:01.006 | 2018.10.08 01:01:01.006 | 10.1 | 1 | 10.1 |
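When outputElapsedMicroseconds is set to true, the result table output records the elapsed time of each computation and the number of records processed in each batch. In this case, output must define an elapsed-time column and a batchSize column as its last two columns.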
dropStreamEngine("engine1")
unsubscribeTable(tableName="leftTable", actionName="joinLeft")
unsubscribeTable(tableName="rightTable", actionName="joinRight")
undef(`leftTable, SHARED)
undef(`rightTable, SHARED)
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, INT]) as rightTable
output=table(100:0, `sym`time1`time2`price`val`total`execTime`batchSize, [SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, INT, DOUBLE, LONG, INT])
engine=createSnapshotJoinEngine(name = "engine1", leftTable=leftTable,
rightTable=rightTable, outputTable=output,
metrics=[<price>, <val>, <price*val>], matchingColumn=`sym,
timeColumn=`time, outputElapsedMicroseconds=true)
subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{engine, true}, msgAsTable=true)
subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{engine, false}, msgAsTable=true)
n = 6
tem1 = table( (2018.10.08T01:01:01.001 + 1..n) as time,take(`A`B`C, n) as sym,take(1..4,n) as val)
rightTable.append!(tem1)
n = 5
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as time,take(`A`B`C, n) as sym,take(0.1+10..13,n) as price)
leftTable.append!(tem2)
| sym | time1 | time2 | price | val | total | execTime | batchSize |
|---|---|---|---|---|---|---|---|
| A | 2019.10.08 01:01:01.002 | 2018.10.08 01:01:01.005 | 10.1 | 4 | 40.4 | 112 | 5 |
| B | 2019.10.08 01:01:01.003 | 2018.10.08 01:01:01.006 | 11.1 | 1 | 11.1 | 112 | 5 |
| C | 2019.10.08 01:01:01.004 | 2018.10.08 01:01:01.007 | 12.1 | 2 | 24.2 | 112 | 5 |
| A | 2019.10.08 01:01:01.005 | 2018.10.08 01:01:01.005 | 13.1 | 4 | 52.4 | 112 | 5 |
| B | 2019.10.08 01:01:01.006 | 2018.10.08 01:01:01.006 | 10.1 | 1 | 10.1 | 112 | 5 |
