login(`admin, `123456) pluginPath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/PluginNsq.txt" configFilePath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/nsq_sdk_config.ini"; try { loadPlugin("plugins/nsq/PluginNsq.txt") } catch(ex) { print(ex) try{ loadPlugin(pluginPath) } catch(ex) { print(ex) } } // 调用模块 use DolphinDBModules::easyNSQ /** 例1 接收NSQ深交所Level2快照实时行情数据,不做持久化 */ // 初始化化境并拉起订阅 iniNsqEnv() streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", saveToDfs=false) // 检查订阅情况 nsq::getSubscriptionStatus() select count(*) from objByName(streamTableNames[0]) select top 100 * from objByName(streamTableNames[0]) // 停止订阅 nsq::unsubscribe("snapshot", "sz") nsq::getSubscriptionStatus() /** 例2 接收NSQ上交所所有类型的实时行情数据,并持久化到分区表 */ // 初始化环境并拉起订阅 iniNsqEnv() iniNsqDfs() subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true) subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true) subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true) // 检查订阅情况 nsq::getSubscriptionStatus() existsSubscriptionTopic(,"nsqStockOrdersSHStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockTradeSHStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockSnapshotSHStream","easyNSQ_saveToDfsTable") select count(*) from objByName("nsqStockOrdersSHStream") select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") select count(*) from objByName("nsqStockTradeSHStream") select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH") select count(*) from objByName("nsqStockSnapshotSHStream") select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH") // 仅停止orders行情数据的订阅 nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() // 停止所有订阅 closeNsqConnection() nsq::getSubscriptionStatus() /** 例3.1 停止例2中的订阅后,重新接收上海市场orders数据,不清除之前订阅持久化到分区表的行情数据 */ // 初始化流环境并拉起订阅 iniNsqEnv("nsqStockOrdersSHStream") streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true) // 检查订阅情况 nsq::getSubscriptionStatus() existsSubscriptionTopic(,streamTableNames[0],"easyNSQ_saveToDfsTable") select count(*) from objByName(streamTableNames[0]) select count(*) from loadTable(dbPath, tableNames[0]) // 停止订阅 nsq::unsubscribe("orders", "sh") /** 例3.1 停止例2中的订阅后,重新接收上海市场orders数据,但是清除之前订阅持久化到分区表的行情数据 */ // 初始化环境并拉起订阅 iniNsqEnv("nsqStockOrdersSHStream") iniNsqDfs("dfs://nsqStockOrders", "ordersSH") subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true) // 检查订阅情况 nsq::getSubscriptionStatus() existsSubscriptionTopic(,"nsqStockOrdersSHStream","easyNSQ_saveToDfsTable") select count(*) from objByName("nsqStockOrdersSHStream") select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // 停止订阅 nsq::unsubscribe("orders", "sh") /** 例4.1 接收NSQ上交所和深交所的实时行情数据,并持久化到分区表,合并沪深数据 */ // 初始化环境并拉起订阅 iniNsqEnv() iniNsqDfs() subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true) // 检查订阅情况 nsq::getSubscriptionStatus() existsSubscriptionTopic(,"nsqStockOrdersStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockTradeStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockSnapshotStream","easyNSQ_saveToDfsTable") // 停止订阅 closeNsqConnection() /** 例4.2 接收NSQ上交所和深交所的实时行情数据,并持久化到分区表,沪深数据分开(用户指定流表和分区表的名字) */ // 初始化环境并拉起订阅 iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) iniNsqDfs("dfs://myNsqOrders", `myNsqOrdersSH`myNsqOrdersSZ) iniNsqDfs("dfs://myNsqTrade", `myNsqTradeSH`myNsqTradeSZ) iniNsqDfs("dfs://myNsqSnapshot", `myNsqSnapshotSH`myNsqSnapshotSZ) subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"]) subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"]) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"]) // 检查订阅情况 nsq::getSubscriptionStatus() select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // 停止订阅 closeNsqConnection() /** 清理之前例子运行时遗留的流表和分区表 */ iniNsqEnv() iniNsqDfs() iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) iniNsqDfs("dfs://myNsqOrders", `myNsqOrdersSH`myNsqOrdersSZ) iniNsqDfs("dfs://myNsqTrade", `myNsqTradeSH`myNsqTradeSZ) iniNsqDfs("dfs://myNsqSnapshot", `myNsqSnapshotSH`myNsqSnapshotSZ) /** 例5 接收上海和深证两个市场的 NSQ 行情数据、持久化存储,并对行情数据分别做沪深合并和沪深分开两种处理的startup.dos */ login(`admin, `123456) go pluginPath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/PluginNsq.txt" configFilePath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/nsq_sdk_config.ini"; // 确保 nsq 插件加载成功 try{ loadPlugin(pluginPath) } catch(ex) { print(ex) } go // 调用模块 use DolphinDBModules::easyNSQ go // 初始化环境(不删除分区表) iniNsqEnv() go // 拉起订阅 subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true) subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true) subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true)