MDL

安装

预编译安装

将文件夹下载解压到一个目录下,在 Linux 中执行以下命令:

export
                    LD_LIBRARY_PATH="LD_LIBRARY_PATH:/path/to/DolphinDBPlugin/mdl"

在 Linux 上启动 DolphinDB 服务,并在 DolphinDB 客户端运行以下命令加载插件:

loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt")

手动编译安装

直接使用 server 路径下的 libDolphinDB.so 即可,需要复制到插件的构建路径下。

cd build cp xxx/libDolphinDB.so
                    ./libDolphinDB.so cmake .. make -j16

用户接口

createHandle

语法

createHandle(name, host, port, username, [workerNum=1])

参数

  • name: String 型标量,作为句柄的唯一标识,不可重复。
  • host: String 型向量,服务器的IP或者域名。
  • port: Int 型向量,端口号,个数需要与 host 参数的数量一致。
  • username: String 型标量,用户 ID。
  • workerNum: Int 型标量,MDL 工作线程的数量,可选参数。

作用

返回一个 MDL 句柄,用于之后的操作。

getSchema

语法

getSchema(svrID, msgID)

参数

  • svrID: String 型标量,数据服务 ID。
  • msgID: Int 型标量,消息 ID。

作用

获取对应消息的表结构。

另外可以通过 schema = getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0) 获取用于 subscribeByChannelNo 中 dict 字典中表的结构。

subscribe

语法

subscribe(handle, streamTable, svrID, svrVersion, msgID, [fieldName], [fieldValues])

参数

  • handle: MDL 句柄。
  • streamTable: 一个共享流表,需要在订阅前创建,该表的 schema 可以通过插件提供的 getSchema 函数来获取。连接服务器后会将订阅的数据实时刷新到这个流表中。
  • svrID: String 型标量,要订阅的数据服务 ID。
  • svrVersion: String 型标量,要订阅的数据服务版本号。
  • msgID: Int 型标量,要订阅的消息 ID。
  • fieldName: String 型标量,要过滤的字段名,可选参数。
  • fieldValues: String 型向量,要过滤的字段对应的值,可选参数,对于日期类型,"0" 用于订阅日期为空的情况,"20230808" 这种形式用于订阅指定的日期。

作用

记录要订阅的消息,然后在连接到服务器时提交订阅请求,必须在调用 connect 之前调用。可以使用字段名与字段值对要订阅的服务进行过滤,如可以通过 fieldName 指定股票编码字段,通过 fieldValues 指定要订阅的股票编码。

一个 MDL 句柄订阅的一种消息只能指定一个流表,如果需要对一个消息订阅多次并分别写入到多个流表,可以通过 createHandle 创建多个句柄,分别进行订阅并指定要写入到的流表。

subscribeByChannelNo

语法

subscribeByChannelNo(handle, dict, svrID)

参数

  • handle: MDL 句柄。
  • dict: 字典类型,key 是 Int 类型的 channelNo,value 是对应 channelNo 行情要写入的共享流表,该表的结构通过 getSchema 函数来获取(getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0)),目前支持 "SHL2_SZL2_ORDER_AND_TRANSACTION" 一种,对应上交所 L2 和深交所 L2 的 order 和 transaction 四种行情。
  • svrID: String 型标量,要订阅的数据服务 ID,目前支持 "SHL2_ORDER_AND_TRANSACTION" 和 "SZL2_ORDER_AND_TRANSACTION" 两种,每一种分别订阅对应交易所的 order 和 transaction 行情。

作用

按照 svrID 订阅上交所 L2 或深交所 L2 的 order 和 transaction 行情,并通过多线程异步写的方式,按照 dict 将实时数据写入对应 channelNo 的目标流表中。

另外因为通联 MDL 不支持同时订阅在不同服务器的数据源,也就是当同时订阅上交所 L2 和深交所 L2 时,只能收到其中一个数据源的行情。为了解决这个问题,需要创建两个句柄,分别订阅上交所 L2 和深交所 L2 的数据,一个句柄只订阅一个数据源的行情。

unsubscribe

语法

unsubscribe(handle, svrID, svrVersion, msgID, [fieldName], [fieldValues])

参数

  • handle: MDL 句柄。
  • svrID: String 型标量,要取消订阅的数据服务 ID。
  • svrVersion: String 型标量,要取消订阅的数据服务版本号。
  • msgID: Int 型标量,要取消订阅的消息 ID。
  • fieldName: String 型标量,要过滤的字段名,可选参数。
  • fieldValues: String 型向量,要过滤的字段对应的值,可选参数。

作用

取消订阅,必须在调用 connect 之前调用。

unsubscribeChannelNo

语法

unsubscribeChannelNo(handle, svrID)

参数

  • handle: MDL 句柄。
  • svrID: String 型标量,要取消订阅的数据服务 ID,目前支持 "SHL2_ORDER_AND_TRANSACTION" 和 "SZL2_ORDER_AND_TRANSACTION" 两种。

作用

取消之前 subscribeByChannelNo 的订阅,必须在调用 connect 之前调用。

connectMDL

语法

connectMDL(handle)

参数

  • handle: MDL 句柄。

作用

连接服务器并提交用户的订阅请求。

deleteHandle

语法

deleteHandle(handle)

参数

  • handle: MDL 句柄。

作用

手动关闭 MDL 句柄并释放掉资源。注意会话结束会关闭句柄但不会断开连接和释放资源,之后可以通过 getHandle 来重新获取句柄来释放。

getHandle

语法

getHandle(name)

参数

  • name: String 型标量,是 createHandle 创建时给该 MDL 句柄赋予的标识名。

作用

获取对应 name 的 MDL 句柄。

getHandleStatus

语法

getHandleStatus()

作用

返回一张表,表中保存着 MDL 实例的各种信息。

  • HandleName 记录着句柄名。

  • Address 记录着连接服务器的地址,格式是 “host1:port1; host2:port2;…”。

  • UserName 记录着用户名;CreateTime 记录着 MDL 实例创建的时间。

  • IsConnect 记录着是否已建立连接。

  • SubscribeInfo 记录着订阅信息,格式是 “[{"fieldname":"xx“,"fieldvalues":["xx","xx"],"mid":xx,"sid":xx},{"mid":xx,"sid":xx}]”, 其中 fieldName 和 fieldValues 是过滤的字段,不一定存在,mid 消息ID,sid 是数据服务ID。

    HandleName Address UserName CreatedTime IsConnected SubscribeInfo
    handle1 mdl.XXX.com:19xxx; mdl.XXX.com:19xxx TOKENXXXXXXXXXXXXX 2012.06.13 13:30:10 true [{"fieldname":"SecurityID","fieldvalues":["510650","603238"],"mid":4,"sid":4},{"mid":6,"sid":4}]

使用示例

示例1

loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt") // 加载插件

schema = MDL::getSchema(`MDLSID_MDL_SHL2, 4) // 获取 MDLSID_MDL_SHL2 即上交所 L2 的 4 号行情的表结构
tb1 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb1, tableName=`Tb1, cacheSize=10000)

schema = MDL::getSchema(`MDLSID_MDL_SHL2, 6)
tb2 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb2, tableName=`Tb2, cacheSize=10000)

host = ["mdl-XXX.datayes.com"]
port = [端口号]
handle = MDL::createHandle(`handle1, host, port, 用户token) // 创建 MDL 句柄

MDL::subscribe(handle, tb1, `MDLSID_MDL_SHL2, `MDLVID_MDL_SHL2, 4, "SecurityID", [`603238, `510650])
MDL::subscribe(handle, tb2, `MDLSID_MDL_SHL2, `MDLVID_MDL_SHL2, 6)

MDL::connectMDL(handle) // 连接服务器,host 和 port 在 createHandle 创建句柄时指定

handle2 = MDL::getHandle(`handle1) // 获取名为 "handle1" 的 MDL 句柄

status = MDL::getHandleStatus() // 获取当前所有句柄的消息,并以表形式返回

MDL::deleteHandle(handle) // 删除句柄

示例2

loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt");

dropStreamTable(`Tb1)
dropStreamTable(`Tb2)
dropStreamTable(`Tb3)

// 创建 4 个相同的目标表
schema = MDL::getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0)
tb1 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb1, tableName=`Tb1, cacheSize=10000)
tb2 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb2, tableName=`Tb2, cacheSize=10000)
tb3 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb3, tableName=`Tb3, cacheSize=10000)

// 创建两个句柄,分别对应上交所 L2 和 深交所 L2
handle1 = MDL::createHandle(`handle1, ["mdlXXX.datayes.com"], [端口号], 用户token) // 上交 L2
handle2 = MDL::createHandle(`handle2, ["mdlXXX.datayes.com"], [端口号], 用户token) // 深交 L2

// 订阅
dict1 = dict([1, 2],[tb1, tb2]) // channelNo -> streamTable
MDL::subscribeByChannelNo(handle1, dict1, `SHL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅上交 L2 的 order 和 transaction

dict2 = dict([2011, 2012],[tb2, tb3])
MDL::subscribeByChannelNo(handle2, dict2, `SZL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅深交 L2 的 order 和 transaction

// 连接并开始接受数据
MDL::connectMDL(handle1)
MDL::connectMDL(handle2)

MDL::deleteHandle(handle1)
MDL::deleteHandle(handle2)

MDL 与 DDB 类型对照

描述 MDL 类型 DDB 类型
8 位整数 int8_t SYMBOL
16 位整数 int16_t SHORT
32 位整数 int32_t INT
64 位整数 int64_t LONG
8 位无符号整数 uint8_t SHORT
16 位无符号整数 uint16_t INT
32 位无符号整数 uint32_t LONG
64 位无符号整数 uint64_t LONG
单精度浮点 MDLFloatT FLOAT
双精度浮点 MDLDouble DOUBLE
Ascii 字符串 MDLAnsiString STRING
UTF8 字符串 MDLUTF8String STRING
时间 MDLTime TIME
日期 MDLDate DATE

subscribeByChannelNo 使用示例

subscribeByChannelNo 用 channelNo 值作为 key 将上交L2和深交L2的 Order 和 Transaction 行情写入对应 key 的 dolphindb 目标流表。

loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt");

dropStreamTable(`Tb1)
dropStreamTable(`Tb2)
dropStreamTable(`Tb3)

// 创建 3 个相同的目标流表
schema = MDL::getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0)

tb1 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb1, tableName=`Tb1, cacheSize=10000)
tb2 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb2, tableName=`Tb2, cacheSize=10000)
tb3 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb3, tableName=`Tb3, cacheSize=10000)

// 创建两个句柄,分别对应上交所 L2 和 深交所 L2
handle1 = MDL::createHandle(`handle1, ["mdlXXX.datayes.com"], [端口号], 用户token) // 上交 L2
handle2 = MDL::createHandle(`handle2, ["mdlXXX.datayes.com"], [端口号], 用户token) // 深交 L2

// 订阅
dict1 = dict([1, 2],[tb1, tb2]) // channelNo 为 1 写入 tb1,channelNo 为 2 写入 tb2
MDL::subscribeByChannelNo(handle1, dict1, `SHL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅上交 L2 的 order 和 transaction

dict2 = dict([2011, 2012],[tb2, tb3])
MDL::subscribeByChannelNo(handle2, dict2, `SZL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅深交 L2 的 order 和 transaction

// 连接并开始接受数据
MDL::connectMDL(handle1)
MDL::connectMDL(handle2)

MDL::deleteHandle(handle1)
MDL::deleteHandle(handle2)