DStream::sharedDict

Syntax

DStream::sharedDict(name, keyObj, valueObj, [ordered=false])

or

DStream::sharedDict(name, keyType, valueType, [ordered=false])

Details

Creates a shared dictionary in Orca. A shared dictionary can only be used within DStream::udfEngine, where it is retrieved by name with orcaObj. For details about dictionaries, refer to dict.

Parameters

name is a STRING scalar indicating the shared dictionary name.

For the first usage:

keyObj is a vector indicating dictionary keys.

valueObj is a vector indicating dictionary values.
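In the first form, keys and values are paired by position: the i-th element of keyObj maps to the i-th element of valueObj. The sketch below shows this pairing with the regular dict function, which the Details section points to and which accepts the same (keyObj, valueObj) arguments. The keys and values are made up for illustration. The commented-out g.sharedDict line is an assumption modeled on the g.sharedDict call in the example, not a verified call.

```dolphindb
// Hypothetical keys/values; dict pairs them element-wise.
keyObj = ["A", "B", "C"]
valueObj = [3, 5, 2]
d = dict(keyObj, valueObj)
print(d["B"])    // value stored at the position of "B" in keyObj
print(size(d))   // one entry per key

// Assumed equivalent inside a stream graph g (not executed here):
// g.sharedDict("thresholds", keyObj, valueObj)
```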

For the second usage:

keyType is the data type of dictionary keys. The following data categories are supported: Integral (excluding COMPRESSED), Temporal, Floating and Literal.

valueType is the data type of dictionary values. COMPLEX, POINT, and DECIMAL are not supported.
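The second form creates an empty dictionary with fixed key and value types that is filled in later, as the counts dictionary in the example is. The sketch below uses the regular dict function with the same (keyType, valueType) arguments to show the read-update pattern the example relies on. STRING/LONG mirror the example, and the key "A" is hypothetical.

```dolphindb
// Empty dictionary typed by keyType/valueType (regular dict, same arguments).
d = dict(STRING, LONG)
d["A"] = 1
d["A"] = d["A"] + 1    // update the value of an existing key
print(size(d))         // only "A" has been inserted
print("B" in d)        // membership test on keys, as used in the example
```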

ordered (optional) is a Boolean value indicating whether to create an ordered dictionary. The default value is false, which creates a regular dictionary; true creates an ordered dictionary. A regular dictionary does not track the insertion order of its key-value pairs, whereas an ordered dictionary preserves it.
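The difference between the two kinds of dictionary is visible when listing keys. The sketch below uses the regular dict function, whose third argument has the same meaning as ordered here. The key names are hypothetical. Only the ordered dictionary guarantees that keys come back in insertion order; the order for the regular dictionary is not guaranteed and should not be relied on.

```dolphindb
// Ordered dictionary: keys are returned in insertion order.
od = dict(STRING, INT, true)
od["b"] = 1
od["a"] = 2
od["c"] = 3
print(keys(od))    // insertion order: b, a, c

// Regular dictionary: same contents, key order not guaranteed.
rd = dict(STRING, INT)
rd["b"] = 1
rd["a"] = 2
rd["c"] = 3
print(keys(rd))
```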

Returns

A dictionary.

Examples

The following example uses DStream::sharedDict and DStream::udfEngine to implement count-based alerting. A shared dictionary tracks the number of occurrences of each key. Each time the count for a key reaches or exceeds the threshold of 3, an alert record containing the key and its current count is sent downstream to the alerts sink.

if(existsCatalog("orcaCatalog")) dropCatalog("orcaCatalog")
createCatalog("orcaCatalog")
go
use catalog orcaCatalog
// Create stream graph
g = createStreamGraph("counter")
g.sharedDict("counts", STRING, LONG)
g.source("items", ["key"], [STRING])
  .udfEngine(def(msg) {
    counts = orcaObj("counts")
    triggered = table(100:0, `key`count, [STRING, LONG])
    for(i in 0:msg.size()) {
        keyVal = msg.key[i]
        // Read current count
        current = 0
        if (keyVal in counts) {
            current = counts[keyVal]
        }
        newCount = current + 1
        // Update count
        counts[keyVal] = newCount
        // Check if threshold is triggered
        if(newCount >= 3) {
            triggered.append!(table(keyVal as key, newCount as count))
        }
    }
    return triggered
  })
  .sink("alerts")
g.submit()
go
// Generate mock data
keys = ["A", "B", "A", "C", "B", "A", "B", "B", "C", "C"]
mockData1 = table(take(keys[0:3], 3) as key)
mockData2 = table(take(keys[3:6], 3) as key)
mockData3 = table(take(keys[6:10], 4) as key)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.items", mockData1)
appendOrcaStreamTable("orcaCatalog.orca_table.items", mockData2) 
appendOrcaStreamTable("orcaCatalog.orca_table.items", mockData3)
// Wait for processing and inspect results
sleep(1000)
select * from orcaCatalog.orca_table.alerts;
key count
--- -----
A   3
B   3
B   4
C   3
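The four rows above follow from the threshold check. A reaches 3 on its third occurrence, B reaches 3 and then 4, and C reaches 3 on the last record. The sketch below replays the ten mock keys through the same counting logic with a regular, non-shared dict outside any stream graph. This can be a convenient way to predict the sink contents before submitting a graph. It is a hypothetical offline check, not part of the Orca API, and the variable names are illustrative.

```dolphindb
// Hypothetical offline replay: a regular dict stands in for the shared dictionary.
keyList = ["A", "B", "A", "C", "B", "A", "B", "B", "C", "C"]
counts = dict(STRING, LONG)
alerts = table(100:0, `key`count, [STRING, LONG])
for (k in keyList) {
    current = 0
    if (k in counts) {
        current = counts[k]
    }
    newCount = current + 1
    counts[k] = newCount
    // Same threshold as the udfEngine function: emit once the count reaches 3
    if (newCount >= 3) {
        alerts.append!(table(k as key, newCount as count))
    }
}
alerts
```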