subscribeTable

Syntax

subscribeTable([server],tableName,[actionName],[offset=-1],handler,[msgAsTable=false],[batchSize=0],[throttle=1],[hash=-1],[reconnect=false],[filter],[persistOffset=false],[timeTrigger=false],[handlerNeedMsgId=false],[raftGroup],[userId=""],[password=""],[udpMulticast=false])

Details

Subscribe to a stream table on a local or remote server from a client node. The handler parameter specifies a function or table that processes the subscribed data.

Return the subscription topic, which is a combination of the alias of the node where the stream table is located, the stream table name, and the subscription task name (if actionName is specified), separated by "/". If the subscription topic already exists, the function throws an exception.

  • If batchSize is specified, handler will be triggered if either the number of unprocessed messages reaches batchSize or the duration of time since the last time handler was triggered reaches throttle seconds.

  • If the subscribed table is overwritten, to keep the subscription we need to cancel the subscription with command unsubscribeTable and then subscribe to the new table.

  • With high-availability subscription enabled, a leader switch or cluster restart in the subscribers' raft group may log the user out. Because guests have no privilege to write to a DFS table, writing is then interrupted (use function getCurrentSessionAndUser to identify the current user). If parameters userId and password are specified, the system attempts to log in again after an unexpected logout, so that the subscribed data can still be written to a DFS table. Note that since DolphinDB 2.00.10.10, the parameter enhancedSecurityVerification determines whether the number of failed login attempts is limited. If it is not specified, no limit is applied; if it is set to true, a user's account is locked for 10 minutes after the user enters the wrong password 5 times in a minute.
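
The first two points can be sketched as follows. This is a minimal illustration, not a prescribed procedure: the names ticks, ticksOut and saveTicks are hypothetical, and undef with SHARED is assumed here as one way to drop the old shared table before it is recreated.

```dolphindb
// Hypothetical names (not from the reference above): ticks, ticksOut, saveTicks
share streamTable(1000:0, `time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as ticks
ticksOut = table(1000:0, `time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE])

// The handler fires once 1000 messages are pending, or once 5 seconds have
// passed since it last fired, whichever happens first.
topic = subscribeTable(tableName="ticks", actionName="saveTicks", offset=0, handler=append!{ticksOut}, msgAsTable=true, batchSize=1000, throttle=5)

// Before the stream table is overwritten, cancel the subscription ...
unsubscribeTable(tableName="ticks", actionName="saveTicks")
undef("ticks", SHARED)    // assumption: the old shared table is dropped this way

// ... then recreate the table and subscribe to the new one.
share streamTable(1000:0, `time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as ticks
topic = subscribeTable(tableName="ticks", actionName="saveTicks", offset=0, handler=append!{ticksOut}, msgAsTable=true, batchSize=1000, throttle=5)
```

Calling subscribeTable a second time without the unsubscribeTable step would reuse an existing topic and therefore throw an exception.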

To use UDP multicast in a subscription, the socket buffer size must be set appropriately. The recommended value is at least 1 MB (1048576 bytes).

Here is how to set the socket buffer size in Linux:

  • In the Linux terminal, run the following commands:

    sudo sysctl -w net.core.rmem_default=1048576
    sudo sysctl -w net.core.rmem_max=1048576
    sudo sysctl -w net.core.wmem_default=1048576
    sudo sysctl -w net.core.wmem_max=1048576
  • Alternatively, add or modify the values of net.core.rmem_default, net.core.rmem_max, net.core.wmem_default and net.core.wmem_max to 1048576 in the /etc/sysctl.conf file, and then run sudo sysctl -p.

Note: When switching between TCP and UDP, you need to use unsubscribeTable to cancel existing subscriptions before using subscribeTable to create new subscription topics.

Arguments

Only tableName and handler are required. All the other parameters are optional.

server is a string indicating the alias or the remote connection handle of a server where the stream table is located. If it is unspecified or an empty string (""), it means the local instance.

tableName is a string indicating the name of the shared stream table on the aforementioned server.

actionName is a string indicating subscription task name. It starts with a letter and can have letters, digits, and underscores. It must be specified if multiple subscriptions on the same node subscribe to the same stream table.

offset is an integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. Offset is relative to the first row of the stream table when it is created. If offset is unspecified or -1, the subscription starts with the next new message. If offset=-2, the system will get the persisted offset on disk and start subscription from there. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.

handler is a unary function, binary function or a table, which is used to process the subscribed data.
  • If handler is a unary function, the only parameter of the function is the subscribed data, which can be a table or a tuple of the subscribed table columns.

  • The handler must be specified as a binary function when handlerNeedMsgId = true. The parameters of the function are msgBody and msgId. For more details, see parameter handlerNeedMsgId.

  • If handler is a table, the subscribed data is inserted into the table directly. The table can be a streaming engine, a shared table (including stream table, in-memory table, keyed table, indexed table), or a DFS table.

Note that handler should be function appendForJoin, getLeftStream or getRightStream while subscribing to a stream table for the streaming join engine.

msgAsTable is a Boolean value indicating whether the subscribed data is ingested into handler as a table or as a tuple. If msgAsTable=true, the subscribed data is ingested into handler as a table and can be processed with SQL statements. The default value is false, which means the subscribed data is ingested into handler as a tuple of columns.
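
The difference between the two settings can be sketched with two handlers over the same stream table. All names here (quotes, outFromTable, outFromTuple, onTable, onTuple) are hypothetical and chosen only for illustration.

```dolphindb
// Hypothetical names: quotes, outFromTable, outFromTuple, onTable, onTuple
share streamTable(1000:0, `sym`price, [SYMBOL,DOUBLE]) as quotes
outFromTable = table(1000:0, `sym`price, [SYMBOL,DOUBLE])
outFromTuple = table(1000:0, `sym`price, [SYMBOL,DOUBLE])

// msgAsTable=true: msg arrives as a table and can be queried with SQL
def onTable(mutable out, msg){
    out.append!(select * from msg where price > 0)
}

// msgAsTable=false (default): msg arrives as a tuple of columns,
// in the column order of the stream table
def onTuple(mutable out, msg){
    out.tableInsert(msg[0], msg[1])
}

subscribeTable(tableName="quotes", actionName="asTable", offset=0, handler=onTable{outFromTable}, msgAsTable=true)
subscribeTable(tableName="quotes", actionName="asTuple", offset=0, handler=onTuple{outFromTuple})
```

The two subscriptions need distinct actionName values because they subscribe to the same stream table on the same node.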

batchSize is an integer indicating the number of unprocessed messages to trigger the handler. If it is positive, the handler does not process messages until the number of unprocessed messages reaches batchSize. If it is unspecified or non-positive, the handler processes incoming messages as soon as they come in.

throttle is a floating point number in seconds, indicating the maximum waiting time before the handler processes the incoming messages if the batchSize condition has not been reached. The default value is 1. This optional parameter has no effect if batchSize is not specified. To set throttle less than 1 second, you need to modify the configuration parameter subThrottle first.

hash is a hash value (a non-negative integer) indicating which subscription executor will process the incoming messages for this subscription. If it is unspecified, the system automatically assigns an executor. When we need to keep the messages synchronized from multiple subscriptions, we can set hash of all these subscriptions to be the same, so that the same executor is used to synchronize multiple data sources.
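
As a rough sketch of the synchronization case, two subscriptions that feed the same table can be pinned to one executor by giving them the same hash value. The names srcA, srcB and merged are hypothetical, and the value 1 is arbitrary.

```dolphindb
// Hypothetical names: srcA, srcB, merged
share streamTable(1000:0, `time`val, [TIMESTAMP,DOUBLE]) as srcA
share streamTable(1000:0, `time`val, [TIMESTAMP,DOUBLE]) as srcB
merged = table(1000:0, `time`val, [TIMESTAMP,DOUBLE])

// Same hash value for both subscriptions, so the same subscription executor
// processes the messages from both data sources.
subscribeTable(tableName="srcA", actionName="merge", offset=0, handler=append!{merged}, msgAsTable=true, hash=1)
subscribeTable(tableName="srcB", actionName="merge", offset=0, handler=append!{merged}, msgAsTable=true, hash=1)
```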

reconnect is a Boolean value indicating whether the subscription is automatically resumed after it is interrupted. After a successful resubscription, the subscriber receives all streaming data published since the interruption. The default value is false. If reconnect=true, there are 3 scenarios, depending on how the subscription is interrupted:
  • If the network is disconnected while both the publisher and the subscriber node remain on, the subscription will be automatically reconnected if the network connection is resumed.
  • If the publisher node crashes, the subscriber node will keep attempting to resubscribe after the publisher node restarts.
    • If the publisher node adopts data persistence mode for the stream table, the publisher will first load persisted data into memory after restarting. The subscriber won't be able to successfully resubscribe until the publisher reaches the row of data where the subscription was interrupted.
    • If the publisher node does not adopt data persistence mode for the stream table, the automatic resubscription will fail.
  • If the subscriber node crashes, even after the subscriber node restarts, it won't automatically resubscribe. For this case we need to execute subscribeTable again.

Note:

To subscribe to a high-availability stream table, the parameter reconnect should be set to true to ensure that the new leader node can be successfully connected to in case of a leader node change.

filter specifies the filter condition(s), which can be used in the following ways:

  • Used with function setStreamTableFilterColumn, which specifies the filter column of the stream table. Only messages whose filter-column values match filter are subscribed. filter does not support Boolean types and can take the following forms:
    • A vector: implements value filtering. Only messages whose filter-column value equals one of the vector's elements are subscribed.
    • A pair: implements range filtering. The range includes the lower bound but excludes the upper bound.
    • A tuple: implements hash filtering. The first element is the number of buckets. The second element is a scalar indicating the bucket index (starting from 0) or a pair indicating the bucket index range (including the lower bound but excluding the upper bound).
  • filter can also be specified as a FUNCTIONDEF or STRING scalar indicating a user-defined function. The input string can be a function name or a Lambda expression. The subscribed data is passed into the function as a table and the function result is sent to the subscriber.
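
The value and hash forms above can be sketched as follows. The table and variable names (quoteStream, byValue, byHash) are hypothetical; the filter column is set once on the stream table, and each subscription then supplies its own filter.

```dolphindb
// Hypothetical names: quoteStream, byValue, byHash
share streamTable(10000:0, `time`symbol`price, [TIMESTAMP,SYMBOL,INT]) as quoteStream
setStreamTableFilterColumn(quoteStream, `symbol)

byValue = table(10000:0, `time`symbol`price, [TIMESTAMP,SYMBOL,INT])
byHash = table(10000:0, `time`symbol`price, [TIMESTAMP,SYMBOL,INT])

// Value filtering: only messages whose symbol is IBM or GOOG are subscribed.
subscribeTable(tableName=`quoteStream, actionName=`byValue, handler=append!{byValue}, msgAsTable=true, filter=symbol(`IBM`GOOG))

// Hash filtering: symbols are hashed into 10 buckets, and only buckets
// 1, 2, 3 and 4 are subscribed (upper bound 5 is excluded).
subscribeTable(tableName=`quoteStream, actionName=`byHash, handler=append!{byHash}, msgAsTable=true, filter=(10, 1:5))
```

Range filtering follows the same pattern with a pair such as 1:100 as filter, applied to a stream table whose filter column is numeric.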

persistOffset is a Boolean value indicating whether to persist the offset of the last processed message in the current subscription. It is used for resubscription and can be obtained with function getTopicProcessedOffset. The default value is false.

  • To subscribe to a high-availability stream table, the parameter persistOffset should be set to true to prevent data loss on the subscriber.

  • To resubscribe from the persisted offset, set persistOffset to true and removeOffset of function unsubscribeTable to false.
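
A minimal sketch of resubscribing from the persisted offset, assuming the node is configured so that subscription offsets can be persisted. The names orders, ordersOut and persistDemo are hypothetical.

```dolphindb
// Hypothetical names: orders, ordersOut, persistDemo
share streamTable(1000:0, `id`qty, [INT,INT]) as orders
ordersOut = table(1000:0, `id`qty, [INT,INT])

// Persist the offset of the last processed message for this subscription.
topic = subscribeTable(tableName="orders", actionName="persistDemo", offset=0, handler=append!{ordersOut}, msgAsTable=true, persistOffset=true)

// Inspect the offset of the last processed message.
getTopicProcessedOffset(topic)

// Cancel the subscription but keep the persisted offset ...
unsubscribeTable(tableName="orders", actionName="persistDemo", removeOffset=false)

// ... and resume from it: offset=-2 starts from the persisted offset on disk.
subscribeTable(tableName="orders", actionName="persistDemo", offset=-2, handler=append!{ordersOut}, msgAsTable=true, persistOffset=true)
```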

timeTrigger is a Boolean value. If it is set to true, handler is triggered at the intervals specified by parameter throttle even if no new messages arrive. The default value is false.

handlerNeedMsgId is a Boolean value. The default value is false.

  • If it is true, the parameter handler must support 2 parameters: msgBody (the messages to be ingested into the streaming engine) and msgId (the ID of the last message that has been ingested into the streaming engine). You can pass the appendMsg function to the handler after fixing the engine parameter using partial application.

  • If it is false, handler must support just 1 parameter: msgBody.

raftGroup is the ID of the raft group used to enable high availability on the subscriber. If the leader node of the raft group changes after raftGroup is set, the new leader resubscribes to the stream table.

Note:

The function subscribeTable can only be executed on a leader node if the parameter raftGroup is set. If handlerNeedMsgId is set to true while raftGroup is specified, the handler must be the handler of a streaming engine (you can obtain it when you create the engine, or with the function getStreamEngine(engineName)).

userId is a string indicating a user name.

password is a string indicating the password.

udpMulticast (optional, Linux only) is a Boolean value indicating whether to enable UDP multicast. The default value is false. If set to true, the publisher will publish messages to a multicast channel.

Performance differences between TCP and UDP Multicast:

TCP
  • Pros: reliable transmission; data sequentiality; detection and correction during transmission
  • Cons: one-to-one connection, requiring significant server resources and bandwidth with multiple subscribers
  • Application: suitable for applications requiring reliable, ordered data transmission to a single recipient

UDP Multicast
  • Pros: one-to-multiple real-time transmission; lower resources and network bandwidth requirements
  • Cons: high rate of packet loss; unsorted data
  • Application: ideal for applications needing rapid real-time data transfer to multiple subscribers

Note:

  • udpMulticast can only be set when reconnect, persistOffset, and timeTrigger are all false and raftGroup = -1.
  • The network routers or switches where the publishers and subscribers are located must support the UDP Multicast stack.

Examples

Example 1. A cluster has 2 nodes: DFS_NODE1 and DFS_NODE2. We need to specify maxPubConnections and subPort in cluster.cfg to enable the publish/subscribe functionality. For example:

maxPubConnections=32
DFS_NODE1.subPort=9010
DFS_NODE1.persistenceDir=C:/DolphinDB/Data
DFS_NODE2.subPort=9011

Execute the following script on DFS_NODE1 for the following tasks:

(1) Create a shared stream table "trades_stream" with persistence in synchronous mode. At this stage table "trades_stream" has 0 rows.

n=20000000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="trades_stream", asynWrite=false, cacheSize=n)
go

(2) Create a DFS table "trades". At this stage table "trades" has 0 rows.

if(existsDatabase("dfs://STREAM_TEST")){
     dropDatabase("dfs://STREAM_TEST")
}
dbDate = database("", VALUE, temporalAdd(date(today()),0..30,'d'))
dbSym= database("", RANGE, string('A'..'Z') join "ZZZZ")
db = database("dfs://STREAM_TEST", COMPO, [dbDate, dbSym])
colNames = `date`time`sym`qty`price
colTypes = [DATE,TIME,SYMBOL,INT,DOUBLE]
trades = db.createPartitionedTable(table(1:0, colNames, colTypes), "trades", `date`sym)

(3) Create a local subscription to table "trades_stream". Use a function saveTradesToDFS to save streaming data from "trades_stream" and today's date to table "trades".

def saveTradesToDFS(mutable dfsTrades, msg): dfsTrades.append!(select today() as date,* from msg)
subscribeTable(tableName="trades_stream", actionName="trades", offset=0, handler=saveTradesToDFS{trades}, msgAsTable=true, batchSize=100000, throttle=60)

(4) Create another local subscription to table "trades_stream". Calculate volume-weighted average price (vwap) with streaming data for each minute and save the result to a shared stream table "vwap_stream" with persistence in asynchronous mode.

n=1000000
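// tmpTrades buffers rows from trades_stream, so it uses that table's 4-column schema
// (colNames and colTypes were redefined with an extra date column in step 2)
tmpTrades = table(n:0, `time`sym`qty`price, [TIME,SYMBOL,INT,DOUBLE])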
lastMinute = [00:00:00.000]
colNames = `time`sym`vwap
colTypes = [MINUTE,SYMBOL,DOUBLE]
enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="vwap_stream")
go

def calcVwap(mutable vwap, mutable tmpTrades, mutable lastMinute, msg){
    tmpTrades.append!(msg)
    curMinute = time(msg.time.last().minute()*60000l)
    t = select wavg(price, qty) as vwap from tmpTrades where time < curMinute, time >= lastMinute[0] group by time.minute(), sym
    if(t.size() == 0) return
    vwap.append!(t)
    t = select * from tmpTrades where time >= curMinute
    tmpTrades.clear!()
    lastMinute[0] = curMinute
    if(t.size() > 0) tmpTrades.append!(t)
}
subscribeTable(tableName="trades_stream", actionName="vwap", offset=0, handler=calcVwap{vwap_stream, tmpTrades, lastMinute}, msgAsTable=true, batchSize=100000, throttle=60)

Execute the following script on DFS_NODE2 to create a remote subscription to table "trades_stream". This subscription saves streaming data to a shared stream table "trades_stream_slave" with persistence in asynchronous mode.

n=20000000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="trades_stream_slave", cacheSize=n)
go
subscribeTable(server="DFS_NODE1", tableName="trades_stream", actionName="slave", offset=0, handler=trades_stream_slave)

Execute the following script on DFS_NODE1 to simulate the streaming data for 3 stocks and 10 minutes. Generate 2,000,000 records for each stock in each minute. Data in one minute are inserted into the stream table "trades_stream" in 600 blocks. There is a 100 millisecond interval between 2 blocks.

n=10
ticks = 2000000
rows = ticks*3
startMinute = 09:30:00.000
blocks=600
for(x in 0:n){
    time = startMinute + x*60000 + rand(60000, rows)
    indices = isort(time)
    time = time[indices]
    sym = array(SYMBOL,0,rows).append!(take(`IBM,ticks)).append!(take(`MSFT,ticks)).append!(take(`GOOG,ticks))[indices]
    price = array(DOUBLE,0,rows).append!(norm(153,1,ticks)).append!(norm(91,1,ticks)).append!(norm(1106,20,ticks))[indices]
    indices = NULL
    blockSize = rows / blocks
    for(y in 0:blocks){
        range =pair(y * blockSize, (y+1)* blockSize)
        insert into trades_stream values(subarray(time,range), subarray(sym,range), 10+ rand(100, blockSize), subarray(price,range))
        sleep(100)
    }

    blockSize = rows % blocks
    if(blockSize > 0){
        range =pair(rows - blockSize, rows)
        insert into trades_stream values(subarray(time,range), subarray(sym,range), 10+ rand(100, blockSize), subarray(price,range))
    }
}

To check the results, run the following script on DFS_NODE1:

trades=loadTable("dfs://STREAM_TEST", `trades)
select count(*) from trades

We expect to see a result of 60,000,000 (10 minutes × 3 stocks × 2,000,000 records).

select * from vwap_stream

We expect the table "vwap_stream" to have 27 rows.

Run the following script on DFS_NODE2:

select count(*) from trades_stream_slave

We expect to see a result of less than 60,000,000 as part of the table has been persisted to disk.

Example 2. Specify filter as a STRING scalar indicating the user-defined function filterFn to filter the subscribed data.

st = streamTable(100:0, `sym`val, `SYMBOL`DOUBLE)
enableTableShareAndPersistence(st, `st1, cacheSize=1000)

outTable = table(100:0, `sym`val, `SYMBOL`DOUBLE)

filterFn = def (msg) {
	return (select * from msg where val > 50.0)
}

subscribeTable(tableName=`st1, actionName=`testFilter, handler=tableInsert{outTable}, filter="filterFn", offset=0)

for (i in 1..100) {
	n = 100
	t = table(rand(`A`B`C, n) as sym, rand(100.0, n) as val)
	st.append!(t)
}
select * from outTable

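Specify filter as a STRING scalar containing a Lambda expression. Cancel the earlier subscription first, because subscribing again to an existing topic throws an exception:

unsubscribeTable(tableName=`st1, actionName=`testFilter)
filterFn = "def (msg) { return (select * from msg where val > 50.0) }"
subscribeTable(tableName=`st1, actionName=`testFilter, handler=tableInsert{outTable}, filter=filterFn, offset=0)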

Example 3. Create stream table publisher on dnode1 of the cluster.

share streamTable(10:0,`time`id`value,[TIMESTAMP,INT,DOUBLE]) as publisher

Subscribe to publisher using UDP multicast on dnode2 and write data to sub1:

sub1 = streamTable(10:0,`time`id`value,[TIMESTAMP,INT,DOUBLE])
subscribeTable(server="dnode1",tableName="publisher",actionName="sub1",offset=-1,handler=append!{sub1},msgAsTable=true, udpMulticast=true)

Write data to dnode1:

publisher.tableInsert(table(now()+1..10*1000 as time,1..10 as id, rand(100,10) as value))

Check the publishing status:

getStreamingStat().udpPubTables
tableName channel        msgOffset actions subNum
publisher 224.1.1.1:1235 10        sub1    1

Check subscription workers on dnode2:

getStreamingStat().subWorkers
workerId topic type queueDepthLimit queueDepth processedMsgCount lastMsgId failedMsgCount lastFailedMsgId lastFailedTimestamp lastErrMsg msgAsTable batchSize throttle hash filter persistOffset timeTrigger handlerNeedMsgId raftGroup
0 2 localhost:8702: dnode1/publisher/sub1 udp 10,000,000 0 10 9 0 -1 true 0 0 1 false false false