DStream::asofJoinEngine

Syntax

DStream::asofJoinEngine(rightStream, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [delayedTime], [garbageSize], [sortByTime])

Details

Creates an asof join streaming engine. For details, see createAsofJoinEngine.

Return value: A DStream object.

Arguments

rightStream is a DStream object indicating the input data source of the right table.

metrics is metacode (can be a tuple) specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.
  • metrics can use one or more expressions, built-in or user-defined functions (but not aggregate functions), or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.

  • metrics can include functions with multiple return values. In that case, you must specify the output columns that hold the return values. For example, <func(price) as `col1`col2>.

  • To specify a column that exists in both the left and the right tables, use the format tableName.colName.

  • The column names specified in metrics are not case-sensitive, so their case does not have to match the column names of the input tables.

matchingColumn is a STRING scalar/vector/tuple indicating the column(s) on which the tables are joined. It supports integral, temporal, or literal (except UUID) types.
  • When there is only one column to match: if the column has the same name in both tables, matchingColumn is a STRING scalar; otherwise, it is a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].
  • When there are multiple columns to match: if every column to match has the same name in both tables, matchingColumn is a STRING vector; otherwise, it is a tuple of two elements. For example, if the columns are named "orderNo" and "sym" in the left table and "orderNo" and "sym1" in the right table, then matchingColumn = [[`orderNo, `sym], [`orderNo,`sym1]].
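To make the accepted forms concrete, here is a minimal Python sketch (not DolphinDB code) that turns each form of matchingColumn into a pair of left/right column lists matched position by position. The helper name normalize_matching_column is hypothetical, and a DolphinDB tuple such as [[`sym],[`sym1]] is modeled as a Python tuple of two lists.

```python
from typing import List, Sequence, Tuple, Union

# Model: a DolphinDB STRING scalar -> str, a STRING vector -> list[str],
# and a tuple of two elements -> a Python tuple (left_side, right_side).
Side = Union[str, List[str]]
MatchingColumn = Union[str, List[str], Tuple[Side, Side]]


def _as_list(cols: Union[str, Sequence[str]]) -> List[str]:
    """Wrap a single column name in a list; copy a sequence into a new list."""
    return [cols] if isinstance(cols, str) else list(cols)


def normalize_matching_column(spec: MatchingColumn) -> Tuple[List[str], List[str]]:
    """Return (left_columns, right_columns), paired position by position."""
    if isinstance(spec, tuple):
        if len(spec) != 2:
            raise ValueError("a tuple matchingColumn must have exactly two elements")
        left, right = _as_list(spec[0]), _as_list(spec[1])
        if len(left) != len(right):
            raise ValueError("both sides must list the same number of columns")
        return left, right
    # STRING scalar or vector: the same names are used in both tables.
    cols = _as_list(spec)
    return cols, list(cols)
```

For example, normalize_matching_column((["orderNo", "sym"], ["orderNo", "sym1"])) pairs orderNo with orderNo and sym with sym1, mirroring matchingColumn = [[`orderNo, `sym], [`orderNo,`sym1]].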

timeColumn (optional) specifies the time column in the left and right tables. The two time columns must have the same data type. timeColumn must be specified when useSystemTime = false. If the time column has the same name in both tables, timeColumn is a STRING scalar; otherwise, it is a STRING vector of two elements giving the time column of the left table and of the right table.

useSystemTime (optional) indicates whether the left table and the right table are joined on the system time, instead of on timeColumn.
  • useSystemTime = true: join records based on the system time (timestamp with millisecond precision) when they are ingested into the engine.
  • useSystemTime = false (default): join records based on the specified *timeColumn* from the left table and the right table.

delayedTime (optional) is a positive integer with the same precision as timeColumn, specifying the maximum time to wait before the engine joins a left table record that has not yet been calculated with a right table record. timeColumn must be specified to use delayedTime. For more information, see the Details section of createAsofJoinEngine.

garbageSize (optional) is a positive integer with a default value of 5,000 (rows). Data ingested into the engine keeps occupying memory. Within the left and right tables, records are grouped by matchingColumn values; when the number of records in a group exceeds garbageSize, the system removes the records that have already been calculated from memory.
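The per-group cleanup rule can be pictured with the minimal Python sketch below. It is a model of the rule as described above, not the engine's implementation: it assumes the check runs each time a record is appended and that every record carries a flag marking whether it has already been calculated. The class and field names are hypothetical, and the real engine may keep additional state that this sketch ignores.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Record:
    key: str                  # value of the matching column(s), e.g. the sym
    time: int                 # timestamp, e.g. milliseconds
    calculated: bool = False  # assumption: set once the record has been joined


@dataclass
class GroupedBuffer:
    garbage_size: int = 5000  # mirrors the garbageSize default
    groups: Dict[str, List[Record]] = field(default_factory=lambda: defaultdict(list))

    def append(self, rec: Record) -> None:
        group = self.groups[rec.key]
        group.append(rec)
        # The threshold applies to each group separately, not to the whole table.
        if len(group) > self.garbage_size:
            # Drop records that have already been calculated; keep pending ones.
            self.groups[rec.key] = [r for r in group if not r.calculated]
```

With garbage_size=3, three calculated "A" records stay in memory; appending a fourth "A" record pushes the group past the threshold, and only the pending record remains. Records for other keys are unaffected.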

sortByTime (optional) is a Boolean value indicating whether the output data is globally sorted by time. The default value is false, meaning the output data is sorted only within each group. If sortByTime is set to true, delayedTime cannot be specified, and the data ingested into both the left and right tables must be globally sorted by time.
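The dependencies among the optional arguments can be summarized as a small validation routine. This is a hypothetical Python helper (check_asof_join_args is not a DolphinDB function) that only encodes the rules stated in this section: timeColumn is required when useSystemTime = false, delayedTime requires timeColumn and cannot be combined with sortByTime = true, and delayedTime and garbageSize must be positive.

```python
from typing import List, Optional, Union


def check_asof_join_args(
    time_column: Optional[Union[str, List[str]]] = None,
    use_system_time: bool = False,
    delayed_time: Optional[int] = None,
    garbage_size: int = 5000,
    sort_by_time: bool = False,
) -> None:
    """Raise ValueError if the optional arguments break the documented rules."""
    if not use_system_time and time_column is None:
        raise ValueError("timeColumn must be specified when useSystemTime = false")
    if isinstance(time_column, list) and len(time_column) != 2:
        raise ValueError("a timeColumn vector must hold two names (left, right)")
    if delayed_time is not None:
        if time_column is None:
            raise ValueError("delayedTime requires timeColumn")
        if delayed_time <= 0:
            raise ValueError("delayedTime must be a positive integer")
        if sort_by_time:
            raise ValueError("delayedTime cannot be specified when sortByTime = true")
    if garbage_size <= 0:
        raise ValueError("garbageSize must be a positive integer")
```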

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('asofJoin')

g = createStreamGraph('asofJoin')
r = g.source("right", 1024:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
g.source("left", 1024:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
    .asofJoinEngine(r, <[price, bid, ask, abs(price-(bid+ask)/2)]>, `sym, `time)
    .sink("output")
g.submit()
go

tmp1=table(2024.08.27T09:30:00.000+2 8 20 22 23 24 as time, take(`A`B, 6) as sym, 20.01 20.04 20.07 20.08 20.4 20.5 as price)
tmp1.sortBy!(`time)
appendOrcaStreamTable("left", tmp1)

tmp2=table(2024.08.27T09:30:00.000+1 5 6 11 19 20 21 as time, take(`A`B, 7) as sym, 20 20.02 20.03 20.05 20.06 20.6 20.4 as bid,  20.01 20.03 20.04 20.06 20.07 20.5 20.6 as ask)
appendOrcaStreamTable("right", tmp2)

select * from orca_table.output
time sym price bid ask abs
2024.08.27 09:30:00.002 A 20.01 20.00 20.01 0.004999999999999005
2024.08.27 09:30:00.020 A 20.07 20.06 20.07 0.005000000000002558
2024.08.27 09:30:00.008 B 20.04 20.02 20.03 0.015000000000000568
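The output above can be reproduced with a small Python model of the join semantics. This is a conceptual sketch, not DolphinDB code, and it rests on two assumptions inferred from this example rather than stated on this page: a left record is output only after a right record with the same sym and a strictly later timestamp has arrived, and it is then joined with the latest right record whose timestamp is at or before its own. Timestamps are written as milliseconds after 09:30:00.000, and the function name asof_join_stream is hypothetical.

```python
from typing import Dict, List, Tuple

# tmp1 from the example: (ms after 09:30:00.000, sym, price)
left = [(2, "A", 20.01), (8, "B", 20.04), (20, "A", 20.07),
        (22, "B", 20.08), (23, "A", 20.4), (24, "B", 20.5)]
# tmp2 from the example: (ms after 09:30:00.000, sym, bid, ask)
right = [(1, "A", 20.0, 20.01), (5, "B", 20.02, 20.03), (6, "A", 20.03, 20.04),
         (11, "B", 20.05, 20.06), (19, "A", 20.06, 20.07), (20, "B", 20.6, 20.5),
         (21, "A", 20.4, 20.6)]


def asof_join_stream(left, right):
    """Model of the example: emit a left row once a later right row with the
    same sym exists, joined with the latest right row at or before it."""
    by_sym: Dict[str, List[Tuple[int, float, float]]] = {}
    for t, sym, bid, ask in right:  # input is assumed to be in time order
        by_sym.setdefault(sym, []).append((t, bid, ask))
    out = []
    for t, sym, price in left:
        rows = by_sym.get(sym, [])
        if not any(rt > t for rt, _, _ in rows):
            continue  # not triggered yet: no later right row for this sym
        earlier = [r for r in rows if r[0] <= t]
        if not earlier:
            continue  # case not exercised by the example; simply skipped here
        _, bid, ask = earlier[-1]
        out.append((t, sym, price, bid, ask, abs(price - (bid + ask) / 2)))
    # sortByTime = false: rows grouped by sym, time-ordered within each group.
    return sorted(out, key=lambda row: (row[1], row[0]))


for row in asof_join_stream(left, right):
    print(row)
```

Under these assumptions, the left records at 09:30:00.022, .023, and .024 produce no output because no right record with the same sym and a later timestamp has arrived yet. The A, A, B order of the result reflects the default sortByTime = false: rows are ordered by time within each group but not globally.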