DStream::lookupJoinEngine
Syntax
DStream::lookupJoinEngine(rightStream, metrics, matchingColumn,
[rightTimeColumn], [checkTimes], [keepDuplicates=false])
Details
Creates a lookup join streaming engine. For details, see createLookupJoinEngine.
Return value: A DStream object.
Arguments
rightStream is a DStream object indicating the input data source of the right table.
metrics is metacode specifying the calculation formulas.
- metrics can use one or more expressions, built-in or user-defined functions (but not aggregate functions), or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.
- metrics can be functions that return multiple values, in which case the columns in the output table that hold the return values must be specified. For example, <func(price) as `col1`col2>.
matchingColumn is a STRING scalar, vector, or tuple specifying the column(s) on which the two tables are joined.
- When there is only one column to match: if the column has the same name in both tables, matchingColumn is a STRING scalar; otherwise it is a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].
- When there are multiple columns to match: if all columns to match have the same names in both tables, matchingColumn is a STRING vector; otherwise it is a tuple of two elements. For example, if the columns are named "timestamp" and "sym" in the left table and "timestamp" and "sym1" in the right table, then matchingColumn = [[`timestamp,`sym],[`timestamp,`sym1]].
rightTimeColumn (optional) is a STRING scalar indicating the time column in the right table. If the parameter is specified, the right table keeps the record with the latest timestamp. If there are multiple records with identical timestamps, only the latest record is retained. If the parameter is not specified, the latest ingested record (based on the system time) is kept.
checkTimes (optional) is a vector of temporal values or a DURATION scalar that controls when the right table is updated.
- If checkTimes is a vector of temporal values, it must be of SECOND, TIME or NANOTIME type. The lookup join engine updates the right table daily at the time specified by each element in the vector.
- If checkTimes is a DURATION scalar, it indicates the interval at which the right table is updated.
keepDuplicates (optional) is a Boolean value indicating whether to keep all records in each group of the right table. When set to false (default), the engine keeps only the latest record in each group. When set to true, the engine keeps all records in each group; in this case, the engine performs an inner join, i.e., output is generated only if a match is found.
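The keepDuplicates setting described above can be sketched as follows. This is a minimal illustration that uses only the signature shown under Syntax. The graph and table names ("joinEngineDup", "leftDup", "rightDup", "outputDup") are hypothetical, and the sketch assumes the same "orca" catalog as the Examples section.

```dolphindb
// Sketch only: all graph and table names below are hypothetical.
if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

g = createStreamGraph("joinEngineDup")
// Right table: every ingested record in each sym group is kept.
r = g.source("rightDup", 1024:0, `timestamps`sym`val, [TIMESTAMP, SYMBOL, DOUBLE])
// keepDuplicates=true: the engine performs an inner join,
// so a left record produces output only if its sym has a match in the right table.
g.source("leftDup", 1024:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
	.lookupJoinEngine(r, metrics=<[price, val]>, matchingColumn=`sym, keepDuplicates=true)
	.sink("outputDup")
g.submit()
```

With the default keepDuplicates=false, the same graph would retain only the latest right-table record per sym.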
Examples
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('joinEngine')
g = createStreamGraph('joinEngine')
r = g.source("right", 1024:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT])
g.source("left", 1024:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
.lookupJoinEngine(r, metrics=<[price,val,price*val]>, matchingColumn=`sym)
.sink("output")
g.submit()
go
n = 15
tmp1 = table((2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3) as timestamps, take(`A`B`C, n) as sym, take(1..15, n) as val, 1..15 as id)
appendOrcaStreamTable("right", tmp1)
n = 10
tmp2 = table(2019.10.08T01:01:01.001 + 1..n as timestamps, take(`A`B`C, n) as sym, take(0.1+10..20, n) as price)
appendOrcaStreamTable("left", tmp2)
select * from orca_table.output
sym | price | val | price_mul |
---|---|---|---|
A | 10.1 | 13 | 131.30 |
B | 11.1 | 14 | 155.40 |
C | 12.1 | 15 | 181.50 |
A | 13.1 | 13 | 170.30 |
B | 14.1 | 14 | 197.40 |
C | 15.1 | 15 | 226.50 |
A | 16.1 | 13 | 209.30 |
B | 17.1 | 14 | 239.40 |
C | 18.1 | 15 | 271.50 |
A | 19.1 | 13 | 248.30 |
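As a variation on the example above, the following sketch shows the tuple form of matchingColumn together with rightTimeColumn. It assumes the right table names its symbol column "sym1" instead of "sym". The graph and table names ("joinEngineTuple", "leftTuple", "rightTuple", "outputTuple") are hypothetical and were chosen so they do not collide with the graph already submitted.

```dolphindb
// Sketch only: all graph and table names below are hypothetical.
if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

g = createStreamGraph("joinEngineTuple")
// The right table uses "sym1" where the left table uses "sym".
r = g.source("rightTuple", 1024:0, `timestamps`sym1`val, [TIMESTAMP, SYMBOL, DOUBLE])
// matchingColumn is a tuple of two elements: left column names first, right column names second.
// rightTimeColumn makes the right table keep the record with the latest value in "timestamps".
g.source("leftTuple", 1024:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
	.lookupJoinEngine(r, metrics=<[price, val, price*val]>, matchingColumn=[[`sym], [`sym1]], rightTimeColumn=`timestamps)
	.sink("outputTuple")
g.submit()
```

Data can then be appended with appendOrcaStreamTable("rightTuple", ...) and appendOrcaStreamTable("leftTuple", ...), in the same way as in the example above.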