zmq

ZeroMQ (also known as zmq) is an asynchronous messaging library, aimed at use in distributed or concurrent applications.

The DolphinDB zmq plugin lets users create zmq sockets and communicate via zmq messages, covering session establishment, publish-subscribe messaging, and message transmission.

Installation (with installPlugin)

Required server version: DolphinDB 2.00.10 or higher.

Supported OS: Linux-x86.

Installation Steps:

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: Plugins not included in the list can be installed from precompiled binaries or compiled from source. Both are available in our GitHub repository on the branch that matches your server version.

login("admin", "123456")
listRemotePlugins(, "http://plugins.dolphindb.com/plugins/")

(2) Invoke installPlugin to install the plugin.

installPlugin("zmq")

(3) Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("zmq")

Send

socket

Syntax

socket(type, formatter, [batchSize], [prefix])

Details

The method creates a zmq socket.

Note: When calling connect, bind, send, or close concurrently, construct a separate zmq socket for each thread. A zmq socket must not be shared across threads.

Parameters

  • type: A STRING scalar indicating the socket type to be created. It can be "ZMQ_PUB" or "ZMQ_PUSH".
  • formatter: A function used to package published data in a specified format. Currently the built-in options are the functions created by createJSONFormatter and createCSVFormatter. Alternatively, you can define your own formatter function, which takes the data passed to send as its argument.
  • batchSize (optional): An integer indicating the number of records sent each time. When the data to be published is a table, it can be sent in batches of this size.
  • prefix (optional): A STRING scalar indicating the message prefix.

Examples

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
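
The effect of batchSize can be pictured with a short Python sketch. It assumes, purely for illustration, that a table is split into consecutive groups of at most batchSize rows and that each group is sent as one message. The helper name split_into_batches is hypothetical and is not part of the plugin.

```python
def split_into_batches(rows, batch_size):
    """Yield consecutive slices of `rows`, each holding at most `batch_size` rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Ten rows, mirroring table(1..10 as id) from the send example.
rows = [{"id": i} for i in range(1, 11)]
batches = list(split_into_batches(rows, 4))
print([len(batch) for batch in batches])
```

With ten rows and a batch size of 4, the last batch is smaller than the others.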

connect

Syntax

connect(socket, addr, [prefix])

Details

The method uses socket to connect to a zmq endpoint. TCP keepalive is enabled once the connection is established so that the socket can reconnect automatically.

Parameters

  • socket: A zmq socket.
  • addr: A STRING scalar indicating the remote address in the form of "transport://address:port". "transport" is the underlying transport protocol to use: tcp, ipc, inproc, or epgm. "address:port" is the remote IP address and port number.
  • prefix (optional): A STRING scalar indicating the message prefix.

Examples

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
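
As an aid to reading the addr format, the following Python sketch splits an endpoint string into its parts. It is only an illustration of the "transport://address:port" layout described above. The function name split_endpoint is hypothetical, and treating ipc and inproc endpoints as having no port is an assumption of this sketch.

```python
KNOWN_TRANSPORTS = {"tcp", "ipc", "inproc", "epgm"}


def split_endpoint(addr):
    """Split 'transport://address:port' into (transport, address, port).

    For ipc and inproc endpoints the port is returned as None (assumption).
    """
    transport, sep, rest = addr.partition("://")
    if not sep or transport not in KNOWN_TRANSPORTS:
        raise ValueError(f"unsupported endpoint: {addr!r}")
    if transport in ("ipc", "inproc"):
        return transport, rest, None
    address, sep, port = rest.rpartition(":")
    if not sep or not port.isdigit():
        raise ValueError(f"missing port in endpoint: {addr!r}")
    return transport, address, int(port)


print(split_endpoint("tcp://localhost:55632"))
print(split_endpoint("tcp://*:55631"))
```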

bind

Syntax

bind(socket, addr, [prefix])

Details

The method binds a socket to a specific address to accept incoming requests.

Parameters

  • socket: A zmq socket.
  • addr: A STRING scalar indicating the address to bind to, in the form of "transport://interface:port". "transport" is the underlying transport protocol to use: tcp, ipc, inproc, or epgm. "interface:port" is the local interface (or * for all interfaces) and the port number to listen on.
  • prefix (optional): A STRING scalar indicating the message prefix.

Examples

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://*:55631", "prefix1")

send

Syntax

send(socket, data, [prefix])

Details

The method sends a zmq message and returns true if successful.

Parameters

  • socket: A zmq socket.
  • data: The data to be sent. Its data type must match the argument expected by the formatter passed to zmq::socket. Otherwise, the formatting fails and an exception is thrown.
  • prefix (optional): A STRING scalar indicating the message prefix.

Examples

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::send(socket, table(1..10 as id))

close

Syntax

close(socket)

Details

The method closes a zmq socket.

Parameters

  • socket: A zmq socket.

Examples

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::close(socket)

Subscribe

createSubJob

Syntax

createSubJob(addr, type, isConnnect, handle, parser, [prefix])

Details

The method creates a zmq subscription. The subscription will automatically reconnect after network failures.

Parameters

  • addr: A STRING scalar indicating the address in the form of "transport://address:port". "transport" is the underlying transport protocol to use: tcp, ipc, inproc, or epgm. "address:port" is the IP address and port number to connect to, or to bind to when isConnnect is false.
  • type: A STRING scalar indicating the socket type to be created. It can be "ZMQ_SUB" or "ZMQ_PULL".
  • isConnnect: A BOOL scalar indicating whether to connect to addr. If false, the socket is bound to addr instead.
  • handle: A function or a table used to handle messages sent from zmq.
  • parser: A function for parsing subscribed messages. Currently the built-in options are the functions created by createJSONParser and createCSVParser. It takes a STRING scalar as input and outputs a table.
  • prefix (optional): A STRING indicating the message prefix.

Examples

handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", true, handle, parser, "prefix1")

You can publish test messages to this subscription with a Python script:

import time

import zmq

# Publish a test message every 2 seconds on the port the subscription connects to.
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:55633")
msg = '[{"bool":234}]'

while True:
    socket.send(msg.encode('utf-8'))
    time.sleep(2)

getSubJobStat

Syntax

getSubJobStat()

Details

The method returns the status of all zmq subscriptions as a table with the following columns:

  • subscriptionId: the subscription ID.
  • addr: the subscription address.
  • prefix: the message prefix.
  • recvPackets: the number of messages received.
  • createTimestamp: the timestamp when the subscription was created.

Examples

handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", true, handle, parser, "prefix1")
zmq::getSubJobStat()

cancelSubJob

Syntax

cancelSubJob(subscription)

Details

The method cancels a zmq subscription.

Parameters

  • subscription: The value returned by createSubJob, or the subscriptionId returned by getSubJobStat.

Examples

zmq::cancelSubJob(sub1)
zmq::cancelSubJob(42070480)

createPusher

Syntax

createPusher(socket, dummyTable)

Details

The method creates a zmq pusher, which forwards the data it receives through the socket. There are two ways to send data to the pusher:

  • Append data to the pusher with method append!;
  • Ingest data from the output table of a streaming engine to the pusher.

Parameters

  • socket: A zmq socket.
  • dummyTable: A table object which receives the input messages.

Examples

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632")
pusher = zmq::createPusher(socket, output1)

engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=pusher, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)

setMonitor

Syntax

setMonitor(enabled)

Details

The method enables or disables monitoring logs. When enabled, the following zmq events are written to the log at the INFO level: new connections, reconnections, port bindings, and disconnections.

Parameters

  • enabled: A boolean value indicating whether to enable ZeroMQ monitoring logs. If set to true, monitoring logs will be activated.

Formatter/Parser

createCSVFormatter

Syntax

createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])

Details

The method creates a Formatter function in CSV format.

Parameters

  • format (optional): A vector of STRING type.
  • delimiter (optional): The delimiter between columns. The default is ','.
  • rowDelimiter (optional): The delimiter between rows. The default is ';'.

Examples

MyFormat = take("", 5)
MyFormat[2] = "0.000"
f = zmq::createCSVFormatter(MyFormat, ',', ';')
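
To make the roles of delimiter and rowDelimiter concrete, here is a minimal Python sketch of the layout they describe: fields joined by the column delimiter, rows joined by the row delimiter. It is not the plugin's implementation. The function name format_csv is hypothetical, the format argument is left out, and whether the plugin emits a trailing row delimiter is not stated in this document.

```python
def format_csv(rows, delimiter=",", row_delimiter=";"):
    """Join the fields of each row with `delimiter` and the rows with `row_delimiter`."""
    return row_delimiter.join(
        delimiter.join(str(field) for field in row) for row in rows
    )


rows = [(1, "AAPL", 3.5), (2, "IBM", 4.25)]
print(format_csv(rows))  # 1,AAPL,3.5;2,IBM,4.25
```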

createCSVParser

Syntax

createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])

Details

The method creates a Parser function in CSV format.

Parameters

  • schema: A vector indicating the data type of each column.
  • delimiter (optional): The delimiter between columns, the default is ','.
  • rowDelimiter (optional): The delimiter between rows, the default is ';'.

Examples

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createCSVFormatter([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
s=f(t)
p = zmq::createCSVParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
p(s)
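
The parsing direction can be sketched the same way in Python. The sketch below assumes the schema is given as one conversion function per column and returns one list per column. The name parse_csv and the choice to skip empty rows are assumptions of this illustration, not documented plugin behavior.

```python
def parse_csv(text, schema, delimiter=",", row_delimiter=";"):
    """Parse delimited text into one list per column, converting fields with `schema`."""
    columns = [[] for _ in schema]
    for row in text.split(row_delimiter):
        if not row:  # tolerate a trailing row delimiter
            continue
        fields = row.split(delimiter)
        if len(fields) != len(schema):
            raise ValueError(f"expected {len(schema)} fields, got {len(fields)}")
        for column, convert, field in zip(columns, schema, fields):
            column.append(convert(field))
    return columns


print(parse_csv("1,AAPL,3.5;2,IBM,4.25", [int, str, float]))
```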

createJSONFormatter

Syntax

createJSONFormatter()

Details

The method creates a Formatter function in JSON format.

Examples

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
f(t)
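
For readers unfamiliar with row-oriented JSON, the Python sketch below converts a column-oriented table into a JSON array with one object per row. The layout is an assumption inferred from the test message '[{"bool":234}]' used in the createSubJob example. The exact output of the plugin's formatter is not specified here, and format_json is a hypothetical name.

```python
import json


def format_json(table):
    """Turn a column-oriented table (dict of equal-length lists) into a JSON array of row objects."""
    names = list(table)
    rows = [dict(zip(names, values)) for values in zip(*table.values())]
    return json.dumps(rows, separators=(",", ":"))


print(format_json({"id": [1, 2], "sym": ["AAPL", "IBM"]}))
```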

createJSONParser

Syntax

createJSONParser(schema, colNames)

Details

The method creates a Parser function in JSON format.

Parameters

  • schema: A vector indicating the data type of each column.
  • colNames: A vector indicating the name of each column.

Examples

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
p = zmq::createJSONParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL],
`bool`char`short`int`long`date`month`time`minute`second`datetime`timestamp`nanotime`nanotimestamp`float`double`string`symbol)
s=f(t)
x=p(s)
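
The pairing of schema and colNames can be illustrated with a small Python sketch that reads a JSON array of row objects and builds one typed column per name. It assumes the message layout of the test message '[{"bool":234}]' from the createSubJob example. The name parse_json and the use of Python conversion functions in place of DolphinDB data types are choices of this sketch only.

```python
import json


def parse_json(message, schema, col_names):
    """Build {column name: values} from a JSON array of row objects.

    `schema` holds one conversion function per entry in `col_names`.
    """
    rows = json.loads(message)
    return {
        name: [convert(row[name]) for row in rows]
        for name, convert in zip(col_names, schema)
    }


print(parse_json('[{"bool":234}]', [int], ["bool"]))
```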

Usage Examples

loadPlugin("zmq")
go
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://*:55632")
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
zmq::send(socket, data)

You can receive the published messages with a Python script:

import zmq

if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    # Enable TCP keepalive so that dead connections are detected.
    socket.setsockopt(zmq.TCP_KEEPALIVE, 1)
    socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30)
    socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 1)
    socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, 5)

    socket.connect("tcp://192.168.0.48:55632")
    # An empty filter subscribes to all messages.
    zip_filter = ""
    socket.setsockopt(zmq.SUBSCRIBE, zip_filter.encode('ascii'))

    while True:
        recvStr = socket.recv()
        print(recvStr)
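
The subscription filter used in the script follows ZeroMQ's rule for SUB sockets: a message is delivered when its bytes begin with the subscribed prefix, and an empty prefix matches every message. The Python sketch below models only that rule. The function name matches_subscription is hypothetical, and how the plugin attaches its prefix argument to outgoing messages is not covered here.

```python
def matches_subscription(message, prefix):
    """Return True if `message` (bytes) begins with the subscribed `prefix` (bytes)."""
    return message.startswith(prefix)


print(matches_subscription(b"prefix1 hello", b"prefix1"))  # True
print(matches_subscription(b"other hello", b"prefix1"))    # False
print(matches_subscription(b"anything", b""))              # True
```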