DStream::sharedKeyedTable

Syntax

DStream::sharedKeyedTable(name, keyColumns, X, [X1], [X2], ...)

or

DStream::sharedKeyedTable(name, keyColumns, capacity:size, colNames, colTypes)

or

DStream::sharedKeyedTable(name, keyColumns, table)

Details

Creates a shared keyed table in Orca. The table can be accessed only within DStream::udfEngine. For details about keyed tables, refer to keyedTable.
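The update behavior that a shared keyed table inherits from keyedTable can be illustrated with an ordinary, non-shared keyed table outside Orca. This is a minimal sketch under that assumption: it uses keyedTable rather than DStream::sharedKeyedTable, and the column names id and price are hypothetical. Appending a row whose key already exists updates that row instead of adding a duplicate.

```dolphindb
// Plain (non-shared) keyed table used for illustration; `id` and `price` are hypothetical names
t = keyedTable(`id, 100:0, `id`price, [INT, DOUBLE])
// First batch: keys 1 and 2 are new, so both rows are inserted
t.append!(table(1 2 as id, 10.0 20.0 as price))
// Second batch: key 2 already exists, so its row is updated; key 3 is inserted
t.append!(table(2 3 as id, 25.0 30.0 as price))
// The table now holds one row per key (three rows in total)
select * from t
```

This is the same mechanism the example below relies on when it appends each new record to the history table.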

Parameters

name is a STRING scalar indicating the shared keyed table name.

keyColumns is a STRING scalar or vector indicating the name(s) of the primary key column(s). The key columns must be of INTEGRAL, TEMPORAL, or LITERAL type.

For the first scenario: X, X1, X2 ... can be vectors, matrices or tuples. Each vector, each matrix column and each tuple element must have the same length. When Xk is a tuple:
  • If the elements of Xk are vectors of equal length, each element of the tuple will be treated as a column in the table.
  • If Xk contains elements of different types or unequal lengths, it will be treated as a single column in the table (with the column type set to ANY), and each element will correspond to the value of that column in each row.
For the second scenario:
  • capacity is a positive integer indicating the amount of memory (in terms of the number of rows) allocated to the table. When the number of rows exceeds capacity, the system first allocates new memory of 1.2 to 2 times the capacity, copies the data to the new memory space, and then releases the original memory. For large tables, these steps may temporarily use a significant amount of memory.
  • size is a non-negative integer indicating the initial size (in terms of the number of rows) of the table. If size=0, an empty table is created; if size>0, the columns are initialized with the following values:
    • false for Boolean type;
    • 0 for numeric, temporal, IPADDR, COMPLEX, and POINT types;
    • NULL for LITERAL and INT128 types.
  • Note: If colTypes contains an array vector type, size must be 0.
  • colNames is a STRING vector of column names.
  • colTypes is a STRING vector of data types. Non-key columns can be of an array vector type or of ANY type.

For the third scenario, table is a table. The key column(s) specified by keyColumns must not contain duplicate values in table.
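The three call forms can be sketched with the ordinary keyedTable function, which accepts the same arguments apart from name. This is an illustration under that assumption, not a DStream::sharedKeyedTable call; the variable and column names id and price are hypothetical. Inside a stream graph, the same arguments would follow name, as in g.sharedKeyedTable("history", "id", 1:0, `id`value, [INT, DOUBLE]) in the example below.

```dolphindb
// Scenario 1: build from vectors; column names are taken from the variable names
id = 1 2 3
price = 10.0 20.0 30.0
t1 = keyedTable(`id, id, price)
// Scenario 2: capacity:size plus column names and types (an empty table with room for 100 rows)
t2 = keyedTable(`id, 100:0, `id`price, [INT, DOUBLE])
// Scenario 3: convert an existing table whose key column has no duplicate values
t3 = keyedTable(`id, table(1 2 3 as id, 10.0 20.0 30.0 as price))
```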

Returns

A keyed table.

Examples

In this example, we use DStream::sharedKeyedTable and DStream::udfEngine to implement a historical delta computation.

The shared keyed table maintains the most recent record for each ID. When a new record arrives and an entry with the same ID already exists in the table, the UDF outputs the difference between the new value and the previously stored (historical) value. In either case, the new record is then written to the table, overwriting the old entry for an existing ID; if the ID did not exist, the record is inserted without producing any output.

if(existsCatalog("orcaCatalog")) dropCatalog("orcaCatalog")
createCatalog("orcaCatalog")
go
use catalog orcaCatalog
// Create stream graph
g = createStreamGraph("compare")
g.sharedKeyedTable("history", "id", 1:0, `id`value, [INT, DOUBLE])
g.source("data", `id`value`time, [INT, DOUBLE, TIMESTAMP])
  .udfEngine(def(msg) {
    history = orcaObj("history")
    diffTable = table(100:0, `id`diff, [INT, DOUBLE])
    for(i in 0:msg.size()) {
        idVal = msg.id[i]
        valueVal = msg.value[i]
        // Read historical value
        old = select value from history where id = idVal
        // Write new value
        newRow = table(idVal as id, valueVal as value)
        history.append!(newRow)
        // Compute delta
        if(old.size() > 0) {
            diffTable.append!(table(idVal as id, (valueVal - old.value[0]) as diff))
        }
    }
    return diffTable
  })
  .sink("comparison")
g.submit()
// Generate mock data
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)
// Generate data with duplicate IDs
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)
// Wait for processing and inspect results
sleep(1000)
select * from orcaCatalog.orca_table.comparison
id diff
1 35.55946895749296
2 -3.4362593906550387
3 36.283468999034596
4 68.97968558337999
5 -91.64246928217878