// 登录数据库 login(`admin, `123456) /* // 从插件市场下载安装 nsq 插件 listRemotePlugins("nsq") installPlugin("nsq") */ // 加载插件 try{ loadPlugin("nsq") } catch(ex) { print(ex) } go // 调用模块 use DolphinDBModules::easyNSQ // nsq 行情配置文件路径 configFilePath = "/nsq_sdk_config.ini"; // nsq 账号(非必填) nsq_username = ""; nsq_password = ""; // 数据接收选项(非必填) nsq_data_option = dict(STRING, ANY) nsq_data_option["receivedTime"]=true // 在行情数据中增加一列接收时间 nsq_data_option["getAllFieldNames"]=true // 接受 nsq 原始行情中所有字段 /* configFilePath = "/home/appadmin/mqzhu/ddb_20011/server/uploads/sdk_config.ini"; nsq_username = ""; nsq_password = ""; */ /** 例1 仅从NSQ接收深圳市场snapshot类型的实时行情数据到流数据表,不存储到分区表(流表使用模块提供的默认名字) */ // 初始化流环境(清理所有相关流表及其订阅) iniNsqEnv() // 拉起订阅 streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", options=nsq_data_option) // 检查订阅情况 nsq::getSubscriptionStatus() // select count(*) from objByName(streamTableNames[0]) // select top 100 * from objByName(streamTableNames[0]) // 停止订阅 nsq::unsubscribe("snapshot", "sz") nsq::getSubscriptionStatus() /** 例2 从NSQ接收上海市场的实时行情数据到流数据表,并在分区表中持久化存储(流表和分区表使用模块提供的默认名字) */ // 初始化所有相关的流环境和分区表 iniNsqEnv() iniNsqDfs() // 注意,该函数会删除数据库中的分区表,谨慎使用 // 订阅上海市场orders,trade,snapshot行情数据,并持久化存储 subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option) subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true, options=nsq_data_option) subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true, options=nsq_data_option) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName("nsqStockOrdersSHStream") // select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // select count(*) from objByName("nsqStockTradeSHStream") // select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH") // select count(*) from objByName("nsqStockSnapshotSHStream") // select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH") // 仅停止orders行情数据的订阅 nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() // 关闭与nsq的连接,并停止所有订阅 closeNsqConnection() /** 例3.1 停止例2中的订阅后,重新接收上海市场orders数据,保留之前订阅持久化到分区表的数据 */ // 初始化流环境,仅清理 nsqStockOrdersSHStream 流表 iniNsqEnv("nsqStockOrdersSHStream") // 订阅上海市场orders行情数据,并持久化存储 streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName(streamTableNames[0]) // select count(*) from loadTable(dbPath, tableNames[0]) // 停止订阅 nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() /** 例3.2 停止例2中的订阅后,重新接收上海市场orders数据,且不保留之前订阅持久化到分区表的数据 */ // 初始化流环境和分布式表 iniNsqEnv("nsqStockOrdersSHStream") iniNsqDfs("dfs://nsqStockOrders", "ordersSH") // 注意,该函数会删除数据库中的分区表,谨慎使用 // 订阅上海市场orders行情数据,并持久化存储 subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName("nsqStockOrdersSHStream") // select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // 停止订阅 nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() /** 例4.1 从NSQ接收上海和深圳市场的实时行情数据到流数据表,并在分区表中持久化存储,上海市场和深圳市场的数据合并处理(流表和分区表使用模块提供的默认名字) */ // 初始化流环境和分布式表 iniNsqEnv() iniNsqDfs() // 注意,该函数会删除数据库中的分区表,谨慎使用 // 订阅上海和深圳市场orders,trade,snapshot行情数据,并持久化存储 subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // 停止订阅 closeNsqConnection() // 清理流表和分区表 iniNsqEnv() iniNsqDfs() // 注意,该函数会删除数据库中的分区表,谨慎使用 /** 例4.2 从NSQ接收上海和深圳市场的实时行情数据到流数据表,并在分区表中持久化存储,上海市场和深圳市场的数据分开处理(流表和分区表使用自定义名字) */ // 初始化流环境和分布式表 iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ) // 注意,该函数会删除数据库中的分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ) // 注意,该函数会删除数据库中的分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ) // 注意,该函数会删除数据库中的分区表,谨慎使用 // 订阅上海和深圳市场orders,trade,snapshot行情数据,并持久化存储 subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) // 检查订阅情况 nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // 停止订阅 closeNsqConnection() // 清理流表和分区表 iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ) // 注意,该函数会删除数据库中的分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ) // 注意,该函数会删除数据库中的分区表,谨慎使用 each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ) // 注意,该函数会删除数据库中的分区表,谨慎使用