Window Join Engine
DolphinDB provides multiple lightweight and easy-to-use join engines. This page introduces the window join engine.
Introduction
Similar to the SQL window join, the window join engine groups data
by the join column and, within each group, associates each record from the left
table with the records that fall in a window of the right table. The left and right
streams ingested into the engine should each be ordered by time. Based on the
timestamp of each record in the left table, the engine determines a window (which
may contain zero or multiple records) in the right table. Each left record is joined
with the aggregated value of its window in the right table.
Note: The asof join engine can be seen as a special case of the window join engine.
The parameter useSystemTime determines the join behavior of a window join engine:
(1) When useSystemTime=true, the engine joins the streams based on the system time at which the data arrives. The join is triggered when the system time reaches the right boundary of the window.
(2) When useSystemTime=false, the engine joins the streams based on the time column of the input. The join is triggered when a right record arrives with a timestamp greater than the right boundary of the window. You can also set the parameter maxDelayedTime to define a timeout trigger.
The following figures show how a window join engine outputs a result based on the time column of the input.
- Standard windows (window = a:b)
The parameter window is set to -1:2. If the current timestamp in the left table is t, the window over the right records extends from one time unit before t to two time units after t, i.e., [t-1, t+2]. The calculation and output are triggered by a record arriving in the right table with a timestamp greater than or equal to the right boundary of the window. This record does not participate in the window calculation.
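As a rough illustration of how a standard window picks right-table records, the sketch below models the selection in plain Python. It is not DolphinDB code: the function name `standard_window` and the integer timestamps are hypothetical, and treating both bounds as inclusive is an assumption read off the [t-1, t+2] notation above. The trigger rule is deliberately left out.

```python
from bisect import bisect_left, bisect_right

def standard_window(left_ts, window, right_ts):
    """Return the right-table timestamps inside [left_ts + a, left_ts + b].

    right_ts must be sorted in ascending order. Both bounds are treated
    as inclusive, which is an assumption of this sketch.
    """
    a, b = window
    lo = bisect_left(right_ts, left_ts + a)   # first index with ts >= t + a
    hi = bisect_right(right_ts, left_ts + b)  # first index with ts >  t + b
    return right_ts[lo:hi]

# Hypothetical right-table timestamps, in the same unit as the window.
right_ts = [7, 9, 10, 12, 13, 15]
selected = standard_window(10, (-1, 2), right_ts)
print(selected)  # [9, 10, 12]
```

With window = -1:2 and t = 10, the model keeps the right records at 9, 10, and 12, and ignores those at 7, 13, and 15.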
- Special windows (window = 0:0)
The parameter window is set to 0:0 to specify a special window. The boundaries of the window over the right table are determined by the current timestamp in the left table and its previous timestamp. The calculation and output are triggered by new data arriving in the right table with a timestamp greater than or equal to the current left timestamp. This record does not participate in the window calculation.
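The special window can be modeled in a few lines of plain Python. This is only a sketch of the rule described above, not DolphinDB code: the names `special_window` and `is_triggered` are hypothetical. Two assumptions are made. First, the window is half-open, [previous left timestamp, current left timestamp), which is consistent with the triggering record being excluded. Second, the first left record of a group, which has no predecessor, takes every earlier right record.

```python
def special_window(prev_left_ts, cur_left_ts, right_ts):
    """Right-table timestamps in [prev_left_ts, cur_left_ts).

    prev_left_ts is None for the first left record of a group; in that
    case every right record before cur_left_ts is kept (an assumption).
    """
    return [
        ts for ts in right_ts
        if ts < cur_left_ts and (prev_left_ts is None or ts >= prev_left_ts)
    ]

def is_triggered(cur_left_ts, right_ts):
    """True once a right record with ts >= cur_left_ts has arrived."""
    return any(ts >= cur_left_ts for ts in right_ts)

# Hypothetical right-table timestamps for one group.
arrived = [2, 4, 5, 7]
print(special_window(3, 6, arrived))   # [4, 5]
print(is_triggered(6, arrived))        # True: the record at 7 closes the window
print(is_triggered(6, arrived[:3]))    # False: nothing at or after 6 yet
```

The record at 7 triggers the output for the left record at 6 but is not part of its window; it is counted in the window of the next left record instead.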
createWindowJoinEngine Syntax
createWindowJoinEngine(name, leftTable, rightTable, outputTable, window, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [garbageSize = 5000], [maxDelayedTime], [nullFill], [outputElapsedMicroseconds=false], [sortByTime])
For details about the parameters, see createWindowJoinEngine.
Use Case: Joining 3-min OHLC with Trades Data
In this use case, we use the window join engine to join 3-minute OHLC with trades data. Each OHLC record is joined with the aggregated value of the trades records within a window.
For the trades that occurred between two adjacent OHLC records, we want to calculate the total trading volume while retaining the original trade information in the output columns.
Note: The following code example uses array vectors which are only supported in DolphinDB beta version 2.00.
// create table
share streamTable(1:0, `Sym`TradeTime`Side`TradeQty, [SYMBOL, TIME, INT, LONG]) as trades
share streamTable(1:0, `Sym`Time`Open`High`Low`Close, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as ohlc
share streamTable(1:0, `Time`Sym`Open`High`Low`Close`BuyQty`SellQty`TradeQtyList`TradeTimeList, [TIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG[], TIME[]]) as output
// create engine
wjMetrics = <[Open, High, Low, Close, sum(iif(Side==1, TradeQty, 0)), sum(iif(Side==2, TradeQty, 0)), TradeQty, TradeTime]>
fillArray = [00:00:00.000, "", 0, 0, 0, 0, 0, 0, [[]], [[]]]
wjEngine = createWindowJoinEngine(name="windowJoin", leftTable=ohlc, rightTable=trades, outputTable=output, window=0:0, metrics=wjMetrics, matchingColumn=`Sym, timeColumn=`Time`TradeTime, useSystemTime=false, nullFill=fillArray)
// subscribe topic
subscribeTable(tableName="ohlc", actionName="appendLeftStream", handler=getLeftStream(wjEngine), msgAsTable=true, offset=-1, hash=0)
subscribeTable(tableName="trades", actionName="appendRightStream", handler=getRightStream(wjEngine), msgAsTable=true, offset=-1, hash=1)
In this script, we subscribe to the tables "ohlc" and "trades" to ingest the streams into the join engine, where:
- The parameter useSystemTime is set to false, indicating the join is performed based on the time columns, i.e., "Time" from the left table and "TradeTime" from the right table.
- The parameter window is set to 0:0, meaning the windows over the right table are determined by the current timestamp in the left table and its previous timestamp.
- The parameter metrics specifies the join metrics. For instance, the metrics Open, High, Low, and Close indicate that the corresponding columns are retrieved from the "ohlc" table, and sum(iif(Side==1, TradeQty, 0)) is calculated based on the columns from the right table "trades". Note that TradeQty is a column retrieved from the right table "trades". Since no aggregate function is applied to the column, all values within the window will be retained in an output field in the form of an array vector.
- The parameter nullFill specifies how to fill the null values in the output table. In this example, null values in the price fields (such as "Open") are filled in with 0. Note that nullFill is a tuple that must be of the same length and type as the output table columns.
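To make the metrics concrete, the following plain-Python sketch evaluates the right-table metrics for a single window. It is a model of the description above rather than DolphinDB code, and `evaluate_metrics` is a hypothetical name. The two sample trades are the group "A" trades at 10:00:03.400 and 10:00:04.800 from the data generated in this use case.

```python
def evaluate_metrics(window_trades):
    """Evaluate the right-table metrics over one window.

    window_trades is a list of (trade_time, side, trade_qty) tuples.
    Returns (buy_qty, sell_qty, qty_list, time_list), mirroring
    sum(iif(Side==1, TradeQty, 0)), sum(iif(Side==2, TradeQty, 0)),
    TradeQty, and TradeTime.
    """
    buy_qty = sum(qty for _, side, qty in window_trades if side == 1)
    sell_qty = sum(qty for _, side, qty in window_trades if side == 2)
    # Columns without an aggregate function keep every value in the window.
    qty_list = [qty for _, _, qty in window_trades]
    time_list = [t for t, _, _ in window_trades]
    return buy_qty, sell_qty, qty_list, time_list

window = [("10:00:03.400", 2, 20), ("10:00:04.800", 1, 40)]
result = evaluate_metrics(window)
print(result)  # (40, 20, [20, 40], ['10:00:03.400', '10:00:04.800'])

# Python's sum() of nothing is already 0, so this model does not need
# a separate fill step for the two sums.
empty = evaluate_metrics([])
print(empty)  # (0, 0, [], [])
```

The two list-valued results play the role of the array vector columns "TradeQtyList" and "TradeTimeList" in the output table.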
// generate data: ohlc
t1 = table(`A`B`A`B`A`B as Sym, 10:00:00.000+(3 3 6 6 9 9)*1000 as Time, (NULL NULL 3.5 7.6 3.5 7.6) as Open, (3.5 7.6 3.6 7.6 3.6 7.6) as High, (3.5 7.6 3.5 7.6 3.4 7.5) as Low, (3.5 7.6 3.5 7.6 3.6 7.5) as Close)
// generate data: trade
t2 = table(`A`A`B`A`B`B`A`B`A`A as Sym, 10:00:02.000+(1..10)*700 as TradeTime, (1 2 1 1 1 1 2 1 2 2) as Side, (1..10) * 10 as TradeQty)
// input data
trades.append!(t2)
ohlc.append!(t1)
The correspondence of records between the input streams is shown below:
The output table is shown below. The last two columns use array vectors to display all values of the columns "TradeQty" and "TradeTime" within the window.
The output table has one record fewer than the left table "ohlc": the record of group "Sym" B with a timestamp of 10:00:09.000 was neither joined nor output. This is because no trade in group B arrived with a timestamp equal to or greater than 10:00:09.000 to close the window. If a result is expected to be output once a left record arrives, you can set useSystemTime=true to join based on the system time. In this case, for any incoming record in the left table ("ohlc"), the windows over the right table ("trades") include all the records that arrived between the previous "ohlc" record and the current record.
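The missing record can be reproduced with a small batch replay of the sample data in plain Python. This is a sketch of the join rule, not the engine itself: `window_join` is a hypothetical name, timestamps are expressed as milliseconds after 10:00:00.000, and the window is assumed to be half-open, [previous "ohlc" time, current "ohlc" time), with the first window of each group taking all earlier trades. Row order follows the left table, which may differ from the order in which the engine emits results.

```python
from collections import defaultdict

# Left stream: (Sym, Time) at 10:00:03, 10:00:06, 10:00:09 for A and B.
ohlc = [(sym, sec * 1000) for sec in (3, 6, 9) for sym in ("A", "B")]

# Right stream: (Sym, TradeTime, Side, TradeQty), one trade every 700 ms.
syms = "AABABBABAA"
sides = [1, 2, 1, 1, 1, 1, 2, 1, 2, 2]
trades = [
    (sym, 2000 + 700 * k, side, 10 * k)
    for k, (sym, side) in enumerate(zip(syms, sides), start=1)
]

def window_join(ohlc, trades):
    """Replay the special-window (0:0) join over fully loaded inputs."""
    by_sym = defaultdict(list)
    for sym, ts, side, qty in trades:
        by_sym[sym].append((ts, side, qty))
    prev_ts = {}
    rows = []
    for sym, ts in ohlc:
        group = by_sym[sym]
        # Output only if a same-group trade at or after ts closes the window.
        if any(t >= ts for t, _, _ in group):
            lo = prev_ts.get(sym)
            win = [(t, s, q) for t, s, q in group
                   if t < ts and (lo is None or t >= lo)]
            buy = sum(q for _, s, q in win if s == 1)
            sell = sum(q for _, s, q in win if s == 2)
            rows.append((sym, ts, buy, sell, [q for _, _, q in win]))
        prev_ts[sym] = ts
    return rows

rows = window_join(ohlc, trades)
for row in rows:
    print(row)
print(len(rows))  # 5
```

Under these assumptions the model emits five rows and skips ("B", 9000): the last group B trade is at 10:00:07.600, so nothing closes that window, whereas the group A trade at exactly 10:00:09.000 closes the group A window.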