DStream::leftSemiJoinEngine
Syntax
DStream::leftSemiJoinEngine(rightStream, metrics, matchingColumn,
[garbageSize=5000], [updateRightTable=false])
Details
Creates a left semi join streaming engine. For details, see createLeftSemiJoinEngine.
Return value: A DStream object.
Arguments
rightStream is a DStream object indicating the input data source of the right table.
metrics is metacode specifying the output calculations, for example <[price, vol, price*vol]>.
- metrics can use one or more expressions, built-in or user-defined functions (but not aggregate functions), or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.
- metrics can include functions that return multiple values; in that case, you must specify the output columns that hold the return values. For example, <func(price) as `col1`col2>.
matchingColumn is a STRING scalar, vector, or tuple specifying the column(s) on which the left and right tables are joined.
- When there is only one column to match: if the column has the same name in both tables, specify matchingColumn as a STRING scalar; otherwise, specify it as a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].
- When there are multiple columns to match: if the columns have the same names in both tables, specify matchingColumn as a STRING vector; otherwise, specify it as a tuple of two elements. For example, if the columns are named "timestamp" and "sym" in the left table and "timestamp" and "sym1" in the right table, then matchingColumn = [[`timestamp, `sym], [`timestamp, `sym1]].
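The sketch below makes the accepted matchingColumn forms concrete. It is a small Python model, not DolphinDB code. It maps each form to explicit (left column, right column) pairs. The helper normalize_matching_column is hypothetical, and Python strings and lists stand in for DolphinDB STRING scalars, vectors, and tuples.

```python
from typing import List, Sequence, Tuple, Union

# Hypothetical stand-in for the matchingColumn forms described above:
# a single name, a list of shared names, or a pair of (left names, right names).
MatchingColumn = Union[str, Sequence[str], Tuple[Sequence[str], Sequence[str]]]


def normalize_matching_column(spec: MatchingColumn) -> List[Tuple[str, str]]:
    """Return (left_column, right_column) pairs for a matchingColumn spec."""
    if isinstance(spec, str):
        # One column with the same name in both tables.
        return [(spec, spec)]
    if len(spec) == 2 and all(not isinstance(part, str) for part in spec):
        # Tuple of two elements: left-table names first, then right-table names.
        left, right = spec
        if len(left) != len(right):
            raise ValueError("left and right column lists must have the same length")
        return list(zip(left, right))
    # Vector of column names shared by both tables.
    return [(name, name) for name in spec]
```

For instance, [["time", "sym"], ["time", "sym1"]] becomes [("time", "time"), ("sym", "sym1")], which mirrors the multi-column example above.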
garbageSize (optional) is a positive integer. The default value is 5,000. Unlike in other join engines, garbageSize in the left semi join engine is used only to remove historical data from the left table: the system clears left-table data once the number of joined records exceeds garbageSize.
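The following toy Python model shows the garbageSize trigger. It is not the engine's implementation. It assumes that "historical data" means left records that have already been joined. It also assumes that the cleanup removes all of those records at once while records still waiting for a match are kept. The class LeftHistoryModel and its methods are hypothetical.

```python
class LeftHistoryModel:
    """Toy model of the garbageSize cleanup trigger (assumptions noted above)."""

    def __init__(self, garbage_size: int = 5000):
        if garbage_size <= 0:
            raise ValueError("garbage_size must be a positive integer")
        self.garbage_size = garbage_size
        self.pending = {}   # left records still waiting for a right-table match
        self.joined = {}    # assumed "historical" left records that were already joined
        self.cleanups = 0   # number of times the cleanup has been triggered

    def add_left(self, rec_id, record) -> None:
        self.pending[rec_id] = record

    def on_match(self, rec_id):
        # In a semi join, each left record is joined at most once.
        record = self.pending.pop(rec_id)
        self.joined[rec_id] = record
        # Cleanup fires only once the joined count exceeds garbage_size.
        if len(self.joined) > self.garbage_size:
            self.joined.clear()
            self.cleanups += 1
        return record
```

With garbage_size=2, two joined records stay cached and the third triggers the cleanup. Only the left side is affected, which matches the statement that garbageSize applies to the left table alone.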
updateRightTable (optional) is a Boolean value specifying which record to output when more than one record in the right table matches: the first record (updateRightTable = false) or the latest record (updateRightTable = true). The default value is false.
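The choice between the first and the latest matching right record can be illustrated with a batch Python model of a left semi join. The model is a simplification. It ignores arrival order and incremental processing, which the streaming engine does handle. The function left_semi_join and its keep_latest flag are hypothetical. keep_latest stands in for the first-versus-latest choice that updateRightTable controls.

```python
def left_semi_join(left, right, keys, keep_latest):
    """Batch model of a left semi join.

    left, right: lists of dicts (rows); keys: matching column names shared by both.
    keep_latest=False pairs each left row with the first matching right row;
    keep_latest=True pairs it with the latest one. Left rows without a match
    are dropped, and each left row appears at most once in the result.
    """
    chosen = {}
    for row in right:
        key = tuple(row[c] for c in keys)
        # Keep the first right row per key unless keep_latest overwrites it.
        if keep_latest or key not in chosen:
            chosen[key] = row
    result = []
    for row in left:
        match = chosen.get(tuple(row[c] for c in keys))
        if match is not None:
            extra = {c: v for c, v in match.items() if c not in keys}
            result.append({**row, **extra})
    return result


if __name__ == "__main__":
    # Timestamp offsets mirror the Examples section; prices and volumes are made up.
    left = [{"time": t, "sym": "AAPL", "price": p}
            for t, p in [(1, 36.0), (5, 82.0), (10, 7.0), (15, 60.0)]]
    right = [{"time": t, "sym": "AAPL", "vol": v}
             for t, v in zip([1, 1, 3, 4, 5, 5, 5, 15], [10, 11, 30, 40, 50, 51, 52, 150])]
    print(left_semi_join(left, right, ["time", "sym"], keep_latest=False))
    print(left_semi_join(left, right, ["time", "sym"], keep_latest=True))
```

In both modes, the left row at offset 10 has no match and is dropped. The two modes differ only in which vol is paired with offsets 1 and 5, where the right table has several matching rows.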
Examples
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('joinEngine')
g = createStreamGraph('joinEngine')
r = g.source("right", 1024:0, `time`sym1`vol, [TIMESTAMP, SYMBOL, INT])
g.source("left", 1024:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
.leftSemiJoinEngine(r, metrics=<[price, vol,price*vol]>, matchingColumn=[[`time,`sym], [`time,`sym1]], updateRightTable=true)
.sink("output")
g.submit()
go
v = [1, 5, 10, 15]
tmp1=table(2012.01.01T00:00:00.000+v as time, take(`AAPL, 4) as sym, rand(100,4) as price)
appendOrcaStreamTable("left", tmp1)
v = [1, 1, 3, 4, 5, 5, 5, 15]
tmp2=table(2012.01.01T00:00:00.000+v as time, take(`AAPL, 8) as sym, rand(100,8) as vol)
appendOrcaStreamTable("right", tmp2)
select * from orca_table.output
time | sym | price | vol | price_mul |
---|---|---|---|---|
2012.01.01 00:00:00.001 | AAPL | 36 | 62 | 2,232 |
2012.01.01 00:00:00.005 | AAPL | 82 | 35 | 2,870 |
2012.01.01 00:00:00.015 | AAPL | 60 | 23 | 1,380 |