zmq
ZeroMQ (also known as zmq) is an asynchronous messaging library, aimed at use in distributed or concurrent applications.
Through the DolphinDB zmq plugin, users can create zmq sockets and communicate via zmq messages. Supported operations include session establishment, message publishing and subscription, and message transmission.
Installation (with installPlugin)
Required server version: DolphinDB 2.00.10 or higher.
Supported OS: Linux-x86.
Installation Steps:
(1) Use listRemotePlugins to check plugin information in the plugin repository.
Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.
login("admin", "123456")
listRemotePlugins(, "http://plugins.dolphindb.com/plugins/")
(2) Use installPlugin to install the plugin.
installPlugin("zmq")
(3) Use loadPlugin to load the plugin before using the plugin methods.
loadPlugin("zmq")
Send
socket
Syntax
socket(type, formatter, [batchSize], [prefix])
Details
The method creates a zmq socket.
Note: When using methods connect, bind, send and close for concurrent operations, different zmq sockets must be constructed for different threads.
Parameters
- type: A STRING scalar indicating the socket type to be created. It can be “ZMQ_PUB” or “ZMQ_PUSH”.
- formatter: A function used to package published data in a specified format. Built-in formatters are created with createJSONFormatter and createCSVFormatter. Alternatively, you can define a formatter function that takes the data argument of method send as its input.
- batchSize (optional): An integer indicating the number of messages sent each time. For a table to be published, it can be sent in batches.
- prefix (optional): A STRING scalar indicating the message prefix.
Examples
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
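To make the idea of sending a table in batches more concrete, the following Python sketch splits a list of rows into consecutive batches. It is an illustration only: it assumes one plausible reading of batchSize (at most batchSize rows per send), the helper name split_into_batches is hypothetical, and it says nothing about how the plugin actually frames each batch.

```python
from typing import Iterator, List, Sequence


def split_into_batches(rows: Sequence[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of `rows`, each holding at most `batch_size` rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


# Ten rows, comparable to table(1..10 as id) in the DolphinDB examples.
rows = [{"id": i} for i in range(1, 11)]
batches = list(split_into_batches(rows, 4))
print([len(b) for b in batches])  # prints [4, 4, 2]
```

The last batch is simply shorter when the row count is not a multiple of the batch size.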
connect
Syntax
connect(socket, addr, [prefix])
Details
The method connects a socket to a zmq endpoint. TCP keepalive is enabled once the TCP connection is established, so that the connection can be reestablished automatically.
Parameters
- socket: A zmq socket.
- addr: A STRING scalar indicating the remote address in the form of "transport://address:port". "transport" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "address:port" is the remote IP address and port number.
- prefix (optional): A STRING scalar indicating the message prefix.
Examples
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
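The addr format described above can be checked before it is passed to the plugin. The Python sketch below is a minimal illustration, not part of the plugin: parse_endpoint and Endpoint are hypothetical names, and the rule that ipc and inproc addresses carry no port follows general ZeroMQ conventions rather than anything stated in this document.

```python
from typing import NamedTuple, Optional

# Transports listed in the parameter description.
SUPPORTED_TRANSPORTS = {"tcp", "ipc", "inproc", "epgm"}


class Endpoint(NamedTuple):
    transport: str
    address: str
    port: Optional[int]


def parse_endpoint(addr: str) -> Endpoint:
    """Split "transport://address:port" into its parts, raising ValueError if malformed."""
    transport, sep, rest = addr.partition("://")
    if not sep or transport not in SUPPORTED_TRANSPORTS or not rest:
        raise ValueError(f"unsupported or malformed address: {addr!r}")
    if transport in ("ipc", "inproc"):
        # These transports name a path or an in-process endpoint, so no port is expected.
        return Endpoint(transport, rest, None)
    address, sep, port = rest.rpartition(":")
    if not sep or not address or not port.isdigit():
        raise ValueError(f"expected address:port in {addr!r}")
    return Endpoint(transport, address, int(port))


print(parse_endpoint("tcp://localhost:55632"))
print(parse_endpoint("tcp://*:55631"))
```

Validating the string up front gives a clearer error message than a failed connect or bind call would.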
bind
Syntax
bind(socket, addr, [prefix])
Details
The method binds a socket to a specific address to accept incoming requests.
Parameters
- socket: A zmq socket.
- addr: A STRING scalar indicating the address to bind to, in the form of "transport://address:port". "transport" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "address:port" is the local IP address (or * for all interfaces) and port number.
- prefix (optional): A STRING scalar indicating the message prefix.
Examples
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://*:55631", "prefix1")
send
Syntax
send(socket, data, [prefix])
Details
The method sends a zmq message and returns true if successful.
Parameters
- socket: A zmq socket.
- data: The data to be sent. Its data type must match the input expected by the formatter that was passed to zmq::socket. Otherwise, the formatting will fail and an exception will be thrown.
- prefix (optional): A STRING scalar indicating the message prefix.
Examples
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::send(socket, table(1..10 as id))
close
Syntax
close(socket)
Details
The method closes a zmq socket.
Parameters
- socket: A zmq socket.
Examples
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::close(socket)
Subscribe
createSubJob
Syntax
createSubJob(addr, type, isConnnect, handle, parser, [prefix])
Details
The method creates a zmq subscription. The subscription will automatically reconnect after network failures.
Parameters
- addr: A STRING scalar indicating the address in the form of "transport://address:port". "transport" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "address:port" is the IP address and port number to connect to or, if isConnnect is false, to bind to.
- type: A STRING scalar indicating the socket type to be created. It can be “ZMQ_SUB” or “ZMQ_PULL”.
- isConnnect: A BOOL scalar indicating whether to connect to addr. If false, the socket is bound to addr instead.
- handle: A function or a table used to handle messages sent from zmq.
- parser: A function for parsing subscribed messages. It takes a STRING scalar as input and outputs a table. Built-in parsers are created with createJSONParser and createCSVParser.
- prefix (optional): A STRING scalar indicating the message prefix.
Examples
handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", true, handle, parser, "prefix1")
You can use it with a Python script:
import zmq
import time
import sys
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:55633")
msg = '[{"bool":234}]'
while True:
    socket.send(msg.encode('utf-8'))
    time.sleep(2)
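ZeroMQ SUB sockets filter by prefix: a message is delivered only if its leading bytes equal a subscribed prefix. If createSubJob applies its prefix argument as such a filter (an assumption, not confirmed by this document), a publisher has to start each message with that prefix. The plain-Python sketch below models the check without opening any socket. frame_message and passes_filter are hypothetical helper names, and whether the plugin strips the prefix before calling the parser is likewise an assumption.

```python
import json


def frame_message(rows, prefix=""):
    """Serialize rows as a compact JSON array and prepend the prefix."""
    body = json.dumps(rows, separators=(",", ":"))
    return (prefix + body).encode("utf-8")


def passes_filter(message: bytes, prefix: str) -> bool:
    """Mirror the SUB-side rule: compare the leading bytes with the prefix."""
    return message.startswith(prefix.encode("utf-8"))


plain = frame_message([{"bool": 234}])
tagged = frame_message([{"bool": 234}], prefix="prefix1")
print(plain, passes_filter(plain, "prefix1"))    # b'[{"bool":234}]' False
print(tagged, passes_filter(tagged, "prefix1"))  # b'prefix1[{"bool":234}]' True
```

Under that assumption, a subscription created with "prefix1" would only see the second message, while a subscription with an empty prefix would see both.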
getSubJobStat
Syntax
getSubJobStat()
Details
The method returns information on all zmq subscriptions as a table with the following columns:
- subscriptionId: the subscription ID.
- addr: the subscription address.
- prefix: the message prefix.
- recvPackets: the number of messages received.
- createTimestamp: the timestamp when the subscription was created.
Examples
handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", true, handle, parser, "prefix1")
zmq::getSubJobStat()
cancelSubJob
Syntax
cancelSubJob(subscription)
Details
The method cancels a zmq subscription.
Parameters
- subscription: The value returned by createSubJob, or the subscriptionId returned by getSubJobStat.
Examples
zmq::cancelSubJob(sub1)
zmq::cancelSubJob(42070480)
createPusher
Syntax
createPusher(socket, dummyTable)
Details
The method creates a zmq pusher, which forwards the messages it receives through the given socket. There are two ways to send messages to the pusher:
- Append data to the pusher with method append!.
- Ingest data from the output table of a streaming engine to the pusher.
Parameters
- socket: A zmq socket.
- dummyTable: A table object which receives the input messages.
Examples
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632")
pusher = zmq::createPusher(socket, output1)
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=pusher, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)
setMonitor
Syntax
setMonitor(enabled)
Details
The method enables or disables monitoring logs. When enabled, the following zmq events are written to the log at the INFO level: new connections, reconnections, port bindings, and disconnections.
Parameters
- enabled: A boolean value indicating whether to enable ZeroMQ monitoring logs. If set to true, monitoring logs will be activated.
Formatter/Parser
createCSVFormatter
Syntax
createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])
Details
The method creates a Formatter function in CSV format.
Parameters
- format (optional): A vector of STRING type.
- delimiter (optional): The delimiter between columns. The default is ','.
- rowDelimiter (optional): The delimiter between rows. The default is ';'.
Examples
MyFormat = take("", 5)
MyFormat[2] = "0.000"
f = zmq::createCSVFormatter(MyFormat, ',', ';')
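To show how the two delimiters work together, the following Python sketch splits a delimited payload back into rows and fields on the receiving side. It is a simplified illustration under stated assumptions: split_csv_payload is a hypothetical helper, the sample payload is invented, and the exact text produced by the plugin's formatter (header row, quoting, trailing delimiter) is not specified in this document.

```python
def split_csv_payload(payload: str, delimiter: str = ",", row_delimiter: str = ";"):
    """Split a delimited payload into rows of string fields (no quoting or escaping)."""
    rows = []
    for line in payload.split(row_delimiter):
        if line == "":
            continue  # skip empty pieces, e.g. after a trailing row delimiter
        rows.append(line.split(delimiter))
    return rows


print(split_csv_payload("1,AAPL,3.142;2,IBM,2.718;"))
# prints [['1', 'AAPL', '3.142'], ['2', 'IBM', '2.718']]
```

Fields stay as strings here; converting them to typed columns is what the schema argument of a parser is for.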
createCSVParser
Syntax
createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])
Details
The method creates a Parser function in CSV format.
Parameters
- schema: A vector indicating the data type of each column.
- delimiter (optional): The delimiter between columns. The default is ','.
- rowDelimiter (optional): The delimiter between rows. The default is ';'.
Examples
def createT(n) {
return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createCSVFormatter([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
s=f(t)
p = zmq::createCSVParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
p(s)
createJSONFormatter
Syntax
createJSONFormatter()
Details
The method creates a Formatter function in JSON format.
Examples
def createT(n) {
return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
f(t)
createJSONParser
Syntax
createJSONParser(schema, colNames)
Details
The method creates a Parser function in JSON format.
Parameters
- schema: A vector indicating the data type of each column.
- colNames: A vector indicating the name of each column.
Examples
def createT(n) {
return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
p = zmq::createJSONParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL],
`bool`char`short`int`long`date`month`time`minute`second`datetime`timestamp`nanotime`nanotimestamp`float`double`string`symbol)
s=f(t)
x=p(s)
Usage Examples
loadPlugin("/home/zmx/worker/DolphinDBPlugin/zmq/cmake-build-debug/PluginZmq.txt")
go
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://localhost:55632")
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
zmq::send(socket, data)
You can use it with a Python script:
import zmq

if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    # Enable TCP keepalive: probe after 30 s idle, every 1 s, up to 5 probes
    socket.setsockopt(zmq.TCP_KEEPALIVE, 1)
    socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30)
    socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 1)
    socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, 5)
    socket.connect("tcp://192.168.0.48:55632")
    # An empty filter subscribes to all messages
    zip_filter = ""
    socket.setsockopt(zmq.SUBSCRIBE, zip_filter.encode('ascii'))
    while True:
        recvStr = socket.recv()
        print(recvStr)
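The subscriber above prints raw bytes. As a possible next step, the sketch below decodes such a payload into rows. It assumes the payload is a UTF-8 JSON array of objects, like the '[{"bool":234}]' sample used with createSubJob. The exact layout produced by createJSONFormatter is not spelled out in this document, so it is worth inspecting a real message first. decode_rows is a hypothetical helper name and the sample bytes are invented.

```python
import json


def decode_rows(raw: bytes):
    """Decode a UTF-8 JSON payload and check that it is an array of objects."""
    rows = json.loads(raw.decode("utf-8"))
    if not isinstance(rows, list) or not all(isinstance(r, dict) for r in rows):
        raise ValueError("expected a JSON array of objects")
    return rows


# Invented sample standing in for the bytes returned by socket.recv().
sample = b'[{"id":1,"volume":7},{"id":2,"volume":3}]'
for row in decode_rows(sample):
    print(row["id"], row["volume"])
```

Rejecting unexpected shapes early keeps a layout mismatch from surfacing later as a confusing KeyError.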