Streaming Engines
In stream data processing, unbounded streams must be continuously processed with high efficiency. DolphinDB provides a number of stream computing engines where incremental computing is adopted for optimal performance. The results can be output to shared in-memory tables, stream tables, message-oriented middleware, databases and API clients for further processing.
Streaming Engines
Time-series Streaming Engines
DolphinDB provides 3 time-series streaming engines: time-series engine (createTimeSeriesEngine), daily time-series engine (createDailyTimeSeriesEngine) and session window engine (createSessionWindowEngine). The engines adopt time-based windows for calculation.
Both the time-series engine and the daily time-series engine can be used for sliding-window aggregations at specified frequencies, such as calculating OHLC bars.
The daily time-series engine extends the functionalities of the time-series engine by allowing users to specify trading sessions. For each session within a calendar day, the records before a session that have not participated in calculation can be included in the first window of the session for calculation.
In markets such as crypto market and forex market that operate without interruption, time-series engine fits the needs. However, in scenarios such as stock and futures market where there are well-defined trading sessions, the daily time-series engine is often more applicable.
Session Window Engine
Session windows can be considered as activity sessions (where data is generated). Before and after a session, there are gaps of inactivity (where no data are generated).
The session window engine has the same calculation rules and triggering patterns as the time-series engine. The difference is that the windows of the time-series engine are generated at fixed frequencies with a fixed size whereas the windows of the session window engine are not. The start time of the first session window is the timestamp of the first record ingested to the session window engine. If the session window does not receive another record for a certain period of time, it closes and the end time takes the timestamp of the last received record + the waiting time. After the current window ends, the next session window starts at the ingestion of a new record.
Taking the IoT scenario as an example: depending on whether a device is online or not, in certain time periods a large amount of data could be generated while in others there could be none. Applying sliding window calculation on such data may cause unnecessary computational overhead as a lot of empty windows will be generated. The session window engine is designed to solve such problems.
Reference: createSessionWindowEngine
Reactive State Engine
Stream computation in DolphinDB can use stateless or stateful factors. A stateless factor takes only the latest record for calculation, while a stateful factor requires the latest record as well as states (i.e., previous records and intermediate results). States are continuously updated with each record or event and maintained in the streaming engines for subsequent calculation.
Each record ingested into the reactive state streaming engine
(createReactiveStateEngine
) triggers an output, so the
numbers of inputs and outputs always stay the same. Only the vectorized
functions can be used as operators in the engine. The performance of stateful
operators (such as moving functions, cumulative functions, order-sensitive
functions, and top-N functions) is optimized for their application scenarios in
DolphinDB. Note that only these optimized state functions can be used in the
engine. Alternatively, you can implement a stateful indicator by defining a
user-defined function and declaring it with keyword @state
before the definition.
-
Finance: Calculating stateful factors with high-frequency trading data;
-
IoT: Detecting whether the real-time readings of temperature sensors are continuously increasing.
Reference: createReactiveStateEngine
Cross-Sectional Engine
The cross-sectional streaming engine (createCrossSectionalEngine
) is
used for real-time computing on cross-sectional data, which is a collection of
observations (behaviors) for multiple subjects (entities such as different stocks)
at a single point in time.
Reference: createCrossSectionalEngine
Anomaly Detection Engine
The anomaly detection streaming engine
(createAnomalyDetectionEngine
) is used to detect anomalies by
analyzing metric values. It outputs the records that satisfy the anomaly
conditions.
-
IoT: Monitoring device or power status;
-
Financial risk management: Filtering orders, monitoring trading volume, or setting overload alerts.
Reference: createAnomalyDetectionEngine
Join Engine
Standard SQL joins are designed for combining tables with historical data. However, to join streams with a ANSI SQL join statement, first you need to save the streams as snapshots at fixed intervals, process each snapshot independently, then combine the results. Writing such a query is cumbersome and it cannot generate the result with the timeliness required by real-time computing.
Instead, DolphinDB provides the following engines for joining stream tables.
Asof Join Engine
The asof join engine is typically used to work with time series data. In some cases, the left and right tables to be joined have high-precision timestamps that do not match exactly. For each record in the left table, the asof join engine matches it with the most recent record satisfying the matching condition (e.g., with same stock symbol) from the right table.
For example, when processing the US stock market data, you can join the trades and quotes data using the asof join engine to calculate the transaction costs.
Reference: createAsofJoinEngine
Equi Join Engine
The equi join engine combines two streams based on equivalent values in specified columns. For example, use this engine to join the 1-minute market snapshot with trade data captured at one-minute interval to get all essential indicators in one table for further analysis on trading strategy.
Reference: createEquiJoinEngine
Window Join Engine
For each record in the left table, rather than joining it with a single value from the right table, the engine computes on a specified window in the right table.
For example, we usually estimate the transaction cost of individual stock by joining the trades and quotes data. Using the window join engine, we can get a more reasonable estimation of transaction cost by calculating the average or median of quotes over a specified window around each trade, instead of using a single quote.
Reference: createWindowJoinEngine
Left Semi Join Engine
The left semi join engine joins each record in the left table with a matching record in the right table. Unlike the lookup join engine, there will be no return until a match is found. For example, you can use the engine to join each trade with the corresponding order; or join the snapshot of individual stocks with the snapshot of an index for correlation calculation.
The four types of join engines introduced so far are used to join two stream tables. To join a stream table and a table of historical data (in-memory table and dimension table are supported), use the lookup join engine.
Reference: createLeftSemiJoinEngine
Lookup Join Engine
The lookup join engine enriches a stream table with the data queried from another table (usually a dimension table) through left join. For example, you can use the lookup join engine to join the latest snapshot of a stock and intraday trading indicators from the previous day to gain insights into real time trading.
Reference: createLookupJoinEngine
Snapshot Join Engine
The snapshot join engine joins each record from both tables with either the latest matching record or with all matching records from the other table. For example, you can join account holdings with market prices to assess risk metrics such as leverage and net worth.
Reference: createSnapshotJoinEngine
Streaming Pipeline
In some cases, the metrics you provide need to be calculated on different dimensions through pipeline processing of multiple engines. You need to manually distinguish which part is, for example, a cross-sectional operation and which part is a time series operation, and implement the metrics with the appropriate engines. By using an engine as the output of another engine, we can build an engine pipeline.
Note: Starting from version 1.30.17/2.00.5, you can ingest the output of a non-join engine into a join engine through the built-in functions getLeftStream and getRightStream to use the engines in series.