// 鐧诲綍鏁版嵁搴�
login(`admin, `123456)

/*
// 浠庢彃浠跺競鍦轰笅杞藉畨瑁� nsq 鎻掍欢
listRemotePlugins("nsq")
installPlugin("nsq")
*/

// 鍔犺浇鎻掍欢
try{ loadPlugin("nsq") } catch(ex) { print(ex) }
go
// 璋冪敤妯″潡
use DolphinDBModules::easyNSQ


// nsq 琛屾儏閰嶇疆鏂囦欢璺緞
configFilePath = "<your_path_to>/nsq_sdk_config.ini";
// nsq 璐﹀彿锛堥潪蹇呭~锛�
nsq_username = "<your_nsq_username>";
nsq_password = "<your_nsq_password>";
// 鏁版嵁鎺ユ敹閫夐」锛堥潪蹇呭~锛�
nsq_data_option = dict(STRING, ANY)
nsq_data_option["receivedTime"]=true  // 鍦ㄨ鎯呮暟鎹腑澧炲姞涓€鍒楁帴鏀舵椂闂�
nsq_data_option["getAllFieldNames"]=true // 鎺ュ彈 nsq 鍘熷琛屾儏涓墍鏈夊瓧娈�

/*
configFilePath = "/home/appadmin/mqzhu/ddb_20011/server/uploads/sdk_config.ini";
nsq_username = "";
nsq_password = "";
*/


/** 渚�1 浠呬粠NSQ鎺ユ敹娣卞湷甯傚満snapshot绫诲瀷鐨勫疄鏃惰鎯呮暟鎹埌娴佹暟鎹〃锛屼笉瀛樺偍鍒板垎鍖鸿〃锛堟祦琛ㄤ娇鐢ㄦā鍧楁彁渚涚殑榛樿鍚嶅瓧锛� */

// 鍒濆鍖栨祦鐜锛堟竻鐞嗘墍鏈夌浉鍏虫祦琛ㄥ強鍏惰闃咃級
iniNsqEnv()

// 鎷夎捣璁㈤槄
streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", options=nsq_data_option)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
// select count(*) from objByName(streamTableNames[0])
// select top 100 * from objByName(streamTableNames[0])

// 鍋滄璁㈤槄
nsq::unsubscribe("snapshot", "sz")
nsq::getSubscriptionStatus()

/** 渚�2 浠嶯SQ鎺ユ敹涓婃捣甯傚満鐨勫疄鏃惰鎯呮暟鎹埌娴佹暟鎹〃锛屽苟鍦ㄥ垎鍖鸿〃涓寔涔呭寲瀛樺偍锛堟祦琛ㄥ拰鍒嗗尯琛ㄤ娇鐢ㄦā鍧楁彁渚涚殑榛樿鍚嶅瓧锛� */ 

// 鍒濆鍖栨墍鏈夌浉鍏崇殑娴佺幆澧冨拰鍒嗗尯琛�
iniNsqEnv()
iniNsqDfs() // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤

// 璁㈤槄涓婃捣甯傚満orders,trade,snapshot琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧瓨鍌�
subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option)
subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true, options=nsq_data_option)
subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true, options=nsq_data_option)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"
// select count(*) from objByName("nsqStockOrdersSHStream")
// select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH")
// select count(*) from objByName("nsqStockTradeSHStream")
// select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH")
// select count(*) from objByName("nsqStockSnapshotSHStream")
// select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH")

// 浠呭仠姝rders琛屾儏鏁版嵁鐨勮闃�
nsq::unsubscribe("orders", "sh")
nsq::getSubscriptionStatus()

// 鍏抽棴涓巒sq鐨勮繛鎺ワ紝骞跺仠姝㈡墍鏈夎闃�
closeNsqConnection()

/** 渚�3.1 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼繚鐣欎箣鍓嶈闃呮寔涔呭寲鍒板垎鍖鸿〃鐨勬暟鎹� */
// 鍒濆鍖栨祦鐜锛屼粎娓呯悊 nsqStockOrdersSHStream 娴佽〃
iniNsqEnv("nsqStockOrdersSHStream")

// 璁㈤槄涓婃捣甯傚満orders琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧瓨鍌�
streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"
// select count(*) from objByName(streamTableNames[0])
// select count(*) from loadTable(dbPath, tableNames[0])

// 鍋滄璁㈤槄
nsq::unsubscribe("orders", "sh")
nsq::getSubscriptionStatus()

/** 渚�3.2 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼笖涓嶄繚鐣欎箣鍓嶈闃呮寔涔呭寲鍒板垎鍖鸿〃鐨勬暟鎹� */

// 鍒濆鍖栨祦鐜鍜屽垎甯冨紡琛�
iniNsqEnv("nsqStockOrdersSHStream")
iniNsqDfs("dfs://nsqStockOrders", "ordersSH") // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤

// 璁㈤槄涓婃捣甯傚満orders琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧瓨鍌�
subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"
// select count(*) from objByName("nsqStockOrdersSHStream")
// select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH")

// 鍋滄璁㈤槄
nsq::unsubscribe("orders", "sh")
nsq::getSubscriptionStatus()

/** 渚�4.1 浠嶯SQ鎺ユ敹涓婃捣鍜屾繁鍦冲競鍦虹殑瀹炴椂琛屾儏鏁版嵁鍒版祦鏁版嵁琛紝骞跺湪鍒嗗尯琛ㄤ腑鎸佷箙鍖栧瓨鍌紝涓婃捣甯傚満鍜屾繁鍦冲競鍦虹殑鏁版嵁鍚堝苟澶勭悊锛堟祦琛ㄥ拰鍒嗗尯琛ㄤ娇鐢ㄦā鍧楁彁渚涚殑榛樿鍚嶅瓧锛� */

// 鍒濆鍖栨祦鐜鍜屽垎甯冨紡琛�
iniNsqEnv()
iniNsqDfs() // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤

// 璁㈤槄涓婃捣鍜屾繁鍦冲競鍦簅rders,trade,snapshot琛屾儏鏁版嵁,骞舵寔涔呭寲瀛樺偍
subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true, options=nsq_data_option, username=nsq_username, password=nsq_password)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"

// 鍋滄璁㈤槄
closeNsqConnection()

// 娓呯悊娴佽〃鍜屽垎鍖鸿〃
iniNsqEnv()
iniNsqDfs() // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤

/** 渚�4.2 浠嶯SQ鎺ユ敹涓婃捣鍜屾繁鍦冲競鍦虹殑瀹炴椂琛屾儏鏁版嵁鍒版祦鏁版嵁琛紝骞跺湪鍒嗗尯琛ㄤ腑鎸佷箙鍖栧瓨鍌紝涓婃捣甯傚満鍜屾繁鍦冲競鍦虹殑鏁版嵁鍒嗗紑澶勭悊锛堟祦琛ㄥ拰鍒嗗尯琛ㄤ娇鐢ㄨ嚜瀹氫箟鍚嶅瓧锛� */

// 鍒濆鍖栨祦鐜鍜屽垎甯冨紡琛�
iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"])
each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ)  // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤
each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ)  // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤
each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ)  // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤

// 璁㈤槄涓婃捣鍜屾繁鍦冲競鍦簅rders,trade,snapshot琛屾儏鏁版嵁,骞舵寔涔呭寲瀛樺偍
subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password)
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"], options=nsq_data_option, username=nsq_username, password=nsq_password)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
// select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"

// 鍋滄璁㈤槄
closeNsqConnection()

// 娓呯悊娴佽〃鍜屽垎鍖鸿〃
iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"])
each(iniNsqDfs{"dfs://myNsqOrders"}, `myNsqOrdersSH`myNsqOrdersSZ)  // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤
each(iniNsqDfs{"dfs://myNsqTrade"}, `myNsqTradeSH`myNsqTradeSZ)  // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤
each(iniNsqDfs{"dfs://myNsqSnapshot"}, `myNsqSnapshotSH`myNsqSnapshotSZ)  // 娉ㄦ剰锛岃鍑芥暟浼氬垹闄ゆ暟鎹簱涓殑鍒嗗尯琛紝璋ㄦ厧浣跨敤