DStream::udfEngine
Syntax
DStream::udfEngine(func)
Details
DStream::udfEngine is an extension engine in Orca stream graphs for executing user-defined processing logic. When the built-in streaming engines cannot meet specific business requirements, users can define a function via DStream::udfEngine to process the incoming stream records. The function receives those records as a table.
To enable stateful computation, UDFs can access shared variables declared via DStream::sharedTable, DStream::sharedDict, or DStream::sharedKeyedTable. These shared variables can be used across multiple udfEngine instances and tasks within the same stream graph. Orca automatically persists their state through its checkpoint mechanism and restores it during failure recovery.
Note: UDFs cannot access external variables. Only local variables and shared variables referenced via orcaObj("name") are permitted.
Constraints on Shared Variables
- Shared variables must be declared in advance using DStream::sharedTable, DStream::sharedDict, or DStream::sharedKeyedTable. Within a UDF, they can be referenced via orcaObj("name"). Note that orcaObj is only valid in the execution context of a UDF.
- Each shared variable follows a single-writer, multiple-reader pattern: only one udfEngine instance can write to it, while multiple instances may read it concurrently.
- Orca schedules all tasks that access the same shared variable on the same physical node to ensure local access and state consistency.
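The first two constraints above can be pictured with a small conceptual model. The sketch below is written in Python purely for illustration: SharedRegistry, declare, bind_writer, and orca_obj are hypothetical names, not part of the Orca or DolphinDB API, and node scheduling (the third constraint) is not modeled. It assumes a shared variable behaves like a plain dictionary, that only the bound writer receives a mutable handle, and that lookups of undeclared names fail.

```python
from types import MappingProxyType

# Hypothetical model of the shared-variable constraints; none of these
# names exist in Orca or DolphinDB.
class SharedRegistry:
    """Shared variables of one stream graph, modeled as plain dicts."""

    def __init__(self):
        self._vars = {}    # name -> shared state
        self._writer = {}  # name -> id of the only engine allowed to write

    def declare(self, name):
        # Variables must be declared before any UDF looks them up.
        if name in self._vars:
            raise ValueError(f"{name!r} is already declared")
        self._vars[name] = {}

    def bind_writer(self, name, engine_id):
        # Single writer: a second, different engine is rejected.
        if name not in self._vars:
            raise KeyError(f"{name!r} was not declared")
        owner = self._writer.get(name)
        if owner is not None and owner != engine_id:
            raise PermissionError(f"{name!r} is already written by {owner!r}")
        self._writer[name] = engine_id

    def orca_obj(self, name, engine_id):
        # Name-based lookup, loosely analogous to orcaObj("name").
        if name not in self._vars:
            raise KeyError(f"{name!r} was not declared")
        state = self._vars[name]
        # The writer gets the mutable dict; every other engine gets a read-only view.
        if self._writer.get(name) == engine_id:
            return state
        return MappingProxyType(state)


reg = SharedRegistry()
reg.declare("history")
reg.bind_writer("history", "udf_a")
reg.orca_obj("history", "udf_a")[1] = 10.0   # the writer updates state
print(reg.orca_obj("history", "udf_b")[1])   # a reader sees 10.0
```

In this model, readers see the writer's updates immediately because they share the same underlying dict. Attempts to write through a read-only view raise TypeError.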
Parameters
func A user-defined function. It accepts exactly one parameter: a stream table containing the incoming records. This parameter must not be modified in place or treated as a mutable object. Within the function, shared variables defined by DStream::sharedTable, DStream::sharedDict, or DStream::sharedKeyedTable can be read and written to update state. If the function returns a value, it must be either a dictionary or a table.
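The calling contract for func can be sketched as follows. This is a hypothetical Python model, not Orca's implementation. run_udf and double_values are invented names, a table is represented as a list of row dictionaries, and the read-only input is approximated with types.MappingProxyType so that in-place modification fails.

```python
from types import MappingProxyType

def run_udf(func, batch):
    """Hypothetical driver: call func once per batch and validate its result.

    batch is a list of row dicts standing in for the stream table parameter.
    """
    # Hand the UDF read-only rows so in-place modification fails loudly.
    frozen = tuple(MappingProxyType(dict(row)) for row in batch)
    result = func(frozen)
    if result is None:
        return None  # no return value: nothing is forwarded
    is_table = isinstance(result, list) and all(isinstance(r, dict) for r in result)
    if isinstance(result, dict) or is_table:
        return result
    raise TypeError("func must return a dictionary or a table")


def double_values(msg):
    # Builds a new table instead of modifying msg.
    return [{"id": row["id"], "value": 2 * row["value"]} for row in msg]


print(run_udf(double_values, [{"id": 1, "value": 1.5}]))  # [{'id': 1, 'value': 3.0}]
```

The design point is that the UDF builds and returns new output rather than editing its input. In the Orca example, this corresponds to appending results to a local diffTable.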
Returns
A DStream object.
Examples
This example uses DStream::sharedKeyedTable and DStream::udfEngine to compute deltas against historical values. The shared keyed table history maintains the most recent record for each ID. When a new record arrives and its ID already exists in history, the UDF outputs the difference between the new value and the stored (historical) value. If the ID does not exist, no output is produced. In both cases, the new record is written to history, replacing any previous value for that ID.
```
if(existsCatalog("orcaCatalog")) dropCatalog("orcaCatalog")
createCatalog("orcaCatalog")
go
use catalog orcaCatalog

// Create the stream graph
g = createStreamGraph("compare")
// Shared keyed table holding the latest value for each id
g.sharedKeyedTable("history", "id", 1:0, `id`value, [INT, DOUBLE])
g.source("data", `id`value`time, [INT, DOUBLE, TIMESTAMP])
    .udfEngine(def(msg) {
        history = orcaObj("history")
        diffTable = table(100:0, `id`diff, [INT, DOUBLE])
        for(i in 0:msg.size()) {
            idVal = msg.id[i]
            valueVal = msg.value[i]
            // Read the historical value
            old = select value from history where id = idVal
            // Write the new value (overwrites the existing row for this id)
            newRow = table(idVal as id, valueVal as value)
            history.append!(newRow)
            // Compute the delta only if a historical value existed
            if(old.size() > 0) {
                diffTable.append!(table(idVal as id, (valueVal - old.value[0]) as diff))
            }
        }
        return diffTable
    })
    .sink("comparison")
g.submit()

// Generate mock data
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)
// Generate a second batch with the same ids
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)

// Wait for processing, then inspect the results
sleep(1000)
select * from orcaCatalog.orca_table.comparison
```

Sample output (the diff values vary between runs because the input values are generated with rand):
| id | diff |
|---|---|
| 1 | 35.55946895749296 |
| 2 | -3.4362593906550387 |
| 3 | 36.283468999034596 |
| 4 | 68.97968558337999 |
| 5 | -91.64246928217878 |
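To check the per-batch logic outside DolphinDB, the following Python sketch mirrors the UDF above under simplifying assumptions. A batch is a list of (id, value) pairs, and a plain dict stands in for the shared keyed table history, so assigning to an existing key overwrites it in the same way that appending to a keyed table does. delta_udf is a hypothetical name used only here.

```python
def delta_udf(msg, history):
    """Mirror of the example UDF.

    msg: list of (id, value) pairs for one incoming batch.
    history: dict mapping id -> most recent value (stand-in for the shared keyed table).
    Returns a list of (id, diff) pairs for ids that already had a stored value.
    """
    diffs = []
    for id_val, value in msg:
        old = history.get(id_val)   # read the historical value (None if unseen)
        history[id_val] = value     # upsert the new value
        if old is not None:
            diffs.append((id_val, value - old))
    return diffs


history = {}
print(delta_udf([(1, 10.0), (2, 5.0)], history))   # [] (first sighting of each id)
print(delta_udf([(1, 12.5), (2, 4.0)], history))   # [(1, 2.5), (2, -1.0)]
```

Because each row is read and then written before the next row is processed, a repeated ID within a single batch is compared, in this model, against the value written earlier in the same batch.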
Related functions: DStream::map, getUdfEngineVariable
