createLookupJoinEngine
语法
createLookupJoinEngine(name, leftTable, rightTable,
outputTable, metrics, matchingColumn, [rightTimeColumn], [checkTimes],
[outputElapsedMicroseconds=false], [keepDuplicates=false])
详情
创建流数据表的 lookup join 引擎,该引擎以 matchingColumn 作为连接列将两个流数据表进行实时 leftjoin, 或者将流数据表和非流数据表进行 left join(此时需要定时刷新非流数据表)。lookup join 引擎常用于右表更新不频繁的场景(如保存了日频指标的维度表)。
工作机制
- 仅当左表有新数据流入时才会触发左、右表的 left join 输出。
- 当 keepDuplicates=false 时,引擎仅保留右表根据 matchingColumn 分组后各分组内的最新一条数据。当 keepDuplicates=true 时,引擎保留右表根据 matchingColumn 分组后各分组内的所有数据。当右表是订阅的流数据表时,数据流入右表的同时会更新数据; 若右表为内存表、维度表或元代码,系统会根据 checkTimes 定时刷新右表数据。
lookup join 引擎与 asof join 引擎很相似,它们之间的区别如下:
- lookup join 引擎输出表的第一列可以不是时间列,而 asof join 引擎输出表第一列必须是时间列。
- lookup join 引擎当左表有新数据流入便会触发 join 输出,因此无需考虑数据延迟,也无需缓存左表数据。而 asof join 引擎,当指定 timeColumn 时,需要考虑左右表的数据延时。
更多流数据引擎的应用场景说明可以参考 内置流式计算引擎。
参数
name 必选参数,表示流数据 lookup join 引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。
leftTable 表对象。可以不包含数据,但结构必须与订阅的流数据表相同。
rightTable 表对象,可以是内存表、流数据表、维度表或 SQL 查询元代码(查询语句必须返回表 )。请注意,如果 rightTable 没有被订阅,但 rightTable 会定期更新,则必须设置 checkTimes 来定时刷新右表数据。
outputTable 必选参数,为计算结果的输出表。在使用 createLookupJoinEngine
函数之前,需要将输出表预先设立为一个空表,并指定各列列名以及数据类型。
输出表的各列的顺序如下:
- 连接列。与 matchingColumn 中的列以及其顺序一致,可为多列。
- 计算结果列。可为多列。
- 耗时列。如果指定 outputElapsedMicroseconds = true,则指定一个 LONG 类型的列用于记录单次响应计算耗时(单位:微秒)。
- batchSize 列。如果指定 outputElapsedMicroseconds = true, 则指定一个INT类型的列,记录单次响应的数据条数。
metrics 以元代码的格式表示计算指标,支持输入元组。有关元代码的更多信息可参考 Metaprogramming。
- 计算指标可以是一个或多个表达式、系统内置或用户自定义函数、一个常量标量/向量,但不能是聚合函数。当指定为常量向量时,对应的输出列必须设置为数组向量类型,例子参见 createReactiveStateEngine 中的例4。
- metrics 内支持调用具有多个返回值的函数,且必须指定列名,例如 <func(price) as `col1`col2>。
- 若在 metrics 指定了 leftTable 和 rightTable
中具有相同名称的列,默认取左表的列,可以通过 "tableName.colName" 指定该列来自哪个表。 注: metrics 中使用的列名大小写不敏感,不需要与输入表的列名大小写保持一致。
matchingColumn 表示连接列的字符串标量/向量或字符串向量组成的 tuple,支持 Integral, Temporal 或 Literal(UUID 除外)类型。matchingColumn 指定规则:
- 只有一个连接列。当左表和右表的连接列名相同时,matchingColumn 是一个字符串标量,否则是一个长度为 2 的 tuple。例如:左表连接列名为 sym,右表连接列名为 sym1,则 matchingColumn = [[`sym],[`sym1]]。
- 有多个连接列。当左表和右表的连接列名相同时,matchingColumn 是一个字符串向量,否则是一个长度为 2 的 tuple。例如:左表连接列名为 timestamp, sym,右表连接列名为 timestamp, sym1,则 matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]]。
rightTimeColumn 是字符串标量,表示右表的时间列名称。若设置该参数,右表会根据指定的时间列的时间戳保留最新的数据(若有多行,则取其中最后一行)。若不指定该参数,则根据数据注入系统的时间保留最新数据。
checkTimes 是一个时间类型向量或 DURATION 的标量。设置后,系统会定时更新 rightTable 的数据(只保留 rightTable 的最新数据),并将更新后的数据追加到引擎中。当无需更新 rightTable 时,则不用设置该参数,但需要在引擎创建后,手动将 rightTable 注入到引擎中。
- checkTimes 是时间类型向量时,只能为SECOND、TIME 或 NANOTIME 类型。 lookup join 引擎每天根据向量内各元素指定的时间定时更新右表。
- checkTimes 是 DURATION 标量时,表示更新右表的时间间隔。
outputElapsedMicroseconds 布尔值,表示是否输出单次响应计算的耗时(从触发计算的数据注入引擎到计算完成的耗时),默认为 false。指定参数 outputElapsedMicroseconds 后,在定义 outputTable 时需要在计算结果列后增加一个 LONG 类型的列和 INT 类型的列,详见 outputTable 参数说明。
keepDuplicates 布尔值,表示是否保留右表各分组内的所有数据。默认值为 false,即在关联时只取右表各分组内的最新一条数据。当设置为 true 时,在关联时则使用右表各分组内的所有数据。
例子
例1.
login(`admin, `123456)
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)
n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym | factor1 | factor2 | factor3 |
---|---|---|---|
A | 10.1 | 13 | 131.3 |
B | 11.1 | 14 | 155.4 |
C | 12.1 | 15 | 181.5 |
A | 13.1 | 13 | 170.3 |
B | 14.1 | 14 | 197.4 |
C | 15.1 | 15 | 226.5 |
A | 16.1 | 13 | 209.3 |
B | 17.1 | 14 | 239.4 |
C | 8.1 | 15 | 271.5 |
A | 19.1 | 13 | 248.3 |
例2.
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)
n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym | factor1 | factor2 | factor3 |
---|---|---|---|
A | 10.1 | 10 | 101 |
B | 11.1 | 11 | 122.1 |
C | 12.1 | 12 | 145.2 |
A | 13.1 | 10 | 131 |
B | 14.1 | 11 | 155.1 |
C | 15.1 | 12 | 181.2 |
A | 16.1 | 10 | 161 |
B | 17.1 | 11 | 188.1 |
C | 18.1 | 12 | 217.2 |
A | 19.1 | 10 | 191 |
例3. 右表是内存表,需设置 checkTimes
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
share table(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps, checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym | factor1 | factor2 | factor3 |
---|---|---|---|
A | 10.1 | 10 | 101 |
B | 11.1 | 11 | 122.1 |
C | 12.1 | 12 | 145.2 |
A | 13.1 | 10 | 131 |
B | 14.1 | 11 | 155.1 |
C | 15.1 | 12 | 181.2 |
A | 16.1 | 10 | 161 |
B | 17.1 | 11 | 188.1 |
C | 18.1 | 12 | 217.2 |
A | 19.1 | 10 | 191 |
例4. 左表为一个实时的交易表与右表(相对稳定的维度表)做连接。
share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
dbPath="dfs://testlj"
if(existsDatabase(dbPath)){
dropDatabase(dbPath)
}
rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
db=database(dbPath, VALUE, `A`B`C)
prices=db.createTable(rt,`rightTable)
share table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE]) as outputTable
tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=prices, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)
def writeData(t,n){
timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
volumev = take(1..n, n)
id = take(1 2 3, n)
insert into t values(timev, volumev, id)
}
writeData(rt, 10)
prices.append!(rt)
sleep(2000)
writeData(trades, 6)
sleep(2)
select * from outputTable
id | volume | price | prod |
---|---|---|---|
1 | 1 | 10 | 10 |
2 | 2 | 8 | 16 |
3 | 3 | 9 | 27 |
1 | 4 | 10 | 40 |
2 | 5 | 8 | 40 |
3 | 6 | 9 | 54 |
例5. 通过 rightTable 可以对分布式分区表中的字段进行关联,此时 rightTable 是一个 SQL 查询元代码:
share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
dbPath="dfs://lookupjoinDB"
if(existsDatabase(dbPath)){
dropDatabase(dbPath)
}
rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
db=database(dbPath, HASH, [INT,5])
prices=db.createPartitionedTable(rt,`rightTable, `id)
share table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE]) as outputTable
tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=<select * from loadTable(dbPath, `rightTable)>, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)
def writeData(t,n){
timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
volumev = take(1..n, n)
id = take(1 2 3, n)
insert into t values(timev, volumev, id)
}
writeData(rt, 10)
prices.append!(rt)
sleep(2000)
writeData(trades, 6)
sleep(2)
select * from outputTable
id | volume | price | prod |
---|---|---|---|
1 | 1 | 10 | 10 |
2 | 2 | 8 | 16 |
3 | 3 | 9 | 27 |
1 | 4 | 10 | 40 |
2 | 5 | 8 | 40 |
3 | 6 | 9 | 54 |