MDL
安装
预编译安装
将文件夹下载解压到一个目录下,在 Linux 中执行以下命令:
export
LD_LIBRARY_PATH="LD_LIBRARY_PATH:/path/to/DolphinDBPlugin/mdl"
在 Linux 上启动 DolphinDB 服务,并在 DolphinDB 客户端运行以下命令加载插件:
loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt")
手动编译安装
直接使用 server 路径下的 libDolphinDB.so 即可,需要复制到插件的构建路径下。
cd build cp xxx/libDolphinDB.so
./libDolphinDB.so cmake .. make -j16
用户接口
createHandle
语法
createHandle(name, host,
port, username, [workerNum=1])
参数
- name: String 型标量,作为句柄的唯一标识,不可重复。
- host: String 型向量,服务器的IP或者域名。
- port: Int 型向量,端口号,个数需要与 host 参数的数量一致。
- username: String 型标量,用户 ID。
- workerNum: Int 型标量,MDL 工作线程的数量,可选参数。
作用
返回一个 MDL 句柄,用于之后的操作。
getSchema
语法
getSchema(svrID,
msgID)
参数
- svrID: String 型标量,数据服务 ID。
- msgID: Int 型标量,消息 ID。
作用
获取对应消息的表结构。
另外可以通过 schema = getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0) 获取用于 subscribeByChannelNo 中 dict 字典中表的结构。
subscribe
语法
subscribe(handle, streamTable, svrID,
svrVersion, msgID, [fieldName], [fieldValues])
参数
- handle: MDL 句柄。
- streamTable: 一个共享流表,需要在订阅前创建,该表的 schema 可以通过插件提供的 getSchema 函数来获取。连接服务器后会将订阅的数据实时刷新到这个流表中。
- svrID: String 型标量,要订阅的数据服务 ID。
- svrVersion: String 型标量,要订阅的数据服务版本号。
- msgID: Int 型标量,要订阅的消息 ID。
- fieldName: String 型标量,要过滤的字段名,可选参数。
- fieldValues: String 型向量,要过滤的字段对应的值,可选参数,对于日期类型,"0" 用于订阅日期为空的情况,"20230808" 这种形式用于订阅指定的日期。
作用
记录要订阅的消息,然后在连接到服务器时提交订阅请求,必须在调用 connect 之前调用。可以使用字段名与字段值对要订阅的服务进行过滤,如可以通过 fieldName 指定股票编码字段,通过 fieldValues 指定要订阅的股票编码。
一个 MDL 句柄订阅的一种消息只能指定一个流表,如果需要对一个消息订阅多次并分别写入到多个流表,可以通过 createHandle 创建多个句柄,分别进行订阅并指定要写入到的流表。
subscribeByChannelNo
语法
subscribeByChannelNo(handle,
dict, svrID)
参数
- handle: MDL 句柄。
- dict: 字典类型,key 是 Int 类型的 channelNo,value 是对应 channelNo 行情要写入的共享流表,该表的结构通过 getSchema 函数来获取(getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0)),目前支持 "SHL2_SZL2_ORDER_AND_TRANSACTION" 一种,对应上交所 L2 和深交所 L2 的 order 和 transaction 四种行情。
- svrID: String 型标量,要订阅的数据服务 ID,目前支持 "SHL2_ORDER_AND_TRANSACTION" 和 "SZL2_ORDER_AND_TRANSACTION" 两种,每一种分别订阅对应交易所的 order 和 transaction 行情。
作用
按照 svrID 订阅上交所 L2 或深交所 L2 的 order 和 transaction 行情,并通过多线程异步写的方式,按照 dict 将实时数据写入对应 channelNo 的目标流表中。
另外因为通联 MDL 不支持同时订阅在不同服务器的数据源,也就是当同时订阅上交所 L2 和深交所 L2 时,只能收到其中一个数据源的行情。为了解决这个问题,需要创建两个句柄,分别订阅上交所 L2 和深交所 L2 的数据,一个句柄只订阅一个数据源的行情。
unsubscribe
语法
unsubscribe(handle, svrID,
svrVersion, msgID, [fieldName], [fieldValues])
参数
- handle: MDL 句柄。
- svrID: String 型标量,要取消订阅的数据服务 ID。
- svrVersion: String 型标量,要取消订阅的数据服务版本号。
- msgID: Int 型标量,要取消订阅的消息 ID。
- fieldName: String 型标量,要过滤的字段名,可选参数。
- fieldValues: String 型向量,要过滤的字段对应的值,可选参数。
作用
取消订阅,必须在调用 connect 之前调用。
unsubscribeChannelNo
语法
unsubscribeChannelNo(handle,
svrID)
参数
- handle: MDL 句柄。
- svrID: String 型标量,要取消订阅的数据服务 ID,目前支持 "SHL2_ORDER_AND_TRANSACTION" 和 "SZL2_ORDER_AND_TRANSACTION" 两种。
作用
取消之前 subscribeByChannelNo 的订阅,必须在调用 connect 之前调用。
connectMDL
语法
connectMDL(handle)
参数
- handle: MDL 句柄。
作用
连接服务器并提交用户的订阅请求。
deleteHandle
语法
deleteHandle(handle)
参数
- handle: MDL 句柄。
作用
手动关闭 MDL 句柄并释放掉资源。注意会话结束会关闭句柄但不会断开连接和释放资源,之后可以通过 getHandle 来重新获取句柄来释放。
getHandle
语法
getHandle(name)
参数
- name: String 型标量,是 createHandle 创建时给该 MDL 句柄赋予的标识名。
作用
获取对应 name 的 MDL 句柄。
getHandleStatus
语法
getHandleStatus()
作用
返回一张表,表中保存着 MDL 实例的各种信息。
-
HandleName 记录着句柄名。
-
Address 记录着连接服务器的地址,格式是 “host1:port1; host2:port2;…”。
-
UserName 记录着用户名;CreateTime 记录着 MDL 实例创建的时间。
-
IsConnect 记录着是否已建立连接。
-
SubscribeInfo 记录着订阅信息,格式是 “[{"fieldname":"xx“,"fieldvalues":["xx","xx"],"mid":xx,"sid":xx},{"mid":xx,"sid":xx}]”, 其中 fieldName 和 fieldValues 是过滤的字段,不一定存在,mid 消息ID,sid 是数据服务ID。
HandleName Address UserName CreatedTime IsConnected SubscribeInfo handle1 mdl.XXX.com:19xxx; mdl.XXX.com:19xxx TOKENXXXXXXXXXXXXX 2012.06.13 13:30:10 true [{"fieldname":"SecurityID","fieldvalues":["510650","603238"],"mid":4,"sid":4},{"mid":6,"sid":4}]
使用示例
示例1
loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt") // 加载插件
schema = MDL::getSchema(`MDLSID_MDL_SHL2, 4) // 获取 MDLSID_MDL_SHL2 即上交所 L2 的 4 号行情的表结构
tb1 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb1, tableName=`Tb1, cacheSize=10000)
schema = MDL::getSchema(`MDLSID_MDL_SHL2, 6)
tb2 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb2, tableName=`Tb2, cacheSize=10000)
host = ["mdl-XXX.datayes.com"]
port = [端口号]
handle = MDL::createHandle(`handle1, host, port, 用户token) // 创建 MDL 句柄
MDL::subscribe(handle, tb1, `MDLSID_MDL_SHL2, `MDLVID_MDL_SHL2, 4, "SecurityID", [`603238, `510650])
MDL::subscribe(handle, tb2, `MDLSID_MDL_SHL2, `MDLVID_MDL_SHL2, 6)
MDL::connectMDL(handle) // 连接服务器,host 和 port 在 createHandle 创建句柄时指定
handle2 = MDL::getHandle(`handle1) // 获取名为 "handle1" 的 MDL 句柄
status = MDL::getHandleStatus() // 获取当前所有句柄的消息,并以表形式返回
MDL::deleteHandle(handle) // 删除句柄
示例2
loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt");
dropStreamTable(`Tb1)
dropStreamTable(`Tb2)
dropStreamTable(`Tb3)
// 创建 4 个相同的目标表
schema = MDL::getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0)
tb1 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb1, tableName=`Tb1, cacheSize=10000)
tb2 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb2, tableName=`Tb2, cacheSize=10000)
tb3 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb3, tableName=`Tb3, cacheSize=10000)
// 创建两个句柄,分别对应上交所 L2 和 深交所 L2
handle1 = MDL::createHandle(`handle1, ["mdlXXX.datayes.com"], [端口号], 用户token) // 上交 L2
handle2 = MDL::createHandle(`handle2, ["mdlXXX.datayes.com"], [端口号], 用户token) // 深交 L2
// 订阅
dict1 = dict([1, 2],[tb1, tb2]) // channelNo -> streamTable
MDL::subscribeByChannelNo(handle1, dict1, `SHL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅上交 L2 的 order 和 transaction
dict2 = dict([2011, 2012],[tb2, tb3])
MDL::subscribeByChannelNo(handle2, dict2, `SZL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅深交 L2 的 order 和 transaction
// 连接并开始接受数据
MDL::connectMDL(handle1)
MDL::connectMDL(handle2)
MDL::deleteHandle(handle1)
MDL::deleteHandle(handle2)
MDL 与 DDB 类型对照
描述 | MDL 类型 | DDB 类型 |
---|---|---|
8 位整数 | int8_t | SYMBOL |
16 位整数 | int16_t | SHORT |
32 位整数 | int32_t | INT |
64 位整数 | int64_t | LONG |
8 位无符号整数 | uint8_t | SHORT |
16 位无符号整数 | uint16_t | INT |
32 位无符号整数 | uint32_t | LONG |
64 位无符号整数 | uint64_t | LONG |
单精度浮点 | MDLFloatT | FLOAT |
双精度浮点 | MDLDouble | DOUBLE |
Ascii 字符串 | MDLAnsiString | STRING |
UTF8 字符串 | MDLUTF8String | STRING |
时间 | MDLTime | TIME |
日期 | MDLDate | DATE |
subscribeByChannelNo 使用示例
subscribeByChannelNo 用 channelNo 值作为 key 将上交L2和深交L2的 Order 和 Transaction 行情写入对应 key 的 dolphindb 目标流表。
loadPlugin("/path/to/DolphinDBPlugin/mdl/PluginMDL.txt");
dropStreamTable(`Tb1)
dropStreamTable(`Tb2)
dropStreamTable(`Tb3)
// 创建 3 个相同的目标流表
schema = MDL::getSchema("SHL2_SZL2_ORDER_AND_TRANSACTION", 0)
tb1 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb1, tableName=`Tb1, cacheSize=10000)
tb2 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb2, tableName=`Tb2, cacheSize=10000)
tb3 = streamTable(10000:0, schema[`name], schema[`type])
enableTableShareAndPersistence(table=tb3, tableName=`Tb3, cacheSize=10000)
// 创建两个句柄,分别对应上交所 L2 和 深交所 L2
handle1 = MDL::createHandle(`handle1, ["mdlXXX.datayes.com"], [端口号], 用户token) // 上交 L2
handle2 = MDL::createHandle(`handle2, ["mdlXXX.datayes.com"], [端口号], 用户token) // 深交 L2
// 订阅
dict1 = dict([1, 2],[tb1, tb2]) // channelNo 为 1 写入 tb1,channelNo 为 2 写入 tb2
MDL::subscribeByChannelNo(handle1, dict1, `SHL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅上交 L2 的 order 和 transaction
dict2 = dict([2011, 2012],[tb2, tb3])
MDL::subscribeByChannelNo(handle2, dict2, `SZL2_ORDER_AND_TRANSACTION) // 按 chennelno 订阅深交 L2 的 order 和 transaction
// 连接并开始接受数据
MDL::connectMDL(handle1)
MDL::connectMDL(handle2)
MDL::deleteHandle(handle1)
MDL::deleteHandle(handle2)