Equi Join Engine

DolphinDB provides multiple lightweight and easy-to-use join engines. This page introduces the equi join engine.

Introduction

Note: Since DolphinDB 1.30.21/2.00.9, the function previously known as createEqualJoinEngine has been renamed to createEquiJoinEngine. The old function name can still be used as an alias.

Similar to the SQL equi join, the equi join engine joins the left table and the right table on equivalent values in the join column(s) and the time column, and outputs a record when a match is identified.

Unlike the SQL equi join, the engine does not cache all historical data, so a newly ingested record may not find a match in the other table if the matching record has already been removed from the cache. This is by design: the engine is built to efficiently process keyed streams in which the join column(s) and the time column together form the key. A typical example is a pair of streams in which each stock has only one record per minute.

The image below illustrates how the equi join engine processes ingested streams. Each stream has three columns: a join column, a time column, and a metrics column. For an application scenario of the engine, see the section "Use Case: Combining Metrics in Data Sources Captured at One-Minute Interval".

We encourage users to use the equi join engine to process data with unique values in the join column(s) and the time column, as shown in the recommended application scenario. However, if you wish to explore the engine's behavior in other scenarios, the following paragraph explains its internal workings.

The equi join engine internally maintains two keyed tables in the cache, one for the left table and one for the right table, with the join column(s) and the time column serving as the keys. When a record is ingested into the left table, the engine searches the right table's cache for a matching record. If a match is found, the engine outputs the join result and marks the matching record as joined in the right table's cache. Because the new record is joined immediately upon ingestion into the left table, it is not cached. This explains why the greyed record "(A,t1.4)" in the image above cannot find a matching record in the other table and is therefore not output. If no match is found, the ingested record is added to the left table's cache and marked as un-joined, awaiting a potential match in future ingestions.

Note: As the left and right tables are interchangeable in the equi join engine, the same logic applies if the record is first ingested into the right table in this example.
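The caching logic described above can be modeled with a short Python sketch. This is a toy simulation for illustration only, not DolphinDB code and not the engine's actual implementation. The class name EquiJoinSketch, the ingest method, and the sample values are hypothetical. The sketch also makes two assumptions that this page does not confirm: a cached record already marked as joined is not matched again, and a record with a repeated key overwrites the earlier entry in its own cache. Garbage collection is omitted.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, str]  # (join column value, time column value)


class EquiJoinSketch:
    """Toy model of the two keyed caches (hypothetical, not DolphinDB code)."""

    def __init__(self) -> None:
        # One keyed cache per side; each entry is [metric, joined_flag].
        self.cache: Dict[str, Dict[Key, list]] = {"left": {}, "right": {}}
        # Each output row is (sym, time, left_metric, right_metric).
        self.output: List[Tuple[str, str, int, int]] = []

    def ingest(self, side: str, sym: str, time: str, metric: int) -> None:
        other = "right" if side == "left" else "left"
        key = (sym, time)
        match = self.cache[other].get(key)
        # Assumption: a record already marked as joined is not matched again.
        if match is not None and not match[1]:
            match[1] = True  # mark the cached record as joined
            if side == "left":
                left, right = metric, match[0]
            else:
                left, right = match[0], metric
            self.output.append((sym, time, left, right))
            return  # joined on ingestion, so the new record is not cached
        # No match: cache the record as un-joined.
        # Assumption: a repeated key overwrites the earlier cache entry.
        self.cache[side][key] = [metric, False]


engine = EquiJoinSketch()
engine.ingest("left", "A", "t1", 1)   # no match yet: cached as un-joined
engine.ingest("right", "A", "t1", 2)  # match found: output, not cached
engine.ingest("left", "A", "t1", 4)   # right-side record was never cached: no output
engine.ingest("right", "B", "t1", 5)  # no match yet: cached as un-joined
engine.ingest("left", "B", "t1", 3)   # match found: output, not cached
print(engine.output)
```

The third call loosely mirrors the greyed record in the image: its counterpart was joined on ingestion and never cached, so the late record finds nothing to join with. The last two calls show that the output is the same regardless of which side arrives first.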

The equi join engine regularly performs garbage collection on all cached data, whether joined or un-joined. For details on the garbage collection rules, refer to the user manual. If the equi join engine produces different results from the SQL equi join, it could be due to the differences in the specified garbage collection rules.

createEquiJoinEngine Syntax
createEquiJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, timeColumn, [garbageSize=5000], [maxDelayedTime])

For details about the parameters, see createEquiJoinEngine.

Use Case: Combining Metrics in Data Sources Captured at One-Minute Interval

In quantitative trading, it's common to downsample real-time quote data and trade data into one-minute intervals for use in trading strategies. This often involves combining metrics from different data sources into a single table. In this example, we demonstrate how to aggregate quotes data and trades data into one-minute intervals in real time and then join the metrics from both tables into a single unified table.

Notably, in this example, there's only one quote/trade record for each stock per minute after the aggregation. Additionally, for each record, we expect it to be returned only after it has been joined. The equi join engine can implement this logic, as demonstrated in the following script.

Note: If your DolphinDB server version is earlier than 1.30.21/2.00.9, replace createEquiJoinEngine with createEqualJoinEngine in the script. On later versions, the old name remains available as an alias.

// create table
share streamTable(1:0, `Sym`TradeTime`Side`TradeQty, [SYMBOL, TIME, INT, LONG]) as trades
share streamTable(1:0, `UpdateTime`Sym`BuyTradeQty`SellTradeQty, [TIME, SYMBOL, LONG, LONG]) as tradesMin
share streamTable(1:0, `Sym`Time`Bid1Price`Bid1Qty, [SYMBOL, TIME, DOUBLE, LONG]) as quotes
share streamTable(1:0, `UpdateTime`Sym`AvgBid1Amt, [TIME, SYMBOL, DOUBLE]) as quotesMin
share streamTable(1:0, `UpdateTime`Sym`AvgBid1Amt`BuyTradeQty`SellTradeQty, [TIME, SYMBOL, DOUBLE, LONG, LONG]) as output

// create the equi join engine: left table = aggregated trades, right table = aggregated quotes
eqJoinEngine = createEquiJoinEngine(name="EquiJoin", leftTable=tradesMin, rightTable=quotesMin, outputTable=output, metrics=<[AvgBid1Amt, BuyTradeQty, SellTradeQty]>, matchingColumn=`Sym, timeColumn=`UpdateTime)
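// create a time-series engine that aggregates trades into one-minute windows and feeds the left table of the join engine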
tsEngine1 = createTimeSeriesEngine(name="tradesAggr", windowSize=60000, step=60000, metrics=<[sum(iif(Side==1, 0, TradeQty)), sum(iif(Side==2, 0, TradeQty))]>, dummyTable=trades, outputTable=getLeftStream(eqJoinEngine), timeColumn=`TradeTime, keyColumn=`Sym, useSystemTime=false, fill=(0, 0))
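// create a time-series engine that aggregates quotes into one-minute windows and feeds the right table of the join engine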
tsEngine2 = createTimeSeriesEngine(name="quotesAggr", windowSize=60000, step=60000, metrics=<[avg(iif(Bid1Price!=NULL, Bid1Price*Bid1Qty, 0))]>, dummyTable=quotes, outputTable=getRightStream(eqJoinEngine), timeColumn=`Time, keyColumn=`Sym, useSystemTime=false, fill=(0.0))

// subscribe topic
subscribeTable(tableName="trades", actionName="minAggr", handler=tsEngine1, msgAsTable=true, offset=-1, hash=1)
subscribeTable(tableName="quotes", actionName="minAggr", handler=tsEngine2, msgAsTable=true, offset=-1, hash=2) 
In this script, we create two time-series engines to aggregate trades and quotes data into one-minute intervals in real time. Next, by creating an engine cascade, we ingest the aggregated data into the left and right tables of an equi join engine. For more information about cascading, see xxxx.

Note: In the equi join engine, the left and right tables are interchangeable. If you swap the left and right tables, the join result remains the same.

The following script generates simulated quote and trade data in two in-memory tables, t1 and t2, and appends them to the stream tables "quotes" and "trades". The subscriptions then pass the data on to the time-series engines.

// generate data: quotes
t1 = table(`A`B`A`B`A`B as Sym, 10:00:52.000+(3 3 6 6 9 9)*1000 as Time, (3.5 7.6 3.6 7.6 3.6 7.6) as Bid1Price, (1000 2000 500 1500 400 1800) as Bid1Qty)
// generate data: trades
t2 = table(`A`A`B`A`B`B`A`B`B`A as Sym, 10:00:54.000+(1..10)*700 as TradeTime,  (1 2 1 1 1 1 2 1 2 2) as Side, (1..10) * 10 as TradeQty)
// ingest the simulated data into the stream tables
trades.append!(t2)
quotes.append!(t1)

Join result: