createLookupJoinEngine
Syntax
createLookupJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [rightTimeColumn], [checkTimes])
Details
Create a lookup join streaming engine. The engine performs a left join on matchingColumn between two stream tables, or between a stream table and a non-stream table that is refreshed at regular intervals. It is suited to scenarios where the right table is updated infrequently (e.g., a dimension table holding intraday indicators).
Note:
1. A left join is triggered only when new data is ingested to the left table.
2. Data in the right table is grouped by matchingColumn. Only the latest record in each group is kept by the engine.
If the right table is a stream table, the data in each group will be updated as new data is ingested into the right table.
If the right table is an in-memory table or a DFS table (currently only dimension tables are supported), the system refreshes the right table at regular intervals as specified by checkTimes.
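The two rules above can be modeled in a few lines of Python. This is a simplified, single-key sketch of the semantics as described here, not the DolphinDB API. `LookupJoinModel`, `append_left`, `append_right` and `metrics` are hypothetical names. The right side is treated as a stream, so checkTimes refreshing is not modeled. An unmatched left record is assumed to be emitted with `None` in place of the right-table values, as left-join semantics imply.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class LookupJoinModel:
    """Toy model of lookup-join semantics (hypothetical; not the DolphinDB API)."""
    key: str  # plays the role of a single-column matchingColumn
    metrics: Callable[[dict, Optional[dict]], Tuple]
    latest: Dict[Any, dict] = field(default_factory=dict)  # cached right rows
    output: List[Tuple] = field(default_factory=list)

    def append_right(self, rows: List[dict]) -> None:
        # Right-side ingestion only updates the cache; it never emits output.
        for row in rows:
            self.latest[row[self.key]] = row  # last ingested row per key wins

    def append_left(self, rows: List[dict]) -> None:
        # Every left-side row triggers an immediate lookup against the cache.
        for row in rows:
            match = self.latest.get(row[self.key])  # None if no match yet
            self.output.append((row[self.key],) + self.metrics(row, match))


def metrics(left: dict, right: Optional[dict]) -> Tuple:
    # Mirrors <[price, val, price*val]> from the examples; None when unmatched.
    if right is None:
        return (left["price"], None, None)
    return (left["price"], right["val"], left["price"] * right["val"])


engine = LookupJoinModel(key="sym", metrics=metrics)
engine.append_left([{"sym": "A", "price": 10.1}])  # no right-side data yet
engine.append_right([{"sym": "A", "val": 1.0}, {"sym": "A", "val": 13.0}])
engine.append_left([{"sym": "A", "price": 10.1}])
print(engine.output)
```

In this model only `append_left` writes to `output`, which reflects note 1; `append_right` overwriting the cached row per key reflects note 2.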
Although the lookup join engine performs a left join on the left table and the right table, its output differs from that of a standard left join:
Left join: if a record in the left table has multiple matching records in the right table, all of them are returned.
Lookup join engine: for each record in the left table, only the latest matching record in the right table is returned.
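The following Python sketch contrasts the two behaviors on one left record that has three matches in the right table. Both functions are hypothetical illustrations, not DolphinDB functions. "Latest" is taken here to mean last ingested, which corresponds to the case where rightTimeColumn is not specified.

```python
from typing import Dict, List, Optional, Tuple


def standard_left_join(left: List[dict], right: List[dict], key: str) -> List[Tuple[dict, Optional[dict]]]:
    """Standard left join: every matching right row yields an output row."""
    out = []
    for l_row in left:
        matches = [r for r in right if r[key] == l_row[key]]
        if not matches:
            out.append((l_row, None))
        out.extend((l_row, r) for r in matches)
    return out


def lookup_style_join(left: List[dict], right: List[dict], key: str) -> List[Tuple[dict, Optional[dict]]]:
    """Lookup-style join: only the last-ingested right row per key is used."""
    latest: Dict = {}
    for r in right:  # right rows in ingestion order
        latest[r[key]] = r
    return [(l_row, latest.get(l_row[key])) for l_row in left]


left = [{"sym": "A", "price": 10.1}]
right = [{"sym": "A", "val": 1}, {"sym": "A", "val": 4}, {"sym": "A", "val": 13}]

print(len(standard_left_join(left, right, "sym")))  # 3
print(len(lookup_style_join(left, right, "sym")))   # 1
print(lookup_style_join(left, right, "sym")[0][1]["val"])  # 13
```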
The lookup join engine and the asof join engine differ in the following aspects:
For the asof join engine, the first column of its output table is always the time column. The lookup join engine has no such restriction.
With the lookup join engine, a join is triggered as soon as a new record is ingested into the left table. With the asof join engine, when timeColumn is specified, a record in the left table may be joined only after a delay.
Note also the difference between the lookup join engine and the left semi join engine: with the left semi join engine, a record in the left table produces no output until a matching record is found in the right table.
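The timing difference from the left semi join engine can be sketched by replaying a sequence of left ("L") and right ("R") events. This Python model is a simplification built on stated assumptions: the hypothetical `simulate` function emits lookup results immediately (with `None` when there is no match) and holds unmatched left records on the semi-join side until a matching right record arrives. It does not model which matching record (first or last) the real left semi join engine uses.

```python
from typing import Dict, List, Tuple


def simulate(events: List[Tuple[str, dict]], key: str):
    """Replay ("L" | "R", row) events in order.

    Returns (lookup_out, semi_out): the rows each simplified model has emitted.
    """
    latest: Dict = {}          # right rows seen so far, by key
    pending: List[dict] = []   # left rows the semi-join model is still holding
    lookup_out, semi_out = [], []
    for side, row in events:
        if side == "R":
            latest[row[key]] = row
            # Semi-join model: release held left rows that now have a match.
            still_waiting = []
            for l_row in pending:
                if l_row[key] in latest:
                    semi_out.append((l_row, latest[l_row[key]]))
                else:
                    still_waiting.append(l_row)
            pending = still_waiting
        else:
            # Lookup model: emit at once, matched or not.
            lookup_out.append((row, latest.get(row[key])))
            # Semi-join model: emit only if a match already exists.
            if row[key] in latest:
                semi_out.append((row, latest[row[key]]))
            else:
                pending.append(row)
    return lookup_out, semi_out


events = [("L", {"sym": "A", "price": 10.1}), ("R", {"sym": "A", "val": 13})]
after_left = simulate(events[:1], "sym")
after_both = simulate(events, "sym")
print(after_left[0], after_left[1])  # lookup emitted (row, None); semi emitted nothing
print(after_both[1])                 # semi emits once the right match arrives
```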
For more application scenarios, see Streaming Engines.
Arguments
name is a string indicating the name of the lookup join engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.
leftTable is a table object whose schema must be the same as that of the stream table to which the engine subscribes.
rightTable is a table object. It can be an in-memory table, a stream table or a dimension table. Note that if rightTable is not subscribed to, checkTimes must be specified so that the right table is refreshed at regular intervals.
outputTable is a table object to hold the calculation results. Create an empty table and specify the column names and types before calling the function.
The columns of outputTable are in the following order:
(1) The first column(s) are the column(s) on which the tables are joined, arranged in the same order as specified in matchingColumn.
(2) These are followed by the calculation results of metrics.
metrics is metacode (which can be a tuple) specifying the calculation formulas. For more information about metacode, please refer to Metaprogramming.
metrics can contain one or more expressions, built-in functions or user-defined functions, but not aggregate functions.
metrics can include functions that return multiple values, in which case you must specify the columns of the output table that hold the return values. For example, <func(price) as `col1`col2>.
To specify a column that exists in both the left and the right tables, use the format tableName.colName. By default, the column from the left table is used.
The column names specified in metrics are case-sensitive and must be consistent with the column names of the input tables.
matchingColumn is a STRING scalar/vector/tuple indicating the column(s) on which the tables are joined. The matching columns can be of integral, temporal or literal (except UUID) types.
1. When there is only one column to match: if the column has the same name in both tables, specify matchingColumn as a STRING scalar; otherwise, specify it as a tuple of two elements. For example, if the column is named “sym” in the left table and “sym1” in the right table, then matchingColumn = [[`sym],[`sym1]].
2. When there are multiple columns to match: if all columns to match have the same names in both tables, specify matchingColumn as a STRING vector; otherwise, specify it as a tuple of two elements. For example, if the columns are named “timestamp” and “sym” in the left table, whereas in the right table they are named “timestamp” and “sym1”, then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].
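The four accepted shapes of matchingColumn can be summarized as a small normalization function. In this hypothetical Python helper, a Python `str` stands in for a STRING scalar, a list of strings for a STRING vector, and a two-element list of lists for the tuple form. It only illustrates the rules above and does not check the column types.

```python
from typing import List, Tuple


def normalize_matching_column(spec) -> Tuple[List[str], List[str]]:
    """Return (left_columns, right_columns) for a matchingColumn-like spec.

    "sym"                                          -> same single column in both tables
    ["timestamp", "sym"]                           -> same multiple columns in both tables
    [["sym"], ["sym1"]]                            -> different names, one column
    [["timestamp", "sym"], ["timestamp", "sym1"]]  -> different names, several columns
    """
    if isinstance(spec, str):
        return [spec], [spec]
    spec = list(spec)
    if all(isinstance(s, str) for s in spec):
        return spec, list(spec)
    if len(spec) == 2 and not any(isinstance(s, str) for s in spec):
        left, right = list(spec[0]), list(spec[1])
        if len(left) != len(right):
            raise ValueError("left and right column lists must have equal length")
        return left, right
    raise ValueError("unsupported matchingColumn shape")


print(normalize_matching_column([["timestamp", "sym"], ["timestamp", "sym1"]]))
# (['timestamp', 'sym'], ['timestamp', 'sym1'])
```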
rightTimeColumn is a STRING scalar indicating the time column in the right table. If it is specified, the engine keeps, for each group, the record with the latest timestamp in this column; if multiple records share that timestamp, only the most recently ingested one is retained. If it is not specified, the engine keeps the most recently ingested record in each group, based on system time.
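The retention rule can be sketched in Python and checked against the right-table data used in Ex. 1 and Ex. 2. `latest_per_key` is a hypothetical helper, and timestamps are reduced to millisecond offsets from 2018.10.08T01:01:01.001. Without a time column, the retained `val` values are 13, 14 and 15 for A, B and C (the factor2 column of Ex. 1). With a time column they are 10, 11 and 12 (the factor2 column of Ex. 2), because the last three ingested records carry earlier timestamps.

```python
from typing import Dict, List, Optional


def latest_per_key(rows: List[dict], key: str, time_col: Optional[str] = None) -> Dict:
    """Rows are in ingestion order. Returns the retained row for each key."""
    kept: Dict = {}
    for row in rows:
        prev = kept.get(row[key])
        # Without a time column, the most recently ingested row always wins.
        # With one, a later row wins if its timestamp is >= the kept one
        # (">=" so that, on ties, the later-ingested row is retained).
        if prev is None or time_col is None or row[time_col] >= prev[time_col]:
            kept[row[key]] = row
    return kept


# Right-table data from Ex. 1/Ex. 2: offsets 1..12 followed by 1..3,
# sym cycling A, B, C, and val = 1..15.
offsets = list(range(1, 13)) + list(range(1, 4))
prices = [{"ts": t, "sym": "ABC"[i % 3], "val": i + 1} for i, t in enumerate(offsets)]

by_ingestion = latest_per_key(prices, "sym")
by_time = latest_per_key(prices, "sym", time_col="ts")
print({k: r["val"] for k, r in by_ingestion.items()})  # {'A': 13, 'B': 14, 'C': 15}
print({k: r["val"] for k, r in by_time.items()})       # {'A': 10, 'B': 11, 'C': 12}
```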
checkTimes is a vector of temporal values or a DURATION scalar. If it is specified, the system regularly refreshes the right table (keeping only the latest data) and ingests the latest data into the lookup join engine. If the right table does not need to be refreshed regularly, you can leave checkTimes unspecified, but make sure to manually ingest the right table's data into the engine after the engine is created.
If checkTimes is a vector of temporal values, it must be of SECOND, TIME or NANOTIME type. The lookup join engine refreshes the right table every day at each time specified in the vector.
If checkTimes is a DURATION scalar, it indicates the interval to update the right table.
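How the two forms of checkTimes translate into refresh moments can be sketched as follows. This is a hypothetical Python model: `time` values stand in for SECOND/TIME/NANOTIME elements and `timedelta` for a DURATION scalar. It assumes the DURATION interval is counted from the previous refresh; the engine's actual scheduling details (for example, what the first refresh is anchored to) are not specified here.

```python
from datetime import datetime, time, timedelta
from typing import List


def next_refresh_daily(now: datetime, check_times: List[time]) -> datetime:
    """Vector form (e.g. 09:30:00, 13:00:00): the same times of day, every day."""
    today = [datetime.combine(now.date(), t) for t in sorted(check_times)]
    upcoming = [dt for dt in today if dt > now]
    if upcoming:
        return upcoming[0]
    # Every time for today has passed: the earliest time tomorrow is next.
    return today[0] + timedelta(days=1)


def next_refresh_interval(last_refresh: datetime, interval: timedelta) -> datetime:
    """DURATION form (e.g. checkTimes=1s): assumed to count from the last refresh."""
    return last_refresh + interval


times = [time(9, 30), time(13, 0)]
print(next_refresh_daily(datetime(2024, 1, 2, 10, 0), times))  # 2024-01-02 13:00:00
print(next_refresh_daily(datetime(2024, 1, 2, 15, 0), times))  # 2024-01-03 09:30:00
print(next_refresh_interval(datetime(2024, 1, 2, 10, 0), timedelta(seconds=1)))  # 2024-01-02 10:00:01
```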
First Release
1.30.16/2.00.4
Examples
Ex. 1
$ login(`admin, `123456)
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)
$ n = 15
$ tem1 = table((2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3) as timestamps, take(`A`B`C, n) as sym, take(1..15, n) as val, 1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output
sym | factor1 | factor2 | factor3 |
---|---|---|---|
A | 10.1 | 13 | 131.3 |
B | 11.1 | 14 | 155.4 |
C | 12.1 | 15 | 181.5 |
A | 13.1 | 13 | 170.3 |
B | 14.1 | 14 | 197.4 |
C | 15.1 | 15 | 226.5 |
A | 16.1 | 13 | 209.3 |
B | 17.1 | 14 | 239.4 |
C | 18.1 | 15 | 271.5 |
A | 19.1 | 13 | 248.3 |
Ex. 2
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)
$ n = 15
$ tem1 = table((2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3) as timestamps, take(`A`B`C, n) as sym, take(1..15, n) as val, 1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output
sym | factor1 | factor2 | factor3 |
---|---|---|---|
A | 10.1 | 10 | 101 |
B | 11.1 | 11 | 122.1 |
C | 12.1 | 12 | 145.2 |
A | 13.1 | 10 | 131 |
B | 14.1 | 11 | 155.1 |
C | 15.1 | 12 | 181.2 |
A | 16.1 | 10 | 161 |
B | 17.1 | 11 | 188.1 |
C | 18.1 | 12 | 217.2 |
A | 19.1 | 10 | 191 |
Ex. 3: The right table is an in-memory table, so checkTimes must be set.
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ prices=table(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps, checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ n = 15
$ tem1 = table((2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3) as timestamps, take(`A`B`C, n) as sym, take(1..15, n) as val, 1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output
sym | factor1 | factor2 | factor3 |
---|---|---|---|
A | 10.1 | 10 | 101 |
B | 11.1 | 11 | 122.1 |
C | 12.1 | 12 | 145.2 |
A | 13.1 | 10 | 131 |
B | 14.1 | 11 | 155.1 |
C | 15.1 | 12 | 181.2 |
A | 16.1 | 10 | 161 |
B | 17.1 | 11 | 188.1 |
C | 18.1 | 12 | 217.2 |
A | 19.1 | 10 | 191 |
Ex. 4: Join the left table “trades” (a real-time stream table) with the right table “prices” (a dimension table with infrequent updates), looking up the matching records in the right table by column “id”.
$ share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
$ dbPath="dfs://testlj"
$ if(existsDatabase(dbPath)){
$     dropDatabase(dbPath)
$ }
$ rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
$ db=database(dbPath, VALUE, `A`B`C)
$ prices=db.createTable(rt,`rightTable)
$ outputTable = table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE])
$ tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=prices, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)
$ def writeData(t, n){
$     timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
$     volumev = take(1..n, n)
$     id = take(1 2 3, n)
$     insert into t values(timev, volumev, id)
$ }
$ // write 10 records into the in-memory table rt, then append them to the dimension table prices
$ writeData(rt, 10)
$ prices.append!(rt)
$ sleep(2000)
$ writeData(trades, 6)
$ sleep(2)
$ select * from outputTable
id | volume | price | prod |
---|---|---|---|
1 | 1 | 10 | 10 |
2 | 2 | 8 | 16 |
3 | 3 | 9 | 27 |
1 | 4 | 10 | 40 |
2 | 5 | 8 | 40 |
3 | 6 | 9 | 54 |