NSQ

为对接恒生 NSQ 极速行情服务软件,DolphinDB 基于恒生 HSNsqApi 开发了 NSQ 插件。通过该插件能够获取上海和深圳市场的行情。主要获得以下三种行情:

  1. 主推-现货深度行情主推回调(OnRtnSecuDepthMarketData->snapshot)
  2. 主推-现货逐笔成交行情主推回调(OnRtnSecuTransactionTradeData->trade)
  3. 主推-现货逐笔委托行情主推回调(OnRtnSecuTransactionEntrustData->orders)

请注意,DolphinDB NSQ 插件仅提供行情对接服务。数据源和接入服务可咨询数据服务商或证券公司。

1. 安装插件

版本要求

  • DolphinDB Server:2.00.10 及更高版本。
  • OS:x86-64 Linux, Windows。

安装步骤

  1. 在 DolphinDB 客户端中使用 listRemotePlugins 命令查看插件仓库中的插件信息。

    注意:仅展示当前操作系统和 server 版本支持的插件。若无预期插件,可自行编译(请选择对应分支下的插件)或在 DolphinDB 用户社区进行反馈。

    login("admin", "123456");
    listRemotePlugins()
  2. 使用 installPlugin 命令完成插件安装。

    installPlugin("nsq");
  3. 使用 loadPlugin 命令加载插件。

    loadPlugin("nsq");

2. 接口函数

connect

语法

nsq::connect(configFilePath, [options], [username], [password])

详情

该函数将根据 NSQ 配置文件 sdk_config.ini 、传入的用户名和密码,与行情服务器建立连接。连接成功后,将在日志文件 dolphindb.log 中会打印 “OnFrontConnected”。

注意:

  1. 部分 NSQ 的行情服务器会强制要求用户传入用户名密码,请注意填写。
  2. 再次执行 connect 进行重新连接前,需要先执行 nsq::close() 断开连接,否则会抛出异常。

参数

configFilePath 字符串标量,表示 sdk_config.ini 的绝对路径;若拷贝 sdk_config.ini 至 dolphindb server,则可以是相对于 DolphinDB Server 的一个相对路径。

options 字典类型,可选参数,表示扩展参数。当前键只支持 receivedTime 和 getAllFieldNames。receivedTime 表示是否显示接收时间,对应值为布尔值。 getAllFieldNames 表示是否接受所有字段数据,对应值为布尔值。详见后文示例。

username 字符串类型,可选参数,表示登录的用户名。

password 字符串类型,可选参数,表示登录用户名对应的密码。

getSchema

语法

nsq::getSchema(dataType)

详情

该函数需要在 connect 函数之后调用。后续根据 getSchema 返回的表结构创建流表。

返回一个表,包含两列:name,type,分别表示该行情表各个字段的名称、类型名。可通过该表来创建具有相同结构的共享流表。

参数

dataType 一个字符串,表示所要获取的表结构的类型,包含 snapshot, trade, orders。

subscribe

语法

nsq::subscribe(type, location, streamTable)

详情

表示对上海证券交易所或深圳证券交易所发布的某种行情数据进行订阅,并将结果保存到由参数 streamTable 指定的流表中。

订阅成功后,在日志(dolphindb.log)中会有打印如下信息(若出现 successfully,表则示订阅成功):

OnRspSecuTransactionSubscribe: nRequestID[0], ErrorID[0], ErrorMsg[subscribe all transaction trans type[1] of exchange_id [1] successfully] 

请注意,若需要将已经订阅的同一个(type, location) 的行情数据输出到另一个 streamTable,需要通过 unscribeTable 命令取消订阅,否则会抛出异常。

streamTable(流表)是一种特殊的内存表,用于存储及发布流数据。更多流表的使用方法可参考文档:DolphinDB-流数据介绍

参数

type 一个字符串,表示行情的类型,包含以下值:

  • "snapshot" 表示回调函数 OnRtnSecuDepthMarketData(主推 - 现货深度行情)获取的行情数据。
  • "trade" 表示回调函数 OnRtnSecuTransactionTradeData(主推 - 现货逐笔成交行情主)获取的行情数据。
  • "orders" 表示回调函数 OnRtnSecuTransactionEntrustData(主推 - 现货逐笔委托行情)获取的行情数据。

**location **一个字符串,表示上海证券交易所或深圳证券交易所。上海证券交易所用 sh 表示,深圳证券交易所用 sz 表示。

streamTable 表示一个共享流表的表对象。订阅前需要创建一个流表,且该流表的 schema 需要和获取的行情数据结构一致。注意,建议设置为一个持久化后的流表对象(参见 enableTableShareAndPersistenceenableTablePersistence)。否则,可能会发生 OOM。

unsubscribe

语法

nsq::unsubscribe(type, location)

详情

表示取消对上海证券交易所或深圳证券交易所发布的某种行情数据的订阅,例如:unsubscribe(`snapshot, \`sz) 表示取消对深圳证券交易所的 snapshot 行情数据的订阅。

取消订阅成功后,在日志 (dolphindb.log) 中会有打印如下信息(若出现 successfully,表示取消订阅成功):

OnRspSecuTransactionCancel: nRequestID[0], ErrorID[0], ErrorMsg[unsubscribe all transaction trans type [2] of exchange_id [2] successfully]

参数

subscribe 一致。

close

语法

nsq::close()

详情

表示断开当前连接。

getSubscriptionStatus

语法

nsq::getSubscriptionStatus()

详情

getSubscriptionStatus 是一个运维命令,用于获取当前连接状态,以及每个订阅的状态。

该函数会返回一个表,通过 select 语句来查看获取的状态。用法如下:

status = nsq::getSubscriptionStatus();
select * from status;

例如当前状态可能如下:

topicType     isConnected isSubscribed processedMsgCount lastErrMsg failedMsgCount lastFailedTimestamp
-------------- ----------- ------------ ----------------- ---------- -------------- -------------------
(snapshot, sh) true        true         0                            0
(snapshot, sz) true        true         0                            0
(trade, sh)    true        true         0                            0
(trade, sz)    true        true         0                            0
(orders, sh)    true        true         0                            0
(orders, sz)    true        true         0                            0

3. 完整示例

// 登录
login("admin", "123456")

// 加载插件
loadPlugin("Your_plugin_path/PluginNsq.txt");

// 连接行情服务器,第二个参数为可选
nsq::connect(your_config_path,dict(["ReceivedTime", "getAllFieldNames"], [true, true]));

// 获取行情数据的表结构
snapshotSchema = nsq::getSchema(`snapshot);
tradeSchema = nsq::getSchema(`trade);

// 根据表结构创建流表
streamTable(1000:0, snapshotSchema[`name], snapshotSchema[`type]) as t1;
streamTable(1000:0, tradeSchema[`name], tradeSchema[`type]) as t2;
go

// 流表持久化
enableTableShareAndPersistence(table=t1, tableName=`snapshot_sh, cacheSize=100000)
enableTableShareAndPersistence(table=t2, tableName=`trade_sh, cacheSize=100000)

// 订阅上海证券交易所的深度行情s
nsq::subscribe(`snapshot, `sh, snapshot_sh);

// 取消订阅
nsq::unsubscribe(`snapshot, `sh)

// 订阅上海证券交易所的逐笔成交行情
nsq::subscribe(`trade`, `sh`, trade_sh);

// 用这个表对象进行操作
select * from snapshot_sh limit 100;

// 取消订阅
nsq::unsubscribe(`trade`, `sh`)

// 获取每个订阅的状态
status = nsq::getSubscriptionStatus();
select * from status;

// 关闭连接
nsq::close();

4. 附录

报错信息

插件正常运行的信息会打印在日志文件中(dolphindb.log),若运行中出现错误,则会抛出异常。具体异常信息及解决办法如下:

  1. 重复连接异常。若当前已连接,则需要先通过 close 关闭连接,再 connect 重连。

    You are already connected. To reconnect, please execute close() and try again.

  2. API 初始化错误,需要确认 connect 传入的配置文件路径和配置信息是否正确。

    Initialization failed. Please check the config file path and the configuration.

  3. API 连接服务器失败,需要确认 connect 传入的配置文件路径和配置信息是否正确。

    Failed to connect to server. Please check the config file path and the configuration.

  4. 登录错误,用户名,密码错误。

    login failed: iRet , error:

  5. API 未初始化错误,需要检查是否 connect() 成功。

    API is not initialized. Please check whether the connection is set up via connect().

  6. subscribe 的 streamTable 参数错误,需要是一个 shared streamTable(共享流表)。

    The third parameter "streamTable" must be a shared stream table.

  7. subscribe 的 location 参数错误,需要是 shsz

    The second parameter "location" must be sh or sz.

  8. subscribe 的 type 参数错误,应该是 snapshot or trade or orders

    The first parameter "type" must be snapshot, trade or orders.

  9. subscribe streamTable 参数的 schema 错误,schema 需和 SDK 一致。

    Subscription failed. Please check if the schema of “streamTable” is correct.

  10. 重复订阅错误,想要更换同一类订阅 (如 snapshot, sh 两个字段唯一标识一类订阅) 订阅的流表,需要先执行 unsubscribe,然后再更新订阅。

    Subscription already exists. To update subscription, call unsubscribe() and try again.

  11. unsubscribe 时 API 未初始化错误。

    API is not initialized. Please check whether the connection is set up via connect().

  12. close() 错误,在未初始化(未调用 connect)的 API 上进行了 close

    Failed to close(). There is no connection to close.