DStream::sharedTable

Syntax

DStream::sharedTable(name, X, [X1], [X2], .....)

or

DStream::sharedTable(name, capacity:size, colNames, colTypes)

Details

Creates a shared table in Orca, which is only usable within DStream::udfEngine. For details about table s, refer to table.

Parameters

name is a STRING scalar indicating the shared table name.

For the first scenario: X, X1, X2 ... can be vectors, matrices or tuples. Each vector, each matrix column and each tuple element must have the same length. When Xk is a tuple:
  • If the elements of Xk are vectors of equal length, each element of the tuple will be treated as a column in the table.
  • If Xk contains elements of different types or unequal lengths, it will be treated as a single column in the table (with the column type set to ANY), and each element will correspond to the value of that column in each row.
For the second scenario:
  • capacity is a positive integer indicating the amount of memory (in terms of the number of rows) allocated to the table. When the number of rows exceeds capacity, the system will first allocate memory of 1.2~2 times of capacity, copy the data to the new memory space, and release the original memory. For large tables, these steps may use significant amount of memory.

  • size is an integer no less than 0 indicating the initial size (in terms of the number of rows) of the table. If size=0, create an empty table; If size>0, the initialized values are:

    • false for Boolean type;

    • 0 for numeric, temporal, IPADDR, COMPLEX, and POINT types;

    • Null value for Literal, INT128 types.

    Note:
    If colTypes is an array vector, size must be 0.
  • colNames is a STRING vector of column names.

  • colTypes is a STRING vector of data types. It can use either the reserved words for data types or corresponding strings.

Returns

A table.

Examples

In this example, we useDStream::sharedTable and DStream::udfEngine to implement real-time computation and output of the current average. The DStream::sharedTable stores all previously computed average values. Each time new data is processed, the UDF recalculates the average based on the latest data, appends the result to the DStream::sharedTable, and outputs only the most recent row—i.e., the latest computed average—to the downstream.

if(existsCatalog("orcaCatalog")) dropCatalog("orcaCatalog")
createCatalog("orcaCatalog")
go
use catalog orcaCatalog

// Create stream graph
g = createStreamGraph("avgCalc")
g.sharedTable("stats", 1:0, `sum`count, [DOUBLE, LONG])

g.source("numbers", ["value"], [DOUBLE])
  .udfEngine(def(msg) {
    stats = orcaObj("stats")
    
    // Read historical value
    if(stats.size() > 0) {
        lastSum = exec last(sum) from stats
        lastCount = exec last(count) from stats
    } else {
        lastSum = 0.0
        lastCount = 0
    }
    
    // Compute new value
    newSum = lastSum + sum(msg.value)
    newCount = lastCount + msg.size()
    if (newCount > 0) {
        avgValue = newSum / newCount
    } else {
        avgValue = 0.0
    }
    
    // Write new value
    newRow = table(newSum as sum, newCount as count)
    stats.append!(newRow)
    
    // Return the result
    return table(newSum as total, newCount as count, avgValue as avg)
  })
  .sink("output")

g.submit()

// Generate mock data
mockData1 = table(rand(10.0, 3) as value)
mockData2 = table(rand(10.0, 5) as value)
mockData3 = table(rand(10.0, 2) as value)

// Insert data, wait for processing and inspect results
appendOrcaStreamTable("orcaCatalog.orca_table.numbers", mockData1)
select * from orcaCatalog.orca_table.numbers;
select * from orcaCatalog.orca_table.output;
appendOrcaStreamTable("orcaCatalog.orca_table.numbers", mockData2)
select * from orcaCatalog.orca_table.numbers;
select * from orcaCatalog.orca_table.output;
appendOrcaStreamTable("orcaCatalog.orca_table.numbers", mockData3)
select * from orcaCatalog.orca_table.numbers;
select * from orcaCatalog.orca_table.output;