login(`admin, `123456)
pluginPath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/PluginNsq.txt"
configFilePath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/nsq_sdk_config.ini";
try { 
    loadPlugin("plugins/nsq/PluginNsq.txt") 
} catch(ex) { 
    print(ex) 
    try{ loadPlugin(pluginPath) } catch(ex) { print(ex) }
}

// 璋冪敤妯″潡
use DolphinDBModules::easyNSQ

/** 渚�1 鎺ユ敹NSQ娣变氦鎵€Level2蹇収瀹炴椂琛屾儏鏁版嵁锛屼笉鍋氭寔涔呭寲 */

// 鍒濆鍖栧寲澧冨苟鎷夎捣璁㈤槄
iniNsqEnv()
streamTableNames = subscribeNsq(configFilePath, "snapshot", "sz", saveToDfs=false)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
select count(*) from objByName(streamTableNames[0])
select top 100 * from objByName(streamTableNames[0])

// 鍋滄璁㈤槄
nsq::unsubscribe("snapshot", "sz")
nsq::getSubscriptionStatus()

/** 渚�2 鎺ユ敹NSQ涓婁氦鎵€鎵€鏈夌被鍨嬬殑瀹炴椂琛屾儏鏁版嵁锛屽苟鎸佷箙鍖栧埌鍒嗗尯琛� */ 

// 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄
iniNsqEnv()
iniNsqDfs()
subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true)
subscribeNsq(configFilePath, "trade", "sh", saveToDfs=true)
subscribeNsq(configFilePath, "snapshot", "sh", saveToDfs=true)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
existsSubscriptionTopic(,"nsqStockOrdersSHStream","easyNSQ_saveToDfsTable")
existsSubscriptionTopic(,"nsqStockTradeSHStream","easyNSQ_saveToDfsTable")
existsSubscriptionTopic(,"nsqStockSnapshotSHStream","easyNSQ_saveToDfsTable")
select count(*) from objByName("nsqStockOrdersSHStream")
select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH")
select count(*) from objByName("nsqStockTradeSHStream")
select count(*) from loadTable("dfs://nsqStockTrade", "tradeSH")
select count(*) from objByName("nsqStockSnapshotSHStream")
select count(*) from loadTable("dfs://nsqStockSnapshot", "snapshotSH")

// 浠呭仠姝rders琛屾儏鏁版嵁鐨勮闃�
nsq::unsubscribe("orders", "sh")
nsq::getSubscriptionStatus()

// 鍋滄鎵€鏈夎闃�
closeNsqConnection()
nsq::getSubscriptionStatus()

/** 渚�3.1 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼笉娓呴櫎涔嬪墠璁㈤槄鎸佷箙鍖栧埌鍒嗗尯琛ㄧ殑琛屾儏鏁版嵁 */
// 鍒濆鍖栨祦鐜骞舵媺璧疯闃�
iniNsqEnv("nsqStockOrdersSHStream")
streamTableNames, dbPath, tableNames = subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
existsSubscriptionTopic(,streamTableNames[0],"easyNSQ_saveToDfsTable")
select count(*) from objByName(streamTableNames[0])
select count(*) from loadTable(dbPath, tableNames[0])

// 鍋滄璁㈤槄
nsq::unsubscribe("orders", "sh")

/** 渚�3.1 鍋滄渚�2涓殑璁㈤槄鍚庯紝閲嶆柊鎺ユ敹涓婃捣甯傚満orders鏁版嵁锛屼絾鏄竻闄や箣鍓嶈闃呮寔涔呭寲鍒板垎鍖鸿〃鐨勮鎯呮暟鎹� */

// 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄
iniNsqEnv("nsqStockOrdersSHStream")
iniNsqDfs("dfs://nsqStockOrders", "ordersSH")
subscribeNsq(configFilePath, "orders", "sh", saveToDfs=true)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
existsSubscriptionTopic(,"nsqStockOrdersSHStream","easyNSQ_saveToDfsTable")
select count(*) from objByName("nsqStockOrdersSHStream")
select count(*) from loadTable("dfs://nsqStockOrders", "ordersSH")

// 鍋滄璁㈤槄
nsq::unsubscribe("orders", "sh")

/** 渚�4.1 鎺ユ敹NSQ涓婁氦鎵€鍜屾繁浜ゆ墍鐨勫疄鏃惰鎯呮暟鎹紝骞舵寔涔呭寲鍒板垎鍖鸿〃锛屽悎骞舵勃娣辨暟鎹� */

// 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄
iniNsqEnv()
iniNsqDfs()
subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true)
subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true)
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true)

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
existsSubscriptionTopic(,"nsqStockOrdersStream","easyNSQ_saveToDfsTable")
existsSubscriptionTopic(,"nsqStockTradeStream","easyNSQ_saveToDfsTable")
existsSubscriptionTopic(,"nsqStockSnapshotStream","easyNSQ_saveToDfsTable")

// 鍋滄璁㈤槄
closeNsqConnection()


/** 渚�4.2 鎺ユ敹NSQ涓婁氦鎵€鍜屾繁浜ゆ墍鐨勫疄鏃惰鎯呮暟鎹紝骞舵寔涔呭寲鍒板垎鍖鸿〃锛屾勃娣辨暟鎹垎寮€锛堢敤鎴锋寚瀹氭祦琛ㄥ拰鍒嗗尯琛ㄧ殑鍚嶅瓧锛� */

// 鍒濆鍖栫幆澧冨苟鎷夎捣璁㈤槄
iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"])
iniNsqDfs("dfs://myNsqOrders", `myNsqOrdersSH`myNsqOrdersSZ)
iniNsqDfs("dfs://myNsqTrade", `myNsqTradeSH`myNsqTradeSZ)
iniNsqDfs("dfs://myNsqSnapshot", `myNsqSnapshotSH`myNsqSnapshotSZ)
subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqOrdersSHStream", "myNsqOrdersSZStream"], dbPath="dfs://myNsqOrders", tableNames=["myNsqOrdersSH", "myNsqOrdersSZ"])
subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqTradeSHStream", "myNsqTradeSZStream"], dbPath="dfs://myNsqTrade", tableNames=["myNsqTradeSH", "myNsqTradeSZ"])
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true, streamTableNames=["myNsqSnapshotSHStream", "myNsqSnapshotSZStream"], dbPath="dfs://myNsqSnapshot", tableNames=["myNsqSnapshotSH", "myNsqSnapshotSZ"])

// 妫€鏌ヨ闃呮儏鍐�
nsq::getSubscriptionStatus()
select * from getStreamingStat().subWorkers where topic like "%easyNSQ_saveToDfsTable%"

// 鍋滄璁㈤槄
closeNsqConnection()


/** 娓呯悊涔嬪墠渚嬪瓙杩愯鏃堕仐鐣欑殑娴佽〃鍜屽垎鍖鸿〃 */
iniNsqEnv()
iniNsqDfs()
iniNsqEnv(["myNsqOrdersSHStream", "myNsqOrdersSZStream", "myNsqTradeSHStream", "myNsqTradeSZStream","myNsqSnapshotSHStream", "myNsqSnapshotSZStream"])
iniNsqDfs("dfs://myNsqOrders", `myNsqOrdersSH`myNsqOrdersSZ)
iniNsqDfs("dfs://myNsqTrade", `myNsqTradeSH`myNsqTradeSZ)
iniNsqDfs("dfs://myNsqSnapshot", `myNsqSnapshotSH`myNsqSnapshotSZ)

/** 渚�5 鎺ユ敹涓婃捣鍜屾繁璇佷袱涓競鍦虹殑 NSQ 琛屾儏鏁版嵁銆佹寔涔呭寲瀛樺偍锛屽苟瀵硅鎯呮暟鎹垎鍒仛娌繁鍚堝苟鍜屾勃娣卞垎寮€涓ょ澶勭悊鐨剆tartup.dos */
login(`admin, `123456)
go

pluginPath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/PluginNsq.txt"
configFilePath = "/home/appadmin/mqzhu/v2.00.9.6/server/plugins/nsq/nsq_sdk_config.ini";
// 纭繚 nsq 鎻掍欢鍔犺浇鎴愬姛
try{ loadPlugin(pluginPath) } catch(ex) { print(ex) }
go

// 璋冪敤妯″潡
use DolphinDBModules::easyNSQ
go

// 鍒濆鍖栫幆澧�(涓嶅垹闄ゅ垎鍖鸿〃)
iniNsqEnv()
go

// 鎷夎捣璁㈤槄
subscribeNsq(configFilePath, "orders", ["sh","sz"], saveToDfs=true)
subscribeNsq(configFilePath, "trade", ["sh","sz"], saveToDfs=true)
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], saveToDfs=true)
subscribeNsq(configFilePath, "orders", ["sh","sz"], merge=true, saveToDfs=true)
subscribeNsq(configFilePath, "trade", ["sh","sz"], merge=true, saveToDfs=true)
subscribeNsq(configFilePath, "snapshot", ["sh","sz"], merge=true, saveToDfs=true)