createSnapshotJoinEngine

Syntax

createSnapshotJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [timeColumn], [outputElapsedMicroseconds=false], [keepLeftDuplicates=false], [keepRightDuplicates=false], [isInnerJoin=true])

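Details

Creates a snapshot join engine for stream tables. Using matchingColumn as the join column(s), the engine joins two stream tables in real time with either an inner join (equi join) or an outer join (left/right join).

Return value: a table object holding the result of joining the left and right tables.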

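Join Behavior

The isInnerJoin parameter determines the join type:

  • When isInnerJoin=true, an inner join is performed. Each time data is ingested into the engine's left table, the engine looks up right-table records whose join columns match. Only when a match is found are the matching records joined, the metrics computed, and the result output. The same happens each time data is ingested into the right table.

  • When isInnerJoin=false, an outer join is performed. Each time data is ingested into the left table, a left join with the right table is performed: the metrics are computed and output whether or not a matching right-table record is found (columns from an unmatched right table are output as null). Each time data is ingested into the right table, the same operation is performed in the other direction, that is, a right join.

  • When keepLeftDuplicates=false, only the latest record of each left-table group (grouped by matchingColumn) is matched. When keepLeftDuplicates=true, all left-table records are matched.

  • When keepRightDuplicates=false, only the latest record of each right-table group (grouped by matchingColumn) is matched. When keepRightDuplicates=true, all right-table records are matched.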

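The Snapshot Join engine is similar to the Lookup Join and Equi Join engines. The main differences are:

  • The Lookup Join engine is triggered only by new records in the left table, whereas the Snapshot Join engine can be triggered by new records in either the left or the right table.

  • The Equi Join engine matches only the latest record and keeps no cache, whereas the Snapshot Join engine can match either all records or only the latest record of the right/left table, and it keeps a cache.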

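Parameters

name: A string scalar specifying the name of the Snapshot Join engine, which uniquely identifies it on a data node or compute node. It may contain letters, digits, and underscores, and must start with a letter.

leftTable: A table object representing the left table. Its schema must be identical to that of the subscribed stream table.

rightTable: A table object representing the right table. Its schema must be identical to that of the subscribed stream table.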
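The schema requirement above matters most when the engine is fed by subscriptions rather than manual appends. The following is a minimal sketch of that setup, not taken from this page: the table names trades, quotes, and joined, the engine name sje_sub_demo, and the action names are all hypothetical, and it assumes the standard subscribeTable function with appendForJoin used as a partially applied handler.

```dolphindb
// Hypothetical source stream tables; names and columns are illustrative only
share streamTable(1:0, `timestamp`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1:0, `timestamp`sym`bid, [TIMESTAMP, SYMBOL, DOUBLE]) as quotes

// Output columns: join column, left time, right time, one metric result
joined = table(100:0, `sym`tradeTime`quoteTime`spread, [SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE])

// leftTable/rightTable have the same schema as the stream tables subscribed below
sjEngine = createSnapshotJoinEngine(name="sje_sub_demo", leftTable=trades, rightTable=quotes, outputTable=joined, metrics=[<price - bid>], matchingColumn=`sym, timeColumn=`timestamp)

// Route each stream table to the matching side of the engine (true = left, false = right)
subscribeTable(tableName="trades", actionName="sjeLeft", offset=0, handler=appendForJoin{sjEngine, true}, msgAsTable=true)
subscribeTable(tableName="quotes", actionName="sjeRight", offset=0, handler=appendForJoin{sjEngine, false}, msgAsTable=true)

// Cleanup when finished
// unsubscribeTable(tableName="trades", actionName="sjeLeft")
// unsubscribeTable(tableName="quotes", actionName="sjeRight")
// dropStreamEngine("sje_sub_demo")
```

With this wiring, rows appended to trades or quotes reach the engine through the subscription, so appendForJoin does not need to be called by hand.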

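outputTable: The table that receives the calculation results. Before calling createSnapshotJoinEngine, create the output table as an empty table and specify the name and data type of each column.

The columns of the output table must appear in the following order:

  1. Join columns. The same columns, in the same order, as in matchingColumn. There may be more than one.

  2. Time columns. Two columns holding the left-table and right-table times respectively. If timeColumn is specified, their data type matches that of the timeColumn columns; otherwise it is TIMESTAMP.

  3. Result columns. There may be more than one.

  4. Elapsed-time column. If outputElapsedMicroseconds = true, add a LONG column that records the time taken by each computation response (in microseconds).

  5. batchSize column. If outputElapsedMicroseconds = true, add an INT column that records the number of records in each response.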
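As an illustration of the column order above, here is a small sketch of an output table for an engine with two join columns, two metrics, and outputElapsedMicroseconds = true. The variable and column names (colNames, colTypes, outputWithCost, leftTime, rightTime, elapsedUs, batchSize) are placeholders chosen for this sketch; only the order and the types follow from the rules above.

```dolphindb
// Order: join columns, left time, right time, metric results, elapsed time, batch size
colNames = `id`sym`leftTime`rightTime`factor1`factor2`elapsedUs`batchSize
colTypes = [INT, SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, DOUBLE, LONG, INT]

// Empty table with pre-allocated capacity, ready to be passed as outputTable
outputWithCost = table(100:0, colNames, colTypes)
```

If outputElapsedMicroseconds is left at its default of false, the last two columns are simply omitted.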

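metrics: The calculation metrics, expressed as metacode. A tuple is also accepted. For more about metacode, see Metaprogramming.

  • A metric can be one or more expressions, built-in functions, or user-defined functions, but it cannot be an aggregate function.

  • metrics may call functions that return multiple values, in which case the column names must be specified, for example <func(price) as `col1`col2>.

  • If metrics references a column name that exists in both leftTable and rightTable, the left-table column is used by default. Use "tableName.colName" to specify which table the column comes from.

    Note: Column names used in metrics are case-insensitive and need not match the case of the column names in the input tables.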

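matchingColumn: The join column(s), given as a string scalar, a string vector, or a tuple of string vectors. Integral, Temporal, and Literal (except UUID) types are supported. matchingColumn is specified as follows:

  1. A single join column. When the join column has the same name in the left and right tables, matchingColumn is a string scalar; otherwise it is a tuple of length 2. For example, if the join column is named sym in the left table and sym1 in the right table, then matchingColumn = [[`sym],[`sym1]].

  2. Multiple join columns. When the join columns have the same names in the left and right tables, matchingColumn is a string vector; otherwise it is a tuple of length 2. For example, if the join columns are timestamp, sym in the left table and timestamp, sym1 in the right table, then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].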
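The four forms described above can be written side by side as follows. The variable names mc1 to mc4 are placeholders for this sketch; the column names sym, sym1, and timestamp are the ones used in the rules above.

```dolphindb
// One join column, same name in both tables: string scalar
mc1 = `sym

// One join column, different names: tuple of length 2 (left names first, right names second)
mc2 = [[`sym], [`sym1]]

// Several join columns, same names in both tables: string vector
mc3 = `timestamp`sym

// Several join columns, different names: tuple of length 2
mc4 = [[`timestamp, `sym], [`timestamp, `sym1]]
```

Any of these values can be passed as the matchingColumn argument, provided the named columns exist in the corresponding tables.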

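timeColumn: An optional string scalar or vector naming the time columns in leftTable and rightTable. The two time columns may have different names but must have the same data type. When the names are the same, timeColumn is a string scalar; otherwise it is a string vector of length 2.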
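To make the length-2 form concrete, here is a minimal sketch in which the two input tables name their time columns differently. Everything named here (tcLeft, tcRight, tcOut, sje_tc_demo, tradeTime, quoteTime, bid, spread) is hypothetical and chosen only for illustration; it assumes the first element of timeColumn refers to the left table and the second to the right table.

```dolphindb
// Time columns have different names (tradeTime vs quoteTime) but the same type
share streamTable(1:0, `tradeTime`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tcLeft
share streamTable(1:0, `quoteTime`sym`bid, [TIMESTAMP, SYMBOL, DOUBLE]) as tcRight
tcOut = table(100:0, `sym`tradeTime`quoteTime`spread, [SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE])

// timeColumn is a string vector of length 2: left name first, right name second (assumed order)
tcEngine = createSnapshotJoinEngine(name="sje_tc_demo", leftTable=tcLeft, rightTable=tcRight, outputTable=tcOut, metrics=[<price - bid>], matchingColumn=`sym, timeColumn=`tradeTime`quoteTime)
```

When both tables use the same time column name, a scalar such as `timestamp is enough, as in the examples on this page.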

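outputElapsedMicroseconds: An optional Boolean indicating whether to output the elapsed time of each computation response, measured from the moment the triggering data is ingested into the engine until the computation completes. The default is false. When it is set to true, the outputTable definition must include a LONG column and an INT column after the result columns; see the description of outputTable.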

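keepLeftDuplicates: An optional Boolean indicating whether all records in each left-table group are matched when incoming right-table data is joined against the left table (a right join).

  • false (default): only the latest record in each left-table group is matched.

  • true: all records in each left-table group are matched.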

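keepRightDuplicates: An optional Boolean indicating whether all records in each right-table group are matched when incoming left-table data is joined against the right table (a left join).

  • false (default): only the latest record in each right-table group is matched.

  • true: all records in each right-table group are matched.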

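isInnerJoin: An optional Boolean indicating whether to perform an inner join.

  • true (default): an inner join is performed, and only records matched in both tables are computed and output.

  • false: a left or right join is performed, and all records from both tables are kept. Metrics are computed for records matched in both tables, and null values are output for unmatched records.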

Examples

Example 1. outputElapsedMicroseconds is not specified; isInnerJoin=true, keepLeftDuplicates=true, and keepRightDuplicates=true. The engine performs an inner join and matches all records in each group of the left/right table.

// Define the input and output tables
share streamTable(1:0, `timestamp`sym1`id`price`val, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as leftTable
share streamTable(1:0, `timestamp`sym2`id`price`qty, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as rightTable
output=table(100:0, ["id","sym", "timestamp1", "timestamp2", "factor1", "factor2"], 
[INT, SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, DOUBLE])

test_metrics = [<val*10>, <qty>]
// Create the engine
test_engine = createSnapshotJoinEngine(name = "test_SJE", leftTable=leftTable, rightTable=rightTable, 
outputTable=output, metrics=test_metrics, matchingColumn = [["id","sym1"],["id","sym2"]], 
timeColumn = `timestamp, isInnerJoin=true, keepLeftDuplicates=true,keepRightDuplicates=true)

// Ingest the left-table data into the engine
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,1,2,1,5,2,4,4,1,4]
price = [2.53,7.61,8.07,7.87,7.29,9.39,5.98,9.49,9.20,9.17]
val = [101,108,101,109,104,100,108,100,107,104]
left_data = table(timestamp as timestamp,sym as sym1,id as id,price as price,val as val)
appendForJoin(test_engine,true, left_data)

// Ingest the right-table data into the engine; this triggers the join against the cached left-table records
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,2,4,3,5,5,4,2,5,5]
price =  [1.08,9.08,9.97,7.60,1.91,6.77,7.81,8.81,0.61,5.92]
qty =  [208,200,203,202,204,201,206,207,205,205]
right_data = table(timestamp as timestamp,sym as sym2,id as id,price as price,qty as qty)
appendForJoin(test_engine,false, right_data)

select * from output

The result table keeps every matching left-table record.

| id | sym | timestamp1 | timestamp2 | factor1 | factor2 |
|---|---|---|---|---|---|
| 1 | a | 2024.10.10T15:12:01.508 | 2024.10.10T15:12:01.508 | 1,010 | 208 |
| 1 | a | 2024.10.10T15:12:01.516 | 2024.10.10T15:12:01.508 | 1,070 | 208 |
| 2 | b | 2024.10.10T15:12:01.513 | 2024.10.10T15:12:01.509 | 1,000 | 200 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.510 | 1,080 | 203 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.512 | 1,040 | 204 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.514 | 1,080 | 206 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.516 | 1,040 | 205 |

Example 2. outputElapsedMicroseconds is not specified; isInnerJoin=true, keepLeftDuplicates=false, and keepRightDuplicates=true. The engine performs an inner join and matches only the latest record in each left-table group.

// First drop the engine defined in the previous example
dropStreamEngine("test_SJE")

// Define the input and output tables
share streamTable(1:0, `timestamp`sym1`id`price`val, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as leftTable
share streamTable(1:0, `timestamp`sym2`id`price`qty, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as rightTable
output=table(100:0, ["id","sym", "timestamp1", "timestamp2", "factor1", "factor2"], 
[INT, SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, DOUBLE])

test_metrics = [<val*10>, <qty>]
// Create the engine
test_engine = createSnapshotJoinEngine(name = "test_SJE", leftTable=leftTable, rightTable=rightTable, 
outputTable=output, metrics=test_metrics, matchingColumn = [["id","sym1"],["id","sym2"]], 
timeColumn = `timestamp, isInnerJoin=true, keepLeftDuplicates=false,keepRightDuplicates=true)

// Ingest the left-table data into the engine
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,1,2,1,5,2,4,4,1,4]
price = [2.53,7.61,8.07,7.87,7.29,9.39,5.98,9.49,9.20,9.17]
val = [101,108,101,109,104,100,108,100,107,104]
left_data = table(timestamp as timestamp,sym as sym1,id as id,price as price,val as val)
appendForJoin(test_engine,true, left_data)

// Ingest the right-table data into the engine; this triggers the join against the cached left-table records
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,2,4,3,5,5,4,2,5,5]
price =  [1.08,9.08,9.97,7.60,1.91,6.77,7.81,8.81,0.61,5.92]
qty =  [208,200,203,202,204,201,206,207,205,205]
right_data = table(timestamp as timestamp,sym as sym2,id as id,price as price,qty as qty)
appendForJoin(test_engine,false, right_data)

select * from output

The result table keeps only the latest record of each left-table group, so it has one row fewer than the result of Example 1.

| id | sym | timestamp1 | timestamp2 | factor1 | factor2 |
|---|---|---|---|---|---|
| 1 | a | 2024.10.10T15:12:01.516 | 2024.10.10T15:12:01.508 | 1,070 | 208 |
| 2 | b | 2024.10.10T15:12:01.513 | 2024.10.10T15:12:01.509 | 1,000 | 200 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.510 | 1,080 | 203 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.512 | 1,040 | 204 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.514 | 1,080 | 206 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.516 | 1,040 | 205 |

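Example 3. outputElapsedMicroseconds is not specified; isInnerJoin=false, keepLeftDuplicates=false, and keepRightDuplicates=true. The engine performs a left or right join. In a left join, all records in each right-table group are matched; in a right join, only the latest record in each left-table group is matched.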

// First drop the engine defined in the previous example
dropStreamEngine("test_SJE")

// Define the input and output tables
share streamTable(1:0, `timestamp`sym1`id`price`val, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as leftTable
share streamTable(1:0, `timestamp`sym2`id`price`qty, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as rightTable
output=table(100:0, ["id","sym", "timestamp1", "timestamp2", "factor1", "factor2"], 
[INT, SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, DOUBLE])

test_metrics = [<val*10>, <qty>]
// Create the engine
test_engine = createSnapshotJoinEngine(name = "test_SJE", leftTable=leftTable, rightTable=rightTable, 
outputTable=output, metrics=test_metrics, matchingColumn = [["id","sym1"],["id","sym2"]], 
timeColumn = `timestamp, isInnerJoin=false, keepLeftDuplicates=false,keepRightDuplicates=true)

// Ingest the left-table data into the engine
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,1,2,1,5,2,4,4,1,4]
price = [2.53,7.61,8.07,7.87,7.29,9.39,5.98,9.49,9.20,9.17]
val = [101,108,101,109,104,100,108,100,107,104]
left_data = table(timestamp as timestamp,sym as sym1,id as id,price as price,val as val)
appendForJoin(test_engine,true, left_data)

// Ingest the right-table data into the engine; this triggers a right join of the two tables
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,2,4,3,5,5,4,2,5,5]
price =  [1.08,9.08,9.97,7.60,1.91,6.77,7.81,8.81,0.61,5.92]
qty =  [208,200,203,202,204,201,206,207,205,205]
right_data = table(timestamp as timestamp,sym as sym2,id as id,price as price,qty as qty)
appendForJoin(test_engine,false, right_data)

select * from output

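Because ingesting the right-table data triggers a right join, only the latest record in each left-table group is matched. The first ten rows are output by the left join when the left-table data is ingested: the right table is still empty at that point, so its columns are null. The remaining rows are output by the right join.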

| id | sym | timestamp1 | timestamp2 | factor1 | factor2 |
|---|---|---|---|---|---|
| 1 | a | 2024.10.10T15:12:01.508 | | 1,010 | |
| 1 | b | 2024.10.10T15:12:01.509 | | 1,080 | |
| 2 | c | 2024.10.10T15:12:01.510 | | 1,010 | |
| 1 | d | 2024.10.10T15:12:01.511 | | 1,090 | |
| 5 | a | 2024.10.10T15:12:01.512 | | 1,040 | |
| 2 | b | 2024.10.10T15:12:01.513 | | 1,000 | |
| 4 | c | 2024.10.10T15:12:01.514 | | 1,080 | |
| 4 | d | 2024.10.10T15:12:01.515 | | 1,000 | |
| 1 | a | 2024.10.10T15:12:01.516 | | 1,070 | |
| 4 | b | 2024.10.10T15:12:01.517 | | 1,040 | |
| 1 | a | 2024.10.10T15:12:01.516 | 2024.10.10T15:12:01.508 | 1,070 | 208 |
| 2 | b | 2024.10.10T15:12:01.513 | 2024.10.10T15:12:01.509 | 1,000 | 200 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.510 | 1,080 | 203 |
| 3 | d | | 2024.10.10T15:12:01.511 | | 202 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.512 | 1,040 | 204 |
| 5 | b | | 2024.10.10T15:12:01.513 | | 201 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.514 | 1,080 | 206 |
| 2 | d | | 2024.10.10T15:12:01.515 | | 207 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.516 | 1,040 | 205 |
| 5 | b | | 2024.10.10T15:12:01.517 | | 205 |

Example 4. outputElapsedMicroseconds = true is specified, so the elapsed-time column and the batchSize column are output.

// First drop the engine defined in the previous example
dropStreamEngine("test_SJE")

// Define the input and output tables
share streamTable(1:0, `timestamp`sym1`id`price`val, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as leftTable
share streamTable(1:0, `timestamp`sym2`id`price`qty, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE]) as rightTable
output=table(100:0, ["id","sym", "timestamp1", "timestamp2", "factor1", "factor2", "timecost","batchsize"],
[INT, SYMBOL, TIMESTAMP, TIMESTAMP, DOUBLE, DOUBLE, LONG, INT])

test_metrics = [<val*10>, <qty>]
// Create the engine
test_engine = createSnapshotJoinEngine(name = "test_SJE", leftTable=leftTable, rightTable=rightTable, 
outputTable=output, metrics=test_metrics, matchingColumn = [["id","sym1"],["id","sym2"]],
timeColumn = `timestamp, outputElapsedMicroseconds=true, isInnerJoin=true,
keepLeftDuplicates=false,keepRightDuplicates=true)

// Ingest the left-table data into the engine
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,1,2,1,5,2,4,4,1,4]
price = [2.53,7.61,8.07,7.87,7.29,9.39,5.98,9.49,9.20,9.17]
val = [101,108,101,109,104,100,108,100,107,104]
left_data = table(timestamp as timestamp,sym as sym1,id as id,price as price,val as val)
appendForJoin(test_engine,true, left_data)

// Ingest the right-table data into the engine; this triggers the join against the cached left-table records
timestamp = 2024.10.10T15:12:01.507+1..10
sym = take(["a","b","c","d"],10)
id = [1,2,4,3,5,5,4,2,5,5]
price =  [1.08,9.08,9.97,7.60,1.91,6.77,7.81,8.81,0.61,5.92]
qty =  [208,200,203,202,204,201,206,207,205,205]
right_data = table(timestamp as timestamp,sym as sym2,id as id,price as price,qty as qty)
appendForJoin(test_engine,false, right_data)

select * from output

| id | sym | timestamp1 | timestamp2 | factor1 | factor2 | timecost | batchsize |
|---|---|---|---|---|---|---|---|
| 1 | a | 2024.10.10T15:12:01.516 | 2024.10.10T15:12:01.508 | 1,070 | 208 | 109 | 10 |
| 2 | b | 2024.10.10T15:12:01.513 | 2024.10.10T15:12:01.509 | 1,000 | 200 | 109 | 10 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.510 | 1,080 | 203 | 109 | 10 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.512 | 1,040 | 204 | 109 | 10 |
| 4 | c | 2024.10.10T15:12:01.514 | 2024.10.10T15:12:01.514 | 1,080 | 206 | 109 | 10 |
| 5 | a | 2024.10.10T15:12:01.512 | 2024.10.10T15:12:01.516 | 1,040 | 205 | 109 | 10 |