Left Semi Join Engine

DolphinDB provides multiple lightweight and easy-to-use join engines. This page introduces the left semi join engine.

Introduction

Similar to the SQL equi join, the left semi join engine pairs records from the left and right tables based on matching column values. For each record from the left table, the engine caches it until a record with the same matching column value is found in the right table. The join is triggered only when a match is identified.

Unlike the SQL equi join, the engine only caches the first or last record with the same matching column value in the right table. As a result, each left record is joined at most once.
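The matching rules above can be illustrated with a small Python model. This is only a sketch of the behavior as described on this page, not DolphinDB code and not the engine's actual implementation. The class LeftSemiJoinModel, its methods, and the update_right flag (standing in for the updateRightTable parameter) are hypothetical names introduced for illustration.

```python
from collections import defaultdict


class LeftSemiJoinModel:
    """Toy model of the matching rules described above (not DolphinDB code)."""

    def __init__(self, update_right=False):
        self.update_right = update_right   # False: keep first right record; True: keep latest
        self.right = {}                    # key -> the single cached right record
        self.pending = defaultdict(list)   # key -> left records still waiting for a match
        self.output = []                   # joined (key, left_value, right_value) rows

    def append_left(self, key, value):
        if key in self.right:
            # A matching right record is already cached, so the join fires immediately.
            self.output.append((key, value, self.right[key]))
        else:
            # No match yet: cache the left record until a right record arrives.
            self.pending[key].append(value)

    def append_right(self, key, value):
        if key not in self.right or self.update_right:
            self.right[key] = value
        # Release every left record waiting on this key; each one is joined exactly once.
        for left_value in self.pending.pop(key, []):
            self.output.append((key, left_value, self.right[key]))


engine = LeftSemiJoinModel(update_right=True)
engine.append_left("A", 1)     # cached: no right record for "A" yet
engine.append_right("A", 10)   # triggers the join for the cached left record
engine.append_right("A", 20)   # replaces the cached right record; nothing is re-joined
engine.append_left("A", 2)     # joins immediately with the latest right record
print(engine.output)           # [('A', 1, 10), ('A', 2, 20)]
```

In this model a left record leaves the cache as soon as it is joined, which is why a later right record with the same key does not produce a second output row for it.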

The following figure depicts how the left semi join engine (which retains the latest record for each join column value from the right table) processes ingested streams. Each stream contains two columns: a join column and a metrics column.

For application scenarios of the engine, see the sections "Use Case 1: Joining Trades With Orders Data" and "Use Case 2: Correlating Individual Stocks to Index" below.

createLeftSemiJoinEngine Syntax
createLeftSemiJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [garbageSize=5000], [updateRightTable=false])

For details about the parameters, see createLeftSemiJoinEngine.

Use Case 1: Joining Trades With Orders Data

In this use case, we will enrich trading information by joining tick trades with buy and sell orders based on the order IDs. The trade record is only output after its corresponding orders are found.

We build a cascade of two left semi join engines to associate each trade with its buy and sell orders. We also create four stream tables: one for trades, one for orders, one for the intermediate results, and one for the final output.

// create table
share streamTable(1:0, `Sym`BuyNo`SellNo`TradePrice`TradeQty`TradeTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME]) as trades
share streamTable(1:0, `Sym`OrderNo`Side`OrderQty`OrderPrice`OrderTime, [SYMBOL, LONG, INT, LONG, DOUBLE, TIME]) as orders
// intermediate result of "leftJoinSell": each trade joined with its sell order
share streamTable(1:0, `Sym`SellNo`BuyNo`TradePrice`TradeQty`TradeTime`SellOrderQty`SellOrderPrice`SellOrderTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME, LONG, DOUBLE, TIME]) as outputTemp
share streamTable(1:0, `Sym`BuyNo`SellNo`TradePrice`TradeQty`TradeTime`BuyOrderQty`BuyOrderPrice`BuyOrderTime`SellOrderQty`SellOrderPrice`SellOrderTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME, LONG, DOUBLE, TIME, LONG, DOUBLE, TIME]) as output
// create engine: left join buy order
// OrderQty, OrderPrice and OrderTime come from the matched buy order; the SellOrder* columns are carried over from the left stream
ljEngineBuy=createLeftSemiJoinEngine(name="leftJoinBuy", leftTable=outputTemp, rightTable=orders, outputTable=output, metrics=<[SellNo, TradePrice, TradeQty, TradeTime, OrderQty, OrderPrice, OrderTime, SellOrderQty, SellOrderPrice, SellOrderTime]>, matchingColumn=[`Sym`BuyNo, `Sym`OrderNo])
//  create engine: left join sell order 
ljEngineSell=createLeftSemiJoinEngine(name="leftJoinSell", leftTable=trades, rightTable=orders, outputTable=getLeftStream(ljEngineBuy),  metrics=<[BuyNo, TradePrice, TradeQty, TradeTime, OrderQty, OrderPrice, OrderTime]>, matchingColumn=[`Sym`SellNo, `Sym`OrderNo])
// subscribe topic
subscribeTable(tableName="trades", actionName="appendLeftStream", handler=getLeftStream(ljEngineSell), msgAsTable=true, offset=-1)
subscribeTable(tableName="orders", actionName="appendRightStreamForSell", handler=getRightStream(ljEngineSell), msgAsTable=true, offset=-1)
subscribeTable(tableName="orders", actionName="appendRightStreamForBuy", handler=getRightStream(ljEngineBuy), msgAsTable=true, offset=-1) 

In this script, the engine "leftJoinSell" joins "trades" and "orders" based on the sell order IDs. The output of "leftJoinSell" is then ingested as the left stream into the engine "leftJoinBuy" to be joined with the right stream of "orders" based on buy order IDs.

To manage the memory usage of the engines, we use the default value of the parameter garbageSize. Unlike other engines, the left semi join engine only clears the unneeded historical data from the left table and keeps all records from the right table in the cache. Thus, the cache of each engine created above is at least as large as the "orders" table.

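The following script generates sample data and appends it to "orders" and "trades". Through the subscriptions above, the order records reach the right streams of both engines, and the trade records reach the left stream of the engine "leftJoinSell":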

// generate data: trade
t1 = table(`A`B`B`A as Sym, [2, 5, 5, 6] as BuyNo, [4, 1, 3, 4] as SellNo, [7.6, 3.5, 3.5, 7.6]as TradePrice, [10, 100, 20, 50]as TradeQty, 10:00:00.000+(400 500 500 600) as TradeTime)
// generate data: order
t2 = table(`B`A`B`A`B`A as Sym, 1..6 as OrderNo, [2, 1, 2, 2, 1, 1] as Side, [100, 10, 20, 100, 350, 50] as OrderQty, [7.6, 3.5, 7.6, 3.5, 7.6, 3.5] as OrderPrice, 10:00:00.000+(1..6)*100 as OrderTime)
// input data
orders.append!(t2)
trades.append!(t1)

The correspondence of records between the input streams is shown below:

The output table shows that each trade record is joined with its corresponding buy and sell order records from the "orders" stream. It now displays information on the buy and sell quantity, price, and time for each trade.
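As a cross-check on the sample data, the two-stage match can be modeled in plain Python. This sketch is not DolphinDB code and ignores caching and arrival order; it only looks up each trade's sell order and buy order by (Sym, OrderNo). Times are written as millisecond offsets from 10:00:00.000, and the variable names are illustrative.

```python
# Sample data copied from the script above; times are millisecond offsets from 10:00:00.000.
trades = [  # (Sym, BuyNo, SellNo, TradePrice, TradeQty, TradeTime)
    ("A", 2, 4, 7.6, 10, 400),
    ("B", 5, 1, 3.5, 100, 500),
    ("B", 5, 3, 3.5, 20, 500),
    ("A", 6, 4, 7.6, 50, 600),
]
syms = "BABABA"
qtys = [100, 10, 20, 100, 350, 50]
prices = [7.6, 3.5, 7.6, 3.5, 7.6, 3.5]
# (Sym, OrderNo) -> (OrderQty, OrderPrice, OrderTime)
orders = {(syms[i], i + 1): (qtys[i], prices[i], (i + 1) * 100) for i in range(6)}

rows = []
for sym, buy_no, sell_no, price, qty, time in trades:
    sell = orders.get((sym, sell_no))  # stage 1: match on (Sym, SellNo)
    buy = orders.get((sym, buy_no))    # stage 2: match on (Sym, BuyNo)
    if sell is None or buy is None:
        continue  # in the engine, such a trade would stay cached instead of being output
    # Column order follows the "output" table: trade fields, buy order fields, sell order fields.
    rows.append((sym, buy_no, sell_no, price, qty, time) + buy + sell)

for row in rows:
    print(row)
```

Because every order in the sample arrives before the trades, all four trades find both of their orders in this model.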

Use Case 2: Correlating Individual Stocks to Index

This use case correlates individual stocks to a stock index. The stock and index data are downsampled to 1-minute intervals. All stocks are associated with one index, and a join result is output for each input stock record.

To achieve this, we cascade a left semi join engine with a reactive state engine.

// create table
share streamTable(1:0, `Sym`Time`Close, [SYMBOL, TIME, DOUBLE]) as stockKline
share streamTable(1:0, `Sym`Time`Close, [SYMBOL, TIME, DOUBLE]) as indexKline
share streamTable(1:0, `Time`Sym`Close`Index1Close, [TIME, SYMBOL, DOUBLE, DOUBLE]) as stockKlineAddIndex1
share streamTable(1:0, `Sym`Time`Close`Index1Close`Index1Corr, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE]) as output
//  create engine: calculate correlation
rsEngine = createReactiveStateEngine(name="calCorr", dummyTable=stockKlineAddIndex1, outputTable=output, metrics=[<Time>, <Close>, <Index1Close>, <mcorr(ratios(Close)-1, ratios(Index1Close)-1, 3)>], keyColumn="Sym")
//  create engine: left join Index1
ljEngine1 = createLeftSemiJoinEngine(name="leftJoinIndex1", leftTable=stockKline, rightTable=indexKline, outputTable=getStreamEngine("calCorr"), metrics=<[Sym, Close, indexKline.Close]>, matchingColumn=`Time)
// subscribe topic
def appendIndex(engineName, indexName, msg){
	tmp = select * from msg where Sym = indexName
	getRightStream(getStreamEngine(engineName)).append!(tmp)
}
subscribeTable(tableName="indexKline", actionName="appendIndex1", handler=appendIndex{"leftJoinIndex1", "idx1"}, msgAsTable=true, offset=-1, hash=1)
subscribeTable(tableName="stockKline", actionName="appendStock", handler=getLeftStream(ljEngine1), msgAsTable=true, offset=-1, hash=0)

In this case, the user-defined function appendIndex is the handler of the subscription to "indexKline": it selects the records of index "idx1" and appends them to the right stream of the join engine. The "leftJoinIndex1" engine joins the stock data in "stockKline" with the filtered index data on the Time column. Its output is then ingested into the reactive state engine "calCorr", which uses the built-in state functions mcorr and ratios to calculate, for each stock, the correlation between stock and index returns over a moving window of 3 records.

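The following script generates sample data and appends it to "indexKline" and "stockKline". Through the subscriptions above, the filtered index records reach the right stream and the stock records reach the left stream of "leftJoinIndex1":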

// generate data: stock Kline
t1 = table(`A`B`A`B`A`B`A`B`A`B as Sym, 10:00:00.000+(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (4.1 7.6 3.8 7.6 4.3 7.5 3.5 7.6 4.2 7.6) as Close)
// generate data: index Kline
t2 = table(`idx1`idx2`idx1`idx2`idx1`idx2`idx1`idx2`idx1`idx2 as Sym, 10:00:00.000+(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (2.1 5 2.2 5 1.9 5 1.7 5 1.7 5) as Close)
// input data
indexKline.append!(t2)
stockKline.append!(t1)

The correspondence of records between the input streams is shown below:

As shown in the output table, both the stocks A and B are correlated to index idx1. The result of the first two minutes is empty because the window size of mcorr is 3.
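The filter and join steps of this use case can be modeled on the sample data with a short Python sketch. It is not DolphinDB code and it leaves out the correlation step. Times are written as minute offsets from 10:00:00.000, and the variable names are illustrative.

```python
minutes = [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
# (Sym, Time, Close) rows copied from the script above
stock = list(zip("ABABABABAB", minutes, [4.1, 7.6, 3.8, 7.6, 4.3, 7.5, 3.5, 7.6, 4.2, 7.6]))
index = list(zip(["idx1", "idx2"] * 5, minutes, [2.1, 5, 2.2, 5, 1.9, 5, 1.7, 5, 1.7, 5]))

# Handler step: keep only idx1 rows, keyed by the join column (Time).
idx1_close = {t: close for sym, t, close in index if sym == "idx1"}

# Join step: every stock row picks up the idx1 close for its minute.
# Column order follows "stockKlineAddIndex1": Time, Sym, Close, Index1Close.
joined = [(t, sym, close, idx1_close[t]) for sym, t, close in stock if t in idx1_close]

for row in joined:
    print(row)
```

Each idx1 record is matched by both stocks of the same minute, which illustrates that a cached right record can be joined with any number of left records.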