createAsofJoinEngine

Syntax

createAsofJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [delayedTime], [garbageSize], [sortByTime])

Details

Create an asof join streaming engine. Streams are ingested into the left table and the right table and joined on matchingColumn and timeColumn (or system time). For each record in the left table, join it with the right table record (1) with matching matchingColumn value and (2) whose timestamp is the last of the timestamps that are less than or equal to the timestamp of the left table record. This function returns a table object holding the asof join results.

Asof join engine joins records that have no exact match on time columns. For each timestamp in one table, the engine obtains the latest (i.e., current as of the timestamp) value from another table.

Note:

  • The records in the left table and the right table must be sequenced by time.

  • If delayedTime is not specified, a join operation is only triggered when the right table receives a record whose timestamp is greater than the timestamp of the latest record in the left table.

  • If delayedTime is specified, a join operation is triggered when either of the following conditions is met:

    • In the left table, the difference between the timestamp of the latest record and the timestamp of the previous unjoined record is greater than delayedTime.

    • The record is still not joined after 2 * delayedTime or 2 seconds, whichever is larger, since its ingestion into the left table.

For more application scenarios, see Streaming Engines.

Arguments

name is a string indicating the name of the asof join engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.

leftTable and rightTable are table objects whose schema must be the same as the stream table to which the engine subscribes.

outputTable is a table to which the engine inserts calculation result. It can be an in-memory table or a DFS table. Before calling a function, an empty table with specified column names must be created.

The columns of outputTable are in the following order:

(1) The first column must be a temporal column.
  • if useSystemTime = true, the data type must be TIMESTAMP;

  • if useSystemTime = false, it has the same data type as timeColumn.

(2) Then followed by one or more columns on which the tables are joined, arranged in the same order as specified in matchingColumn.

(3) Further followed by one or more columns which are the calculation results of metrics.

metrics is metacode (can be a tuple) specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.

  • metrics can use one or more expressions, built-in or user-defined functions (but not aggregate functions), or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.

  • metrics can be functions with multiple returns and the columns in the output table to hold the return values must be specified. For example, <func(price) as `col1`col2>.

  • To specify a column that exists in both the left and the right tables, use the format tableName.colName.

  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.

matchingColumn is a STRING scaler/vector/tuple indicating the column(s) on which the tables are joined. It supports integral, temporal or literal (except UUID) types.

  • When there is only 1 column to match - If the names of the columns to match are the same in both tables, matchingColumn should be specified as a STRING scalar; otherwise it's a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].
  • When there are multiple columns to match - If both tables share the names of all columns to match, matchingColumn is a STRING vector; otherwise it's a tuple of two elements. For example, if the columns are named "orderNo" and "sym" in the left table, whereas in the right table they're named "orderNo" and "sym1", then matchingColumn = [[`orderNo, `sym], [`orderNo,`sym1]].

timeColumn (optional) specifies the name of the time column in the left table and the right table. The time columns must have the same data type. When useSystemTime = false, it must be specified. If the names of the time column in the left table and the right table are the same, timeColumn is a string. Otherwise, it is a vector of 2 strings indicating the time column in each table.

useSystemTime (optional) indicates whether the left table and the right table are joined on the system time, instead of on timeColumn.

  • useSystemTime = true: join records based on the system time (timestamp with millisecond precision) when they are ingested into the engine.
  • useSystemTime = false (default): join records based on the specified *timeColumn* from the left table and the right table.

delayedTime (optional) is a positive integer with the same precision as timeColumn, indicating the maximum time to wait before the engine joins an uncalculated record in the left table with a right table record. To specify delayedTime, timeColumn must be specified. For more information, see Details.

garbageSize (optional) is a positive integer with the default value of 5,000 (rows). As the subscribed data is ingested into the engine, it continues to take up the memory. Within the left/right table, the records are grouped by matchingColumn values; When the number of records in a group exceeds garbageSize, the system will remove those already been calculated from memory.

sortByTime (optional) is a Boolean value that indicates whether the output data is globally sorted by time. The default value is false, meaning the output data is sorted only within groups. Note that if sortByTime is set to true, the parameter delayedTime cannot be specified, and the data input to the left and right tables must be globally sorted.

Examples

share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as quotes
share table(100:0, `time`sym`price`bid`ask`spread, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as prevailingQuotes

ajEngine=createAsofJoinEngine(name="aj1", leftTable=trades, rightTable=quotes, outputTable=prevailingQuotes, metrics=<[price, bid, ask, abs(price-(bid+ask)/2)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false)
tmp1=table(2020.08.27T09:30:00.000+2 8 20 22 23 24 as time, take(`A`B, 6) as sym, 20.01 20.04 20.07 20.08 20.4 20.5 as price)
tmp2=table(2020.08.27T09:30:00.000+1 5 6 11 19 20 21 as time, take(`A`B, 7) as sym, 20 20.02 20.03 20.05 20.06 20.6 20.4 as bid,  20.01 20.03 20.04 20.06 20.07 20.5 20.6 as ask)
tmp1.sortBy!(`time)
tmp2.sortBy!(`time)

subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{ajEngine, true}, msgAsTable=true)
subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=appendForJoin{ajEngine, false}, msgAsTable=true)

trades.append!(tmp1)
quotes.append!(tmp2)

sleep(100)
select time, sym, bid from prevailingQuotes

// output
time	sym	bid
2020.08.27T09:30:00.002	A	20
2020.08.27T09:30:00.020	A	20.06
2020.08.27T09:30:00.008	B	20.02
// clean environment
unsubscribeTable(tableName="trades", actionName="joinLeft")
unsubscribeTable(tableName="quotes", actionName="joinRight")
undef(`trades,SHARED)
undef(`quotes,SHARED)
dropAggregator(name="aj1")

//define an asof join engine and set sortByTime=true
share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
share streamTable(1:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as quotes
share table(100:0, `time`sym`price`bid`ask`spread, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as prevailingQuotes
ajEngine=createAsofJoinEngine(name="aj1", leftTable=trades, rightTable=quotes, outputTable=prevailingQuotes, metrics=<[price, bid, ask, abs(price-(bid+ask)/2)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false, sortByTime=true)

tmp1=table(2020.08.27T09:30:00.000+2 8 20 22 23 24 as time, take(`A`B, 6) as sym, 20.01 20.04 20.07 20.08 20.4 20.5 as price)
tmp2=table(2020.08.27T09:30:00.000+1 5 6 11 19 20 21 as time, take(`A`B, 7) as sym, 20 20.02 20.03 20.05 20.06 20.6 20.4 as bid,  20.01 20.03 20.04 20.06 20.07 20.5 20.6 as ask)
tmp1.sortBy!(`time)
tmp2.sortBy!(`time)

//only appendForJoin can be used to insert data
subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{ajEngine, true}, msgAsTable=true)
subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=appendForJoin{ajEngine, false}, msgAsTable=true)

trades.append!(tmp1)
quotes.append!(tmp2)

sleep(100)

//check the output table
select time, sym, bid from prevailingQuotes

// output
time                   sym   bid
2020.08.27T09:30:00.002      A       20
2020.08.27T09:30:00.008      B       20.02
2020.08.27T09:30:00.020      A       20.06