login(`admin, `123456) pluginPath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/PluginNsq.txt" configFilePath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/nsq_sdk_config.ini"; try { loadPlugin("plugins/nsq/PluginNsq.txt") } catch(ex) { print(ex) try{ loadPlugin(pluginPath) } catch(ex) { print(ex) } } // 璋冪敤妯″潡 use DolphinDBModules::easyNSQ /** 渚�1 鎺ユ敹NSQ娣变氦鎵€Level2蹇収瀹炴椂琛屾儏鏁版嵁锛屼笉鍋氭寔涔呭寲 */ // 鍒濆鍖栧寲澧冨苟鎷夎捣璁㈤槄 iniNsqEnv() streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", saveToDfs=false) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() select count(*) from objByName(streamTableNames[0]) select top 100 * from objByName(streamTableNames[0]) // 鍋滄璁㈤槄 nsq::unsubscribe("snapshot", "sz") nsq::getSubscriptionStatus() /** 渚�2 鎺ユ敹NSQ涓婁氦鎵€鎵€鏈夌被鍨嬬殑瀹炴椂琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧埌鍒嗗尯琛� */ // 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄 iniNsqEnv() iniNsqDfs() subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true) subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true) subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() existsSubscriptionTopic(,"nsqStockOrdersSHStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockTradeSHStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockSnapshotSHStream","easyNSQ_saveToDfsTable") select count(*) from objByName("nsqStockOrdersSHStream") select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") select count(*) from objByName("nsqStockTradeSHStream") select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH") select count(*) from objByName("nsqStockSnapshotSHStream") select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH") // 浠呭仠姝rders琛屾儏鏁版嵁鐨勮闃� nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() // 鍋滄鎵€鏈夎闃� closeNsqConnection() nsq::getSubscriptionStatus() /** 渚�3.1 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼笉娓呴櫎涔嬪墠璁㈤槄鎸佷箙鍖栧埌鍒嗗尯琛ㄧ殑琛屾儏鏁版嵁 */ // 鍒濆鍖栨祦鐜骞舵媺璧疯闃� iniNsqEnv("nsqStockOrdersSHStream") streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() existsSubscriptionTopic(,streamTableNames[0],"easyNSQ_saveToDfsTable") select count(*) from objByName(streamTableNames[0]) select count(*) from loadTable(dbPath, tableNames[0]) // 鍋滄璁㈤槄 nsq::unsubscribe("orders", "sh") /** 渚�3.1 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼絾鏄竻闄や箣鍓嶈闃呮寔涔呭寲鍒板垎鍖鸿〃鐨勮鎯呮暟鎹� */ // 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄 iniNsqEnv("nsqStockOrdersSHStream") iniNsqDfs("dfs://nsqStockOrders", "ordersSH") subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() existsSubscriptionTopic(,"nsqStockOrdersSHStream","easyNSQ_saveToDfsTable") select count(*) from objByName("nsqStockOrdersSHStream") select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // 鍋滄璁㈤槄 nsq::unsubscribe("orders", "sh") /** 渚�4.1 鎺ユ敹NSQ涓婁氦鎵€鍜屾繁浜ゆ墍鐨勫疄鏃惰鎯呮暟鎹紝骞舵寔涔呭寲鍒板垎鍖鸿〃锛屽悎骞舵勃娣辨暟鎹� */ // 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄 iniNsqEnv() iniNsqDfs() subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() existsSubscriptionTopic(,"nsqStockOrdersStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockTradeStream","easyNSQ_saveToDfsTable") existsSubscriptionTopic(,"nsqStockSnapshotStream","easyNSQ_saveToDfsTable") // 鍋滄璁㈤槄 closeNsqConnection() /** 渚�4.2 鎺ユ敹NSQ涓婁氦鎵€鍜屾繁浜ゆ墍鐨勫疄鏃惰鎯呮暟鎹紝骞舵寔涔呭寲鍒板垎鍖鸿〃锛屾勃娣辨暟鎹垎寮€锛堢敤鎴锋寚瀹氭祦琛ㄥ拰鍒嗗尯琛ㄧ殑鍚嶅瓧锛� */ // 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄 iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) iniNsqDfs("dfs://myNsqOrders", `myNsqOrdersSH`myNsqOrdersSZ) iniNsqDfs("dfs://myNsqTrade", `myNsqTradeSH`myNsqTradeSZ) iniNsqDfs("dfs://myNsqSnapshot", `myNsqSnapshotSH`myNsqSnapshotSZ) subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"]) subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"]) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"]) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // 鍋滄璁㈤槄 closeNsqConnection() /** 娓呯悊涔嬪墠渚嬪瓙杩愯鏃堕仐鐣欑殑娴佽〃鍜屽垎鍖鸿〃 */ iniNsqEnv() iniNsqDfs() iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) iniNsqDfs("dfs://myNsqOrders", `myNsqOrdersSH`myNsqOrdersSZ) iniNsqDfs("dfs://myNsqTrade", `myNsqTradeSH`myNsqTradeSZ) iniNsqDfs("dfs://myNsqSnapshot", `myNsqSnapshotSH`myNsqSnapshotSZ) /** 渚�5 鎺ユ敹涓婃捣鍜屾繁璇佷袱涓競鍦虹殑 NSQ 琛屾儏鏁版嵁銆佹寔涔呭寲瀛樺偍锛屽苟瀵硅鎯呮暟鎹垎鍒仛娌繁鍚堝苟鍜屾勃娣卞垎寮€涓ょ澶勭悊鐨剆tartup.dos */ login(`admin, `123456) go pluginPath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/PluginNsq.txt" configFilePath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/nsq_sdk_config.ini"; // 纭繚 nsq 鎻掍欢鍔犺浇鎴愬姛 try{ loadPlugin(pluginPath) } catch(ex) { print(ex) } go // 璋冪敤妯″潡 use DolphinDBModules::easyNSQ go // 鍒濆鍖栫幆澧�(涓嶅垹闄ゅ垎鍖鸿〃) iniNsqEnv() go // 鎷夎捣璁㈤槄 subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true) subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true) subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true)