Join Engines

Data analysis often involves joining data from multiple sources. In addition to various SQL join operations for historical data, DolphinDB provides streaming join engines for real-time computing scenarios that require low-latency processing. These engines use incremental calculation to keep latency at the millisecond level even with massive data streams.

Table Joins: From Batch to Stream

Batch Joins

For batch processing scenarios, DolphinDB offers SQL-compatible join methods (including equi join, full join, left join, prefix join, and cross join), and also methods especially designed for time-series data including asof join and window join.

For example, the following script performs an asof join on historical data:

// data
t1 = table(take(`A, 4) as Sym, 10:10:03.000+(10 2100 2890 6030) as Time, 12.5 12.5 12.6 12.6 as Price)
t2 = table(take(`A, 4) as Sym, 10:10:00.000+(0 3000 6000 9000) as Time, 12.6 12.5 12.7 12.6 as BidPrice)

// asof join calculation
select * from aj(t1, t2, `Time)

For each record with a timestamp T in the left table, the asof join takes from the right table the record with the closest timestamp that is earlier than or equal to T. In the above example, where the join column "Time" is specified, each record in table t1 is paired with the latest record in table t2 whose "Time" value is less than or equal to the "Time" value in t1.
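To make the matching rule concrete, here is a minimal sketch in plain Python (not DolphinDB script) that applies the same rule to the sample data above. The helper name `asof_join` and the representation of times as milliseconds since midnight are assumptions made for illustration. The sketch shows the join semantics only, not how `aj` is implemented.

```python
from bisect import bisect_right

def asof_join(left, right):
    """Pair each (time, price) in left with the latest (time, bid) in right
    whose time is <= the left time. Assumes right is sorted by time."""
    right_times = [t for t, _ in right]
    joined = []
    for t, price in left:
        i = bisect_right(right_times, t) - 1  # index of the last right time <= t
        match = right[i] if i >= 0 else (None, None)
        joined.append((t, price) + match)
    return joined

# Times in milliseconds since midnight, mirroring t1 and t2 above.
t1_base = ((10 * 60 + 10) * 60 + 3) * 1000  # 10:10:03.000
t2_base = (10 * 60 + 10) * 60 * 1000        # 10:10:00.000
t1 = [(t1_base + off, p) for off, p in
      zip((10, 2100, 2890, 6030), (12.5, 12.5, 12.6, 12.6))]
t2 = [(t2_base + off, b) for off, b in
      zip((0, 3000, 6000, 9000), (12.6, 12.5, 12.7, 12.6))]

result = asof_join(t1, t2)
for row in result:
    print(row)
```

With this data, the first three records of t1 all pair with the t2 record at 10:10:03.000 (BidPrice 12.5), and the last one pairs with the t2 record at 10:10:09.000 (BidPrice 12.6).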

Streaming Joins

The joins over batch tables typically process bounded data sets, which means the input tables are processed all at once to generate a full output. Real-time streaming joins deal with data sets that are unbounded and have no defined end. In such cases, the streams have a starting point, but the arrival time of the next record is unpredictable. Unbounded streams must be continuously processed, i.e., incoming records must be promptly handled after ingestion, and the joined output will be generated incrementally.

To implement streaming joins, there are two technical challenges that need to be addressed:

  • Triggering mechanism for join and output

Suppose the asof join example above is applied to streaming data. When the first record of stream t1 (with a "Time" value of 10:10:03.010) arrives, the join engine must decide whether to join it immediately with the record in stream t2 whose "Time" value is 10:10:00.000 and output the result, or to wait until a triggering condition is satisfied.

  • Memory management

To accurately join streams, the join engine needs to cache historical data. Since the input streams are continuously ingested, a garbage collection mechanism is needed.
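Both challenges can be illustrated with a toy streaming asof join in plain Python. This is a sketch under stated assumptions, not DolphinDB's implementation: the class `ToyAsofJoin`, its trigger rule (a left record is emitted only once a strictly later right record has arrived) and its cleanup rule (drop cached right records that a newer, still-eligible record has superseded) are all hypothetical choices made for illustration, and both inputs are assumed to arrive in time order.

```python
from collections import deque

class ToyAsofJoin:
    """Toy streaming asof join; times are millisecond offsets from 10:10:00.000."""

    def __init__(self):
        self.right = deque()    # cached right records (time, bid), in time order
        self.pending = deque()  # left records waiting for a trigger
        self.output = []

    def on_left(self, t, price):
        self.pending.append((t, price))
        self._flush()

    def on_right(self, t, bid):
        self.right.append((t, bid))
        self._flush()

    def _flush(self):
        # Trigger: the oldest pending left record is ready once a strictly
        # later right record has been seen.
        while self.pending and self.right and self.right[-1][0] > self.pending[0][0]:
            t, price = self.pending.popleft()
            # Garbage collection: drop right records superseded by a newer
            # record that is still <= t; ordered input never needs them again.
            while len(self.right) > 1 and self.right[1][0] <= t:
                self.right.popleft()
            match = self.right[0] if self.right[0][0] <= t else (None, None)
            self.output.append((t, price) + match)

engine = ToyAsofJoin()
engine.on_right(0, 12.6)
engine.on_right(3000, 12.5)
engine.on_left(3010, 12.5)
engine.on_left(5100, 12.5)
engine.on_left(5890, 12.6)
engine.on_right(6000, 12.7)  # triggers the three pending left records
engine.on_right(9000, 12.6)
engine.on_left(9030, 12.6)   # stays pending: no later right record yet
```

In this run, three joined records are emitted when the right record at offset 6000 arrives, and the right record at offset 0 is discarded from the cache. The last left record remains pending because no later right record has arrived, which is the kind of situation a timeout trigger is meant to resolve.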

Streaming Join Engines

DolphinDB offers six types of streaming join engines: asof join engine, window join engine, equi join engine, lookup join engine, left semi join engine, and snapshot join engine. The join engines follow join rules consistent with those of batch joins. Differences are explained in subsequent sections.

The streaming join engine functions as a module that enables real-time joins on streams. It can be conceptualized as a computational black box that applies join rules to two input streams, producing a joined output stream. The engine internally maintains the computing states.

The following script implements asof join over streams (based on the code example in section "Batch Joins").

First, we create two stream tables as inputs and one shared in-memory table as output. Then, we use the createAsofJoinEngine function to create the asof join engine, and the subscribeTable function to subscribe to the two streams. As data is continuously ingested into the join engine, the number of records in the output table increases accordingly.

// create tables
share streamTable(1:0, `Sym`Time`Price, [SYMBOL, TIME, DOUBLE]) as trade
share streamTable(1:0, `Sym`Time`BidPrice, [SYMBOL, TIME, DOUBLE]) as quotes
share table(1:0, `Time`Sym`Price`t2_Time`BidPrice, [TIME, SYMBOL, DOUBLE, TIME, DOUBLE]) as output

// create engine
ajEngine = createAsofJoinEngine(name="asofJoin", leftTable=trade, rightTable=quotes, outputTable=output, metrics=<[Price, quotes.Time, BidPrice]>, matchingColumn=`Sym, timeColumn=`Time, useSystemTime=false, delayedTime=1000)

// subscribe topic
subscribeTable(tableName="trade", actionName="joinLeft", offset=0, handler=getLeftStream(ajEngine), msgAsTable=true)
subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=getRightStream(ajEngine), msgAsTable=true)

The following code simulates the ingestion of the streams. The join results are written to the table "output".

// generate data
t1 = table(take(`A, 4) as Sym, 10:10:03.000+(10 2100 2890 6030) as Time, 12.5 12.5 12.6 12.6 as Price)
t2 = table(take(`A, 4) as Sym, 10:10:00.000+(0 3000 6000 9000) as Time, 12.6 12.5 12.7 12.6 as BidPrice)
// input data
quotes.append!(t2)
trade.append!(t1)

To address the challenges of real-time streaming joins discussed in section "Streaming Joins", the streaming join engines provide built-in computational rules and configurable parameters that control the trigger mechanism. Memory management is handled through the parameter garbageSize, which is used to clean up historical data that is no longer needed.

The join mechanisms can be classified as follows:

(1) For joins that involve the time order of the data, a join engine processes the inputs based on:

  • The system time of data arrivals;
  • OR the time columns of the inputs. In this case, the timestamps of subsequent records are unpredictable, so by default the engine processes ingested data as if it were ordered. Additional timeout triggers can be set to force the joins.

(2) For joins that ignore the time order of the data, a join engine:

  • Calculates and outputs immediately when a record is ingested;
  • OR waits until a record is matched. Additional timeout triggers can be set to force the joins.
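The two trigger styles for joins that ignore time order can be contrasted with a small Python sketch. The function names `immediate_join` and `wait_for_match_join` and the event format `(side, key, value)` are hypothetical, the logic is deliberately simplified (no timeouts, no cleanup), and none of it is taken from DolphinDB's implementation.

```python
def immediate_join(events):
    """Emit every left record as soon as it arrives, paired with the latest
    right value seen so far for its key (None if there is none yet)."""
    latest = {}
    out = []
    for side, key, value in events:
        if side == "right":
            latest[key] = value
        else:
            out.append((key, value, latest.get(key)))
    return out

def wait_for_match_join(events):
    """Hold each record until a record with the same key arrives on the
    other side, then emit the pair as (key, left_value, right_value)."""
    waiting = {"left": {}, "right": {}}
    out = []
    for side, key, value in events:
        other = "right" if side == "left" else "left"
        if key in waiting[other]:
            partner = waiting[other].pop(key)
            pair = (key, value, partner) if side == "left" else (key, partner, value)
            out.append(pair)
        else:
            waiting[side][key] = value
    return out

events = [
    ("left", "A", 1.0),
    ("right", "A", 10.0),
    ("left", "B", 2.0),
    ("right", "B", 20.0),
    ("left", "A", 3.0),
]

immediate = immediate_join(events)
matched = wait_for_match_join(events)
```

With these events, the immediate style emits one result per left record, including unmatched ones, while the wait-for-match style emits only the two completed pairs and keeps the last left record waiting.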

The join and trigger rules determine the calculation results of the engine. The behavior and application scenarios of each join engine are explained on the corresponding pages.

Comparison of Join Engines

| Engine | Join Behavior | Join Column | Similar SQL Join | Number of Result Records | Application |
|---|---|---|---|---|---|
| asof join engine | Joins each record from the left stream with the matching record having the closest (or equal) timestamp from the right table. | matchingColumn | asof join | less than or equal to the number of left records | Calculating Transaction Costs |
| window join engine | Joins each record from the left stream with the aggregated value of the matching records in the specified window over the right table. | matchingColumn | window join | less than or equal to the number of left records | Joining 3-min OHLC with Trades Data |
| equi join engine | Joins each record from the left or right stream with the latest matching record from the other table. | matchingColumn + timeColumn | equi join | equal to the number of matched records from the two tables (provided that the values of join columns are unique) | Combining Metrics in Data Sources Captured at One-Minute Interval |
| lookup join engine | Joins each record from the left stream with the latest matching record from the right table. | matchingColumn | left join | equal to the number of left records | Joining Real-Time Market Data with Historical Daily Metrics |
| left semi join engine | Joins each record from the left stream with the first or last matching record from the right table. | matchingColumn | equi join | less than or equal to the number of left records | Joining Trades With Orders Data; Correlating Individual Stocks to Index |
| snapshot join engine | Joins each record from both tables with either the latest matching record or with all matching records from the other table. | matchingColumn | inner join or full outer join | less than or equal to all records from two tables | Joining account holdings with market prices to assess risk metrics such as leverage and net worth |

Conclusion

DolphinDB offers built-in engines that enable efficient streaming joins with real-time triggering mechanisms and memory management. Developers can implement complex real-time streaming joins by configuring a small set of parameters. Featuring streaming engines, stream pipeline processing, and parallel computing, the DolphinDB streaming framework can be applied to real-time business scenarios, providing timely insights and enabling faster decision-making.