Asof Join Engine
DolphinDB provides multiple lightweight and easy-to-use join engines. This page introduces the asof join engine.
Introduction
Similar to the SQL asof join, the asof join engine groups data by the join column and associates the left and right tables based on temporal proximity within each group. Each stream ingested into the engine, left or right, should be ordered in time. For each record from the left table, the engine matches it with the record in the right table whose timestamp is the closest one prior or equal to the left record's timestamp. The engine outputs a result for each ingested left record.
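The "closest prior or equal" rule can be illustrated outside DolphinDB with a few lines of Python. This is only a sketch of the matching rule for a single group, not the engine's implementation; the function name asof_match and the sample timestamps are made up for illustration.

```python
from bisect import bisect_right

def asof_match(right_times, left_time):
    """Return the index of the latest right timestamp <= left_time, or None.

    right_times must be sorted in ascending order (hypothetical helper).
    """
    i = bisect_right(right_times, left_time)
    return i - 1 if i > 0 else None

# Made-up right-stream timestamps in milliseconds, already ordered in time.
right_times = [1000, 3000, 6000]

# One left record before any right record, one with an equal timestamp,
# and one that falls between two right records.
matches = [asof_match(right_times, t) for t in (500, 3000, 4500)]
```

Here the left record at 500 has no earlier right record, the one at 3000 matches the right record with the same timestamp, and the one at 4500 falls back to the right record at 3000.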
The parameter useSystemTime determines the join behavior of an asof join engine:
(1) When useSystemTime=true, the engine joins the streams based on the system time of data arrivals. Each record from the left table is immediately associated and the join result is output.
(2) When useSystemTime=false, the engine joins the streams based on the time columns of the inputs. The join is triggered when a right record arrives with a timestamp greater than or equal to the timestamp of the latest left record. You can also set the parameter delayedTime to define a timeout trigger.
The following figure shows how an asof join engine outputs a result based on the data time (without timeout trigger). The calculation and output are triggered by new data arriving in the right table.
createAsofJoinEngine Syntax
createAsofJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [delayedTime], [garbageSize], [sortByTime])
For details about the parameters, see createAsofJoinEngine.
Use Case: Calculating Transaction Costs
Since tick trades and quotes often occur at different times, an exact-match join does not work, and we can use the asof join engine for a non-exact match instead. This lets us join each trade record with the most recent quote that occurred at or before it to calculate the transaction cost.
// create table
share streamTable(1:0, `Sym`TradeTime`TradePrice, [SYMBOL, TIME, DOUBLE]) as trades
share streamTable(1:0, `Sym`Time`Bid1Price`Ask1Price, [SYMBOL, TIME, DOUBLE, DOUBLE]) as snapshot
share streamTable(1:0, `TradeTime`Sym`TradePrice`TradeCost`SnapshotTime, [TIME, SYMBOL, DOUBLE, DOUBLE, TIME]) as output
// create engine
ajEngine = createAsofJoinEngine(name="asofJoin", leftTable=trades, rightTable=snapshot, outputTable=output, metrics=<[TradePrice, abs(TradePrice-(Bid1Price+Ask1Price)/2), snapshot.Time]>, matchingColumn=`Sym, timeColumn=`TradeTime`Time, useSystemTime=false, delayedTime=1000)
// subscribe topic
subscribeTable(tableName="trades", actionName="appendLeftStream", handler=getLeftStream(ajEngine), msgAsTable=true, offset=-1, hash=0)
subscribeTable(tableName="snapshot", actionName="appendRightStream", handler=getRightStream(ajEngine), msgAsTable=true, offset=-1, hash=1)
In this script, we subscribe to the tables "trades" and "snapshot" to ingest the streams into the join engine, where:
- The parameter useSystemTime is set to false, indicating the join is performed based on the input time columns, i.e., "TradeTime" from the left table and "Time" from the right table.
- The parameter delayedTime is set so that the calculation is also triggered by a timeout. If delayedTime is not specified, the join is triggered only when a record with a greater timestamp arrives in the right table. However, that record may be delayed, or a right record with a timestamp greater than that of a left record may never arrive. If you expect each incoming record in the left table to produce an output record, it is recommended to set delayedTime so that the calculation is triggered once the timeout is reached.
- The "snapshot.Time" specified in the parameter metrics indicates that the column "Time" from the "snapshot" table is written to the column "SnapshotTime" of the output table. The table prefix makes the source explicit: if both tables contain a column with the same name and it is specified without a prefix, the output takes the column from the left table by default.
- The parameter garbageSize is not specified and the default value is used. This parameter does not affect the calculation results regardless of its value, but only affects the memory usage of the engine.
The following script appends the quote data to "snapshot" (the right stream) and the trade data to "trades" (the left stream):
// generate data: trade
t1 = table(`A`A`B`A`B`B as Sym, 10:00:02.000+(1..6)*700 as TradeTime, (3.4 3.5 7.7 3.5 7.5 7.6) as TradePrice)
// generate data: quotes
t2 = table(`A`B`A`B as Sym, 10:00:00.000+(3 3 6 6)*1000 as Time, (3.5 7.6 3.5 7.6) as Bid1Price, (3.5 7.6 3.6 7.6) as Ask1Price)
// input data
snapshot.append!(t2)
trades.append!(t1)
The correspondence of records between the input streams is shown below:
The output table is shown below.
In this example, the parameter delayedTime is specified. Even though no record with a timestamp greater than 10:00:06.200 is ingested into the right stream for the group Sym=B, the join is still triggered two seconds after the last left record (B, 10:00:06.200, 7.6) arrives.
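To check which quote each trade should be paired with, the matching can be recomputed offline. The following Python sketch uses the same values as t1 and t2 in the script above, with times written as milliseconds after 10:00:00.000. It models only the grouping, the "closest prior or equal" match and the cost metric, not the engine's trigger timing; the helper join_trade is hypothetical, and None is used here as an assumed stand-in for a trade that has no earlier quote.

```python
from bisect import bisect_right
from collections import defaultdict

# (Sym, TradeTime, TradePrice), same values as t1.
trades = [("A", 2700, 3.4), ("A", 3400, 3.5), ("B", 4100, 7.7),
          ("A", 4800, 3.5), ("B", 5500, 7.5), ("B", 6200, 7.6)]
# (Sym, Time, Bid1Price, Ask1Price), same values as t2.
quotes = [("A", 3000, 3.5, 3.5), ("B", 3000, 7.6, 7.6),
          ("A", 6000, 3.5, 3.6), ("B", 6000, 7.6, 7.6)]

# Group quotes by symbol; each group keeps its arrival (time) order.
quotes_by_sym = defaultdict(list)
for sym, t, bid, ask in quotes:
    quotes_by_sym[sym].append((t, bid, ask))

def join_trade(sym, trade_time, price):
    """Pair one trade with the latest quote of the same symbol at or before it."""
    group = quotes_by_sym[sym]
    times = [q[0] for q in group]
    i = bisect_right(times, trade_time)
    if i == 0:
        # No quote at or before this trade (assumed representation: None).
        return (trade_time, sym, price, None, None)
    quote_time, bid, ask = group[i - 1]
    cost = abs(price - (bid + ask) / 2)  # same formula as the metrics argument
    return (trade_time, sym, price, cost, quote_time)

# Columns: TradeTime, Sym, TradePrice, TradeCost, SnapshotTime.
result = [join_trade(*trade) for trade in trades]
```

Under these assumptions, the first trade of A (10:00:02.700) has no earlier quote, the last trade of B (10:00:06.200) pairs with the quote at 10:00:06.000, and every other trade pairs with the 10:00:03.000 quote of its symbol.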