DStream::equalJoinEngine

Syntax

DStream::equalJoinEngine(rightStream, metrics, matchingColumn, timeColumn, [garbageSize=5000], [maxDelayedTime])

Details

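Creates an equi-join streaming engine that joins the calling DStream (the left table) with rightStream (the right table) on records with equal values in the matching and time columns. For details, see createEquiJoinEngine.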

Return value: A DStream object.

Arguments

rightStream is a DStream object indicating the input data source of the right table.

metrics is metacode (can be a tuple) specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.
  • metrics can use one or more expressions, built-in or user-defined functions (but not aggregate functions), or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.

  • metrics can include functions with multiple return values. In that case, you must specify the output-table columns that hold the return values, for example, <func(price) as `col1`col2>.

  • To specify a column that exists in both the left and the right tables, use the format tableName.colName.

  • Column names specified in metrics are case-insensitive. Their letter case does not need to match that of the column names in the input tables.

matchingColumn is a STRING scalar, vector, or tuple indicating the column(s) on which the tables are joined. The matching columns can be of integral, temporal, or literal (except UUID) type.
  • When there is only one column to match: if the column has the same name in both tables, specify matchingColumn as a STRING scalar; otherwise, specify it as a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].
  • When there are multiple columns to match: if every column to match has the same name in both tables, specify matchingColumn as a STRING vector; otherwise, specify it as a tuple of two elements. For example, if the columns are named "orderNo" and "sym" in the left table and "orderNo" and "sym1" in the right table, then matchingColumn = [[`orderNo, `sym], [`orderNo, `sym1]].
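The matchingColumn forms above can be modeled as a mapping from the specification to (left column, right column) pairs. The following is a minimal, language-neutral sketch in Python, not DolphinDB code. resolve_matching_columns is a hypothetical helper. A Python str, list of str, and two-element tuple of lists stand in for a DolphinDB STRING scalar, STRING vector, and tuple, respectively.

```python
from typing import List, Tuple, Union

# Hypothetical stand-ins for the DolphinDB forms of matchingColumn:
#   str                          -> STRING scalar (same name in both tables)
#   list of str                  -> STRING vector (same names in both tables)
#   tuple of two lists of str    -> tuple: (left-table names, right-table names)
Spec = Union[str, List[str], Tuple[List[str], List[str]]]


def resolve_matching_columns(spec: Spec) -> List[Tuple[str, str]]:
    """Return the (left column, right column) pairs implied by a spec."""
    if isinstance(spec, str):
        # Scalar: one column, identically named in both tables.
        return [(spec, spec)]
    if isinstance(spec, tuple):
        # Tuple of two elements: left-table names first, then right-table names.
        if len(spec) != 2:
            raise ValueError("a tuple spec must have exactly two elements")
        left, right = spec
        if len(left) != len(right):
            raise ValueError("left and right name lists must have equal length")
        return list(zip(left, right))
    # Vector: several columns, each identically named in both tables.
    return [(name, name) for name in spec]


print(resolve_matching_columns("sym"))                  # [('sym', 'sym')]
print(resolve_matching_columns((["sym"], ["sym1"])))    # [('sym', 'sym1')]
print(resolve_matching_columns((["orderNo", "sym"], ["orderNo", "sym1"])))
# [('orderNo', 'orderNo'), ('sym', 'sym1')]
```

The pairs make the rule explicit: the tuple form is needed only when at least one matching column is named differently in the two tables.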

timeColumn is a STRING scalar or vector indicating the time columns in the left and right tables. The two time columns must have the same data type. If they have the same name, timeColumn is a STRING scalar; otherwise, it is a vector of two strings (the left table's time column name followed by the right table's).

garbageSize (optional) is a positive integer with the default value of 5,000 (in rows). When the number of rows of historical data cached in memory exceeds garbageSize, the system removes historical data that is no longer needed for the current calculation, that is, data meeting either of the following conditions:
  • The historical data has already been joined and returned.

  • The historical data has not been joined, but the difference between its timestamp and that of newly arriving data in the left or right table exceeds maxDelayedTime.

maxDelayedTime (optional) is a positive integer with the default value of 3 (seconds), indicating the maximum time to keep cached data in the engine. It takes effect only when the cleanup described under garbageSize is triggered. Avoid setting maxDelayedTime too small; otherwise, data may be removed before it can be joined.
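The interaction between garbageSize and maxDelayedTime can be sketched as a simplified Python model. The sketch assumes that each cached row carries a timestamp in seconds and a flag recording whether it has already been joined and returned. It also assumes that "newly arriving data" is represented by the latest timestamp seen. CachedRow and collect_garbage are hypothetical names, and the engine's actual bookkeeping may differ.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CachedRow:
    time: int     # timestamp in seconds (simplified; hypothetical model)
    joined: bool  # True once the row has been joined and returned


def collect_garbage(cache: List[CachedRow], latest_time: int,
                    garbage_size: int = 5000,
                    max_delayed_time: int = 3) -> List[CachedRow]:
    """Return the rows kept after one cleanup pass (simplified model)."""
    if len(cache) <= garbage_size:
        # Cleanup is triggered only when the cache exceeds garbage_size rows.
        return cache
    kept = []
    for row in cache:
        if row.joined:
            continue  # already joined and returned: discard
        if latest_time - row.time > max_delayed_time:
            continue  # unjoined, but older than max_delayed_time: discard
        kept.append(row)  # unjoined and still recent: keep waiting for a match
    return kept


cache = [CachedRow(10, True), CachedRow(11, False),
         CachedRow(14, False), CachedRow(15, False)]
# Below the threshold, nothing is removed, however old the rows are.
print(len(collect_garbage(cache, latest_time=15)))  # 4
# Above the threshold (garbage_size=3), the joined row and the 4-second-old row go.
print([r.time for r in collect_garbage(cache, latest_time=15, garbage_size=3)])  # [14, 15]
```

The model shows why a very small maxDelayedTime is risky: once cleanup is triggered, an unjoined row whose counterpart is only slightly late is discarded before it can be matched.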

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('joinEngine')
g = createStreamGraph('joinEngine')
r = g.source("right", 1024:0, `time`sym`val, [SECOND, SYMBOL, DOUBLE])
g.source("left", 1024:0, `time`sym`price, [SECOND, SYMBOL, DOUBLE])
    .equalJoinEngine(r, [<price>, <val>, <price*val>], `sym, `time)
    .sink("output")
g.submit()

go

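// Append 20 rows to the left table: times 13:30:11 to 13:30:30, 10 AAPL rows followed by 10 IBM rows
tmp1=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(1..20) as price)
appendOrcaStreamTable("left", tmp1)

// Append 20 right-table rows with the same time and sym values, so every left row has a match
tmp2=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(50..31) as val)
appendOrcaStreamTable("right", tmp2)

// Query the joined results
select * from orca_table.output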
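As a cross-check of the example, the following Python sketch reproduces the join on sym and time outside DolphinDB. It assumes each (sym, time) pair occurs at most once per table, which holds for tmp1 and tmp2. It models only the values of the metrics <price>, <val>, and <price*val>, not the column names or row order of the engine's output table. Because every left row has a right row with the same sym and time, 20 joined rows would be expected, with price*val running from 1*50 to 20*31. make_rows and equi_join are hypothetical helper names.

```python
BASE = 13 * 3600 + 30 * 60 + 10  # 13:30:10 as seconds since midnight


def make_rows(values, field):
    # Mirrors 13:30:10+1..20 (13:30:11 to 13:30:30), 10 AAPL rows then 10 IBM rows.
    syms = ["AAPL"] * 10 + ["IBM"] * 10
    return [{"time": BASE + i + 1, "sym": syms[i], field: v}
            for i, v in enumerate(values)]


left = make_rows([float(x) for x in range(1, 21)], "price")      # double(1..20)
right = make_rows([float(x) for x in range(50, 30, -1)], "val")  # double(50..31)


def equi_join(left, right, keys=("sym", "time")):
    # Index right rows by join key; assumes each key occurs at most once per table.
    by_key = {tuple(r[k] for k in keys): r for r in right}
    result = []
    for l in left:
        r = by_key.get(tuple(l[k] for k in keys))
        if r is not None:
            # Metrics from the example: <price>, <val>, <price*val>
            result.append((l["time"], l["sym"], l["price"], r["val"],
                           l["price"] * r["val"]))
    return result


output = equi_join(left, right)
print(len(output))    # 20
print(output[0][2:])  # (1.0, 50.0, 50.0)
print(output[-1][2:]) # (20.0, 31.0, 620.0)
```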