createLookupJoinEngine

Syntax

createLookupJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [rightTimeColumn], [checkTimes], [outputElapsedMicroseconds=false], [keepDuplicates=false], [isInnerJoin], [snapshotDir], [snapshotIntervalInMsgCount])

Details

Create a lookup join engine where matchingColumn is used as the join column to perform a left or inner join between the left table (which must be a stream table) and the right table. The lookup join engine is commonly used in scenarios where the right table is updated infrequently (e.g., dimension tables that store daily indicators). If the right table is a non-stream table, the data must be refreshed regularly.

  1. Calculation is triggered only when new data is ingested into the left table.
  2. When keepDuplicates = false, the engine retains only the latest record from each group in the right table, grouped by matchingColumn. In this case:
    • The default join type is a left join, i.e., isInnerJoin = false. This ensures output for every record in the left table, even if no matching record exists in the right table (in which case, null values are returned).
    • Users can change the join type to an inner join by setting isInnerJoin to true.

    When keepDuplicates = true, the engine retains all records in each group from the right table, grouped by matchingColumn. In this case, the join type can only be an inner join. This means output is only generated if a match is found; if no match exists, no output will be produced.

  3. If the right table is a stream table, the data in each group will be updated as new data is ingested into the right table. If the right table is an in-memory table, dimension table or SQL metacode, the system refreshes the right table at regular intervals as specified by checkTimes.
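
The effect of keepDuplicates = true can be sketched as follows. This is a minimal illustration rather than an official example: the table and engine names (tradesDup, pricesDup, outputDup, "dupDemo") are hypothetical, and data is fed directly with appendForJoin instead of through subscriptions.

```dolphindb
// Hypothetical table and engine names, chosen for this sketch only
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesDup
share streamTable(1000:0, `timestamps`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as pricesDup
share table(100:0, `sym`price`val, [SYMBOL, DOUBLE, DOUBLE]) as outputDup

// keepDuplicates=true: all right-table records in each group are retained (inner join only)
dupEngine = createLookupJoinEngine(name="dupDemo", leftTable=tradesDup, rightTable=pricesDup, outputTable=outputDup, metrics=<[price, val]>, matchingColumn=`sym, keepDuplicates=true)

// Right table: two records for A, one for B
appendForJoin(dupEngine, false, table(2018.10.08T01:01:01.001 + 1..3 as timestamps, `A`A`B as sym, 1.0 2.0 3.0 as val))
// Left table: one record each for A, B and C (C has no match in the right table)
appendForJoin(dupEngine, true, table(2019.10.08T01:01:01.001 + 1..3 as timestamps, `A`B`C as sym, 10.0 20.0 30.0 as price))
sleep(100)
select * from outputDup
```

Following the rules above, the left record for A is expected to produce one output row per retained right record, and the record for C is expected to produce none.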

Comparison of the lookup join engine with the asof join, left semi join, and snapshot join engines:

asof join engine

For the asof join engine, the first column of its output table is always the time column. There is no such restriction with the lookup join engine.

With the lookup join engine, calculation is triggered as soon as a new record is ingested into the left table. With the asof join engine, when the timeColumn is specified, there can be a delay before a record in the left table is joined.

left semi join engine

Both lookup join and left semi join engines only respond to new records arriving in the left table, producing output when matches are found in the right table.

For records from the left table that have no match in the right table, the lookup join engine will either output or skip them based on parameter settings. The left semi join engine, however, caches these unmatched records, waiting to output them once matching records appear in the right table.

When caching right table records, the left semi join engine only retains either the first or the most recent record for identical join column values. This ensures that each left table record matches at most one right table record and produces output exactly once. In contrast, for the lookup join engine, users can choose to retain all records within each group in the right table. As a result, each record in the left table may match multiple records in the right table, leading to multiple output records.

snapshot join engine

  • The lookup join engine responds only to new records in the left table, supporting both inner join and left join operations.
  • The snapshot join engine responds to new records in either the left or right table, supporting both inner join and full outer join operations.

For more application scenarios, see Streaming Engines.

Arguments

name is a string indicating the name of the lookup join engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.

leftTable is a table object whose schema must be the same as the stream table to which the engine subscribes.

rightTable is a table object. It can be an in-memory table, stream table, dimension table, or SQL metacode that returns a table. For a regularly updated rightTable that is not subscribed to, checkTimes must be specified for timed data refreshing.

outputTable is a table object to hold the calculation results. Create an empty table and specify the column names and types before calling the function.

The columns of outputTable are in the following order:

(1) The first column(s) are the column(s) on which the tables are joined, arranged in the same order as specified in matchingColumn.

(2) Followed by the calculation results of metrics.

(3) If outputElapsedMicroseconds is set to true, specify two more columns: a LONG column and an INT column.

metrics is metacode (can be a tuple) specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.
  • metrics can use one or more expressions, built-in or user-defined functions (but not aggregate functions), or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.

  • metrics can be functions that return multiple values and the columns in the output table to hold the return values must be specified. For example, <func(price) as `col1`col2>.

To specify a column that exists in both the left and the right tables, use the format tableName.colName. By default, the column from the left table is used.

Note: The column names specified in metrics are not case-sensitive, so their case need not match that of the column names in the input tables.

matchingColumn is a STRING scalar/vector/tuple indicating the column(s) on which the tables are joined. It supports integral, temporal or literal (except UUID) types.
  • When there is only 1 column to match: if the names of the columns to match are the same in both tables, matchingColumn should be specified as a STRING scalar; otherwise it is a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].

  • When there are multiple columns to match: if both tables share the names of all columns to match, matchingColumn is a STRING vector; otherwise it is a tuple of two elements. For example, if the columns are named "timestamp" and "sym" in the left table, whereas in the right table they are named "timestamp" and "sym1", then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].
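
A minimal sketch of the tuple form of matchingColumn, assuming the join column is named "sym" in the left table and "sym1" in the right table. All table and engine names here (tradesMc, pricesMc, outputMc, "matchDemo") are hypothetical and chosen only for illustration.

```dolphindb
// Hypothetical names; the join column is "sym" on the left and "sym1" on the right
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesMc
share streamTable(1000:0, `timestamps`sym1`val, [TIMESTAMP, SYMBOL, DOUBLE]) as pricesMc
// The first output column holds the join column, followed by the metrics results
share table(100:0, `sym`price`val, [SYMBOL, DOUBLE, DOUBLE]) as outputMc

// Tuple of two elements: left-table column name(s) first, right-table column name(s) second
mcEngine = createLookupJoinEngine(name="matchDemo", leftTable=tradesMc, rightTable=pricesMc, outputTable=outputMc, metrics=<[price, val]>, matchingColumn=[[`sym],[`sym1]])

// Feed the right table first, then the left table
appendForJoin(mcEngine, false, table(2018.10.08T01:01:01.001 + 1..2 as timestamps, `A`B as sym1, 1.0 2.0 as val))
appendForJoin(mcEngine, true, table(2019.10.08T01:01:01.001 + 1..2 as timestamps, `A`B as sym, 10.0 20.0 as price))
sleep(100)
select * from outputMc
```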

rightTimeColumn (optional) is a STRING scalar indicating the time column in the right table. If the parameter is specified, the right table will keep the record with the latest timestamp. If there are multiple records with identical timestamps, only the latest record is retained. If the parameter is not specified, the latest ingested record (based on the system time) will be kept.

checkTimes (optional) is a vector of temporal values or a DURATION scalar. If it is specified, the system regularly updates the right table (keeping only the latest data) and ingests the latest data into the lookup join engine. If the right table does not need to be updated regularly, you can leave checkTimes empty, but make sure to manually ingest the table data into the engine after it has been created.
  • If checkTimes is a vector of temporal values, it must be of SECOND, TIME or NANOTIME type. The lookup join engine updates the right table according to the time specified by each element in the vector on a daily basis.

  • If checkTimes is a DURATION scalar, it indicates the interval to update the right table.
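
The two forms of checkTimes can be sketched as follows. The names (tradesCt, dims, outputCt1, outputCt2, "dailyRefresh", "intervalRefresh") and the refresh times are hypothetical, picked only to illustrate the parameter.

```dolphindb
// Hypothetical names throughout; dims stands in for an infrequently updated in-memory table
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesCt
dims = table(1000:0, `timestamps`sym`val, [TIMESTAMP, SYMBOL, DOUBLE])
share table(100:0, `sym`price`val, [SYMBOL, DOUBLE, DOUBLE]) as outputCt1
share table(100:0, `sym`price`val, [SYMBOL, DOUBLE, DOUBLE]) as outputCt2

// Vector of SECOND values: refresh the right table at 09:00:00 and 15:30:00 every day
refreshAt = [09:00:00, 15:30:00]
dailyEngine = createLookupJoinEngine(name="dailyRefresh", leftTable=tradesCt, rightTable=dims, outputTable=outputCt1, metrics=<[price, val]>, matchingColumn=`sym, checkTimes=refreshAt)

// DURATION scalar: refresh the right table every 30 seconds
intervalEngine = createLookupJoinEngine(name="intervalRefresh", leftTable=tradesCt, rightTable=dims, outputTable=outputCt2, metrics=<[price, val]>, matchingColumn=`sym, checkTimes=30s)
```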

outputElapsedMicroseconds (optional) is a Boolean value. The default value is false. It determines whether to output:

  • the elapsed time (in microseconds) from the ingestion of data to the output of the result for each batch.

  • the number of records in each batch.
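
A minimal sketch of an output table prepared for outputElapsedMicroseconds = true. The table, engine, and extra column names (tradesEl, pricesEl, outputEl, "elapsedDemo", elapsedMicros, batchSize) are hypothetical; only the column types and their order follow the description of outputTable above.

```dolphindb
// Hypothetical names, for illustration only
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesEl
share streamTable(1000:0, `timestamps`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as pricesEl

// Join column, then the two metrics results, then the two extra columns (LONG, INT)
share table(100:0, `sym`price`val`elapsedMicros`batchSize, [SYMBOL, DOUBLE, DOUBLE, LONG, INT]) as outputEl

elEngine = createLookupJoinEngine(name="elapsedDemo", leftTable=tradesEl, rightTable=pricesEl, outputTable=outputEl, metrics=<[price, val]>, matchingColumn=`sym, outputElapsedMicroseconds=true)
```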

keepDuplicates (optional) is a Boolean value indicating whether to keep all records in each group of the right table. When set to false (default), the engine keeps the latest record in each group. When set to true, the engine keeps all records in each group; in this case, the engine performs an inner join, i.e., output is only generated if a match is found.

isInnerJoin (optional) is a Boolean value indicating whether to perform an inner join. This parameter can only be set when keepDuplicates is false. The default value is false, indicating a left join. This means results are generated for all records from the left table, with null values filling in any unmatched fields from the right table.

To enable snapshot in the streaming engines, specify parameters snapshotDir and snapshotIntervalInMsgCount.

snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved. The directory must already exist; otherwise an exception is thrown. If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot is loaded to restore the engine state. Multiple streaming engines can share a directory, in which the snapshot files are named after the engines.

The file extension of a snapshot can be:
  • <engineName>.tmp: a temporary snapshot
  • <engineName>.snapshot: a snapshot that is generated and flushed to disk
  • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.

Examples

Example 1

login(`admin, `123456)
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output

LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n  = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym factor1 factor2 factor3
A 10.1 13 131.3
B 11.1 14 155.4
C 12.1 15 181.5
A 13.1 13 170.3
B 14.1 14 197.4
C 15.1 15 226.5
A 16.1 13 209.3
B 17.1 14 239.4
C 18.1 15 271.5
A 19.1 13 248.3
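
The examples below reuse the engine name "test1" and the shared table names from Example 1. When running them in the same session, a cleanup along the following lines is likely needed between examples. This is a sketch that assumes the subscriptions and names of Example 1; adapt the action names and table list to the example that was last run.

```dolphindb
// Cancel the subscriptions that feed the engine
unsubscribeTable(tableName="trades", actionName="append_leftTable")
unsubscribeTable(tableName="prices", actionName="append_rightTable")
// Release the engine so that its name can be reused
dropStreamEngine("test1")
// Remove the shared tables so that they can be created again
undef(`trades`prices`output, SHARED)
```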

Example 2

share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n  = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym factor1 factor2 factor3
A 10.1 10 101
B 11.1 11 122.1
C 12.1 12 145.2
A 13.1 10 131
B 14.1 11 155.1
C 15.1 12 181.2
A 16.1 10 161
B 17.1 11 188.1
C 18.1 12 217.2
A 19.1 10 191

Example 3. The right table is an in-memory table, so checkTimes must be set.

share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output
prices=table(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT])
LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps, checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)

n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
prices.append!(tem1)
sleep(2000)
n  = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
trades.append!(tem2)
sleep(100)
select * from output
sym factor1 factor2 factor3
A 10.1 10 101
B 11.1 11 122.1
C 12.1 12 145.2
A 13.1 10 131
B 14.1 11 155.1
C 15.1 12 181.2
A 16.1 10 161
B 17.1 11 188.1
C 18.1 12 217.2
A 19.1 10 191

Example 4. Join the left table "trades" (a real-time stream table) and the right table "prices" (a dimension table with infrequent updates) to look up the matched records in column "id" from the right table.

share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
dbPath="dfs://testlj"
if(existsDatabase(dbPath)){
   dropDatabase(dbPath)
}
rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
db=database(dbPath, VALUE, `A`B`C)
prices=db.createDimensionTable(rt,`rightTable)
share table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE]) as outputTable

tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=prices, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)

def writeData(t,n){
    timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1..n, n)
    id = take(1 2 3, n)
    insert into t values(timev, volumev, id)
}

writeData(rt, 10)
prices.append!(rt)
sleep(2000)
writeData(trades, 6)
sleep(2)

select * from outputTable
id volume price prod
1 1 10 10
2 2 8 16
3 3 9 27
1 4 10 40
2 5 8 40
3 6 9 54

Example 5. Specify SQL metacode for rightTable to join columns queried from a DFS partitioned table.

share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
dbPath="dfs://lookupjoinDB"
if(existsDatabase(dbPath)){
   dropDatabase(dbPath)
}
rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
db=database(dbPath, HASH, [INT,5])
prices=db.createPartitionedTable(rt,`rightTable, `id)
share table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE]) as outputTable

tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=<select * from loadTable(dbPath, `rightTable)>, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)

def writeData(t,n){
    timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1..n, n)
    id = take(1 2 3, n)
    insert into t values(timev, volumev, id)
}

writeData(rt, 10)
prices.append!(rt)
sleep(2000)
writeData(trades, 6)
sleep(2)

select * from outputTable
id volume price prod
1 1 10 10
2 2 8 16
3 3 9 27
1 4 10 40
2 5 8 40
3 6 9 54

Example 6. Set isInnerJoin to true, so that no results are output when there is no matching record in the right table.

login(`admin, `123456)
share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
share table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE]) as output

LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, isInnerJoin=true)

n = 15
tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
appendForJoin(LjEngine, false,tem1)
sleep(2000)

// The left table contains 2 records that have no matching records in the right table.
n  = 10
tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C`d, n) as sym,take(0.1+10..20,n) as price)
appendForJoin(LjEngine, true,tem2)
sleep(100)
select count(*) from output // 8
select * from output
sym factor1 factor2 factor3
A 10.1 13 131.3
B 11.1 14 155.4
C 12.1 15 181.5
A 14.1 13 183.3
B 15.1 14 211.4
C 16.1 15 241.5
A 18.1 13 235.3
B 19.1 14 267.4