NSQ
为对接恒生 NSQ 极速行情服务软件,DolphinDB 基于恒生 HSNsqApi 开发了 NSQ 插件。通过该插件能够获取上海和深圳市场的行情。主要获得以下三种行情:
- 主推-现货深度行情主推回调(OnRtnSecuDepthMarketData->snapshot)
- 主推-现货逐笔成交行情主推回调(OnRtnSecuTransactionTradeData->trade)
- 主推-现货逐笔委托行情主推回调(OnRtnSecuTransactionEntrustData->orders)
请注意,DolphinDB NSQ 插件仅提供行情对接服务。数据源和接入服务可咨询数据服务商或证券公司。
安装插件
版本要求
DolphinDB Server 2.00.10 及更高版本,支持 Linux x86-64, Windows x86-64。
安装步骤
在 DolphinDB 客户端中使用
listRemotePlugins
命令查看插件仓库中的插件信息。注意:仅展示当前操作系统和 server 版本支持的插件。若无预期插件,可自行编译(请选择对应分支下的插件)或在 DolphinDB 用户社区进行反馈。
login("admin", "123456"); listRemotePlugins()
使用
installPlugin
命令完成插件安装。installPlugin("nsq");
使用
loadPlugin
命令加载插件。loadPlugin("nsq");
接口函数
connect
语法
nsq::connect(configFilePath, [options], [username], [password], [dataVersion])
详情
该函数将根据 NSQ 配置文件 sdk_config.ini 、传入的用户名和密码,与行情服务器建立连接。连接成功后,将在日志文件 dolphindb.log 中会打印 “OnFrontConnected”。
注意:
- 部分 NSQ 的行情服务器会强制要求用户传入用户名密码,请注意填写。
- 再次执行
connect
进行重新连接前,需要先执行nsq::close()
断开连接,否则会抛出异常。 - 配置文件中 sailfish 源必须指定 service_addr 和 service_port 两项,否则会直接报错。
- 如果配置文件的编码格式有误,也可能会导致无法找到上一条中所说的必备项,建议使用 UTF-8 格式编码的配置文件。
- 目前插件支持连接 sailfish,DS 解码,目前不支持其他源如 FPGA 解码。如果填写了不支持的源则会在连接时宕机,需要谨慎处理。
- 连接 Windows 版本的 NSQ 时,配置文件需要使用 CRLF 换行符;直接使用 UNIX 风格的换行方式可能会导致系统卡住。
参数
configFilePath 字符串标量,表示 sdk_config.ini 的绝对路径;若拷贝 sdk_config.ini 至 dolphindb server,则可以是相对于 DolphinDB Server 的一个相对路径。
options 字典类型,可选参数,表示扩展参数。当前键只支持 receivedTime 和 getAllFieldNames。
- receivedTime 表示是否显示接收时间,对应值为布尔值。
- getAllFieldNames 表示是否接受所有字段数据,对应值为布尔值。详见后文示例。
username 字符串类型,可选参数,表示登录的用户名。
password 字符串类型,可选参数,表示登录用户名对应的密码。
dataVersion 字符串标量,可选参数,表示接收数据时字段内容所基于的 NSQ SDK 版本,可以填写 ‘ORIGIN’ 或者 ‘v220105’,默认为 ‘ORIGIN’。‘v220105’ 与 NSQ SDK 202201.05.000 版本对齐,其中 orders 类型会比 'ORIGIN' 版本多两个字段;当参数 options 中 getAllFieldNames 为 true 时,snapshot 类型也会增加深圳债券现券交易等字段。注意:指定该参数后,getSchema
和 subscribe
函数都会基于所指定的版本改变行为。
getSchema
语法
nsq::getSchema(dataType)
详情
该函数需要在 connect
函数之后调用。后续根据 getSchema
返回的表结构创建流表。
返回一个表,包含两列:name,type,分别表示该行情表各个字段的名称、类型名。可通过该表来创建具有相同结构的共享流表。
参数
dataType 一个字符串,表示所要获取的表结构的类型,支持填写 snapshot, trade, orders, orderTrade。
subscribe
语法
nsq::subscribe(type, location, streamTable)
详情
表示对上海证券交易所或深圳证券交易所发布的某种行情数据进行订阅,并将结果保存到由参数 streamTable 指定的流表中。
订阅成功后,在日志(dolphindb.log)中会有打印如下信息(若出现 successfully,表则示订阅成功):
OnRspSecuTransactionSubscribe: nRequestID[0], ErrorID[0], ErrorMsg[subscribe all transaction trans type[1] of exchange_id [1] successfully]
请注意,若需要将已经订阅的同一个(type, location) 的行情数据输出到另一个 streamTable,需要通过 unscribeTable
命令取消订阅,否则会抛出异常。
streamTable(流表)是一种特殊的内存表,用于存储及发布流数据。更多流表的使用方法可参考文档:DolphinDB-流数据介绍。
参数
type 一个字符串,表示行情的类型,可以指定为以下值:
- "snapshot" 表示回调函数 OnRtnSecuDepthMarketData(主推 - 现货深度行情)获取的行情数据。
- "trade" 表示回调函数 OnRtnSecuTransactionTradeData(主推 - 现货逐笔成交行情主)获取的行情数据。
- "orders" 表示回调函数 OnRtnSecuTransactionEntrustData(主推 - 现货逐笔委托行情)获取的行情数据。
- "orderTrade" 表示由 orders 和 trade 合并的行情数据,适配 orderbookSnapshotEngine 的输入。
location 一个字符串,表示上海证券交易所或深圳证券交易所。上海证券交易所用 sh
表示,深圳证券交易所用 sz
表示。
streamTable 表示一个共享流表的表对象。订阅前需要创建一个流表,且该流表的 schema 需要和获取的行情数据结构一致。注意,建议设置为一个持久化后的流表对象(参见 enableTableShareAndPersistence 或 enableTablePersistence)。否则,可能会发生 OOM。
unsubscribe
语法
nsq::unsubscribe(type, location)
详情
表示取消对上海证券交易所或深圳证券交易所发布的某种行情数据的订阅,例如:unsubscribe(`snapshot, \`sz) 表示取消对深圳证券交易所的 snapshot 行情数据的订阅。
取消订阅成功后,在日志 (dolphindb.log) 中会有打印如下信息(若出现 successfully,表示取消订阅成功):
OnRspSecuTransactionCancel: nRequestID[0], ErrorID[0], ErrorMsg[unsubscribe all transaction trans type [2] of exchange_id [2] successfully]
参数
同 subscribe
一致。
close
语法
nsq::close()
详情
表示断开当前连接。
getStatus
语法
nsq::getStatus()
详情
getStatus
是一个运维命令,用于获取当前连接状态,以及每个订阅的状态。在 3.00.0.2/2.00.12.5 之前版本,该函数名为 getSubscriptionStatus
,现已兼容。
该函数会返回一个表,通过 select 语句来查看获取的状态。用法如下:
status = nsq::getStatus();
select * from status;
例如当前状态可能如下:
topicType isConnected isSubscribed processedMsgCount lastErrMsg failedMsgCount lastFailedTimestamp
-------------- ----------- ------------ ----------------- ---------- -------------- -------------------
(snapshot, sh) true true 0 0
(snapshot, sz) true true 0 0
(trade, sh) true true 0 0
(trade, sz) true true 0 0
(orders, sh) true true 0 0
(orders, sz) true true 0 0
完整示例
// 登录
login("admin", "123456")
// 加载插件
loadPlugin("Your_plugin_path/PluginNsq.txt");
// 连接行情服务器,第二个参数为可选
nsq::connect(your_config_path,dict(["ReceivedTime", "getAllFieldNames"], [true, true]));
// 获取行情数据的表结构
snapshotSchema = nsq::getSchema(`snapshot);
tradeSchema = nsq::getSchema(`trade);
// 根据表结构创建流表
streamTable(1000:0, snapshotSchema[`name], snapshotSchema[`type]) as t1;
streamTable(1000:0, tradeSchema[`name], tradeSchema[`type]) as t2;
go
// 流表持久化
enableTableShareAndPersistence(table=t1, tableName=`snapshot_sh, cacheSize=100000)
enableTableShareAndPersistence(table=t2, tableName=`trade_sh, cacheSize=100000)
// 订阅上海证券交易所的深度行情s
nsq::subscribe(`snapshot, `sh, snapshot_sh);
// 取消订阅
nsq::unsubscribe(`snapshot, `sh)
// 订阅上海证券交易所的逐笔成交行情
nsq::subscribe(`trade`, `sh`, trade_sh);
// 用这个表对象进行操作
select * from snapshot_sh limit 100;
// 取消订阅
nsq::unsubscribe(`trade`, `sh`)
// 获取每个订阅的状态
status = nsq::getSubscriptionStatus();
select * from status;
// 关闭连接
nsq::close();
附录
报错信息
插件正常运行的信息会打印在日志文件中(dolphindb.log),若运行中出现错误,则会抛出异常。具体异常信息及解决办法如下:
重复连接异常。若当前已连接,则需要先通过
close
关闭连接,再connect
重连。You are already connected. To reconnect, please execute close() and try again.
API 初始化错误,需要确认
connect
传入的配置文件路径和配置信息是否正确。Initialization failed. Please check the config file path and the configuration.
API 连接服务器失败,需要确认
connect
传入的配置文件路径和配置信息是否正确。Failed to connect to server. Please check the config file path and the configuration.
登录错误,用户名,密码错误。
login failed: iRet [iRet], error: [errorMsg]
API 未初始化错误,需要检查是否
connect()
成功。API is not initialized. Please check whether the connection is set up via connect().
subscribe 的
streamTable
参数错误,需要是一个 shared streamTable(共享流表)。The third parameter "streamTable" must be a shared stream table.
subscribe 的
location
参数错误,需要是sh
或sz
。The second parameter "location" must be
sh
orsz
.subscribe 的
type
参数错误,应该是snapshot
ortrade
ororders
。The first parameter "type" must be
snapshot
,trade
ororders
.subscribe
streamTable
参数的 schema 错误,schema 需和 SDK 一致。Subscription failed. Please check if the schema of “streamTable” is correct.
重复订阅错误,想要更换同一类订阅 (如
snapshot
,sh
两个字段唯一标识一类订阅) 订阅的流表,需要先执行unsubscribe
,然后再更新订阅。Subscription already exists. To update subscription, call unsubscribe() and try again.
unsubscribe
时 API 未初始化错误。API is not initialized. Please check whether the connection is set up via connect().
close()
错误,在未初始化(未调用connect
)的 API 上进行了close
。Failed to close(). There is no connection to close.