// 鐧诲綍鏁版嵁搴� login(`admin, `123456) /* // 浠庢彃浠跺競鍦轰笅杞藉畨瑁� nsq 鎻掍欢 listRemotePlugins("nsq") installPlugin("nsq") */ // 鍔犺浇鎻掍欢 try{ loadPlugin("nsq") } catch(ex) { print(ex) } go // 璋冪敤妯″潡 use DolphinDBModules::easyNSQ // nsq 琛屾儏閰嶇疆鏂囦欢璺緞 configFilePath = "<your_path_to>/nsq_sdk_config.ini"; // nsq 璐﹀彿锛堥潪蹇呭~锛� nsq_username = "<your_nsq_username>"; nsq_password = "<your_nsq_password>"; // 鏁版嵁鎺ユ敹閫夐」锛堥潪蹇呭~锛� nsq_data_option = dict(STRING, ANY) nsq_data_option["receivedTime"]=true // 鍦ㄨ鎯呮暟鎹腑澧炲姞涓€鍒楁帴鏀舵椂闂� nsq_data_option["getAllFieldNames"]=true // 鎺ュ彈 nsq 鍘熷琛屾儏涓墍鏈夊瓧娈� /* configFilePath = "/home/appadmin/mqzhu/ddb_20011/server/uploads/sdk_config.ini"; nsq_username = ""; nsq_password = ""; */ /** 渚�1 浠呬粠NSQ鎺ユ敹娣卞湷甯傚満snapshot绫诲瀷鐨勫疄鏃惰鎯呮暟鎹埌娴佹暟鎹〃锛屼笉瀛樺偍鍒板垎鍖鸿〃锛堟祦琛ㄤ娇鐢ㄦā鍧楁彁渚涚殑榛樿鍚嶅瓧锛� */ // 鍒濆鍖栨祦鐜锛堟竻鐞嗘墍鏈夌浉鍏虫祦琛ㄥ強鍏惰闃咃級 iniNsqEnv() // 鎷夎捣璁㈤槄 streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", options=nsq_data_option) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() // select count(*) from objByName(streamTableNames[0]) // select top 100 * from objByName(streamTableNames[0]) // 鍋滄璁㈤槄 nsq::unsubscribe("snapshot", "sz") nsq::getSubscriptionStatus() /** 渚�2 浠嶯SQ鎺ユ敹涓婃捣甯傚満鐨勫疄鏃惰鎯呮暟鎹埌娴佹暟鎹〃锛屽苟鍦ㄥ垎鍖鸿〃涓寔涔呭寲瀛樺偍锛堟祦琛ㄥ拰鍒嗗尯琛ㄤ娇鐢ㄦā鍧楁彁渚涚殑榛樿鍚嶅瓧锛� */ // 鍒濆鍖栨墍鏈夌浉鍏崇殑娴佺幆澧冨拰鍒嗗尯琛� iniNsqEnv() iniNsqDfs() // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 // 璁㈤槄涓婃捣甯傚満orders,trade,snapshot琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧瓨鍌� subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option) subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true, options=nsq_data_option) subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true, options=nsq_data_option) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName("nsqStockOrdersSHStream") // select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // select count(*) from objByName("nsqStockTradeSHStream") // select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH") // select count(*) from objByName("nsqStockSnapshotSHStream") // select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH") // 浠呭仠姝rders琛屾儏鏁版嵁鐨勮闃� nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() // 鍏抽棴涓巒sq鐨勮繛鎺ワ紝骞跺仠姝㈡墍鏈夎闃� closeNsqConnection() /** 渚�3.1 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼繚鐣欎箣鍓嶈闃呮寔涔呭寲鍒板垎鍖鸿〃鐨勬暟鎹� */ // 鍒濆鍖栨祦鐜锛屼粎娓呯悊 nsqStockOrdersSHStream 娴佽〃 iniNsqEnv("nsqStockOrdersSHStream") // 璁㈤槄涓婃捣甯傚満orders琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧瓨鍌� streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName(streamTableNames[0]) // select count(*) from loadTable(dbPath, tableNames[0]) // 鍋滄璁㈤槄 nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() /** 渚�3.2 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼笖涓嶄繚鐣欎箣鍓嶈闃呮寔涔呭寲鍒板垎鍖鸿〃鐨勬暟鎹� */ // 鍒濆鍖栨祦鐜鍜屽垎甯冨紡琛� iniNsqEnv("nsqStockOrdersSHStream") iniNsqDfs("dfs://nsqStockOrders", "ordersSH") // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 // 璁㈤槄涓婃捣甯傚満orders琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧瓨鍌� subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // select count(*) from objByName("nsqStockOrdersSHStream") // select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH") // 鍋滄璁㈤槄 nsq::unsubscribe("orders", "sh") nsq::getSubscriptionStatus() /** 渚�4.1 浠嶯SQ鎺ユ敹涓婃捣鍜屾繁鍦冲競鍦虹殑瀹炴椂琛屾儏鏁版嵁鍒版祦鏁版嵁琛紝骞跺湪鍒嗗尯琛ㄤ腑鎸佷箙鍖栧瓨鍌紝涓婃捣甯傚満鍜屾繁鍦冲競鍦虹殑鏁版嵁鍚堝苟澶勭悊锛堟祦琛ㄥ拰鍒嗗尯琛ㄤ娇鐢ㄦā鍧楁彁渚涚殑榛樿鍚嶅瓧锛� */ // 鍒濆鍖栨祦鐜鍜屽垎甯冨紡琛� iniNsqEnv() iniNsqDfs() // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 // 璁㈤槄涓婃捣鍜屾繁鍦冲競鍦簅rders,trade,snapshot琛屾儏鏁版嵁,骞舵寔涔呭寲瀛樺偍 subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // 鍋滄璁㈤槄 closeNsqConnection() // 娓呯悊娴佽〃鍜屽垎鍖鸿〃 iniNsqEnv() iniNsqDfs() // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 /** 渚�4.2 浠嶯SQ鎺ユ敹涓婃捣鍜屾繁鍦冲競鍦虹殑瀹炴椂琛屾儏鏁版嵁鍒版祦鏁版嵁琛紝骞跺湪鍒嗗尯琛ㄤ腑鎸佷箙鍖栧瓨鍌紝涓婃捣甯傚満鍜屾繁鍦冲競鍦虹殑鏁版嵁鍒嗗紑澶勭悊锛堟祦琛ㄥ拰鍒嗗尯琛ㄤ娇鐢ㄨ嚜瀹氫箟鍚嶅瓧锛� */ // 鍒濆鍖栨祦鐜鍜屽垎甯冨紡琛� iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ) // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ) // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ) // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 // 璁㈤槄涓婃捣鍜屾繁鍦冲競鍦簅rders,trade,snapshot琛屾儏鏁版嵁,骞舵寔涔呭寲瀛樺偍 subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password) // 妫€鏌ヨ闃呮儏鍐� nsq::getSubscriptionStatus() // select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%" // 鍋滄璁㈤槄 closeNsqConnection() // 娓呯悊娴佽〃鍜屽垎鍖鸿〃 iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"]) each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ) // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ) // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤 each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ) // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤