/*
 * MQTT -> stream table -> streaming engines demo.
 *
 * Flow:
 *   1. An MQTT subscription parses JSON messages into the shared stream table `inputSt`.
 *   2. A table subscription ("monitor") feeds every batch from `inputSt` into two engines:
 *        - a reactive state engine writing to `outputSt1`
 *        - a session window engine (30 s gap, last(value)) writing to `outputSt2`
 *   3. A background job publishes the rows of deviceState.csv to the MQTT broker as JSON.
 */

// NOTE(review): credentials are hard-coded in the script; replace before using outside a sandbox.
login(`admin,`123456)

// Loading fails if the plugin is already loaded in this session; print the error and continue.
try{ loadPlugin(getHomeDir()+"/plugins/mqtt/PluginMQTTClient.txt") } catch(ex) { print(ex) }
go
use mqtt;

/*
 * Tear down everything a previous run of this script may have left behind.
 *
 * Each step is guarded on its own so that one failing step (for example
 * unsubscribeTable when no "monitor" subscription exists) does not prevent
 * the remaining objects from being cleaned up. Producers/consumers are
 * stopped before the tables they reference are dropped.
 */
def clearEnv(){
	// Stop MQTT ingestion first so nothing keeps writing into inputSt.
	try{
		for (id in mqtt::getSubscriberStat()[`subscriptionId]) mqtt::unsubscribe(id)
	}catch(ex){ print(ex) }

	// Detach the engine handler from inputSt.
	try{
		unsubscribeTable(tableName=`inputSt, actionName="monitor")
	}catch(ex){ print(ex) }

	// Drop the streaming engines if they are registered.
	try{
		if (getAggregatorStat().ReactiveStreamEngine[`name].find(`reactivEngine)!=-1) dropAggregator(`reactivEngine)
	}catch(ex){ print(ex) }
	try{
		if (getAggregatorStat().SessionWindowEngine[`name].find(`swEngine)!=-1) dropAggregator(`swEngine)
	}catch(ex){ print(ex) }

	// Drop the shared stream tables if they exist.
	try{
		if (objs(true).name.find('inputSt')!=-1) dropStreamTable(`inputSt)
	}catch(ex){ print(ex) }
	try{
		if (objs(true).name.find('outputSt1')!=-1) dropStreamTable(`outputSt1)
	}catch(ex){ print(ex) }
	try{
		if (objs(true).name.find('outputSt2')!=-1) dropStreamTable(`outputSt2)
	}catch(ex){ print(ex) }
}

/*
 * Create the shared, persisted stream tables used by the pipeline:
 *   inputSt   (tag SYMBOL, ts TIMESTAMP, value INT)      - raw MQTT messages
 *   outputSt1 (tag SYMBOL, ts TIMESTAMP, value INT)      - reactive state engine output
 *   outputSt2 (ts TIMESTAMP, tag SYMBOL, lastValue INT)  - session window engine output
 */
def createInOutTable(){
	stream01 = streamTable(100000:0, `tag`ts`value, [SYMBOL,TIMESTAMP,INT])
	enableTableShareAndPersistence(table=stream01, tableName=`inputSt, asynWrite=false, compress=true, cacheSize=100000)

	out1 = streamTable(10000:0, `tag`ts`value, [SYMBOL,TIMESTAMP,INT])
	enableTableShareAndPersistence(table=out1, tableName=`outputSt1, asynWrite=false, compress=true, cacheSize=100000)

	out2 = streamTable(10000:0, `ts`tag`lastValue, [TIMESTAMP,SYMBOL,INT])
	enableTableShareAndPersistence(table=out2, tableName=`outputSt2, asynWrite=false, compress=true, cacheSize=100000)
}

/*
 * Subscription handler: forward one incoming batch to both engines.
 *   engine1, engine2 - engine handles (bound via partial application in consume())
 *   msg              - the batch delivered by subscribeTable (a table, since msgAsTable=true)
 */
def process(mutable engine1, mutable engine2, msg){
	engine1.append!(msg)
	engine2.append!(msg)
}

/*
 * Create both streaming engines and subscribe them to inputSt from offset 0.
 */
def consume(){
	// NOTE(review): the original call ended with `filter=` and no value, which does not parse.
	// The expression below (emit a row only when value differs from the previous non-null
	// value of the same tag) is the presumed intent - confirm against the requirements.
	reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[ts, value]>, dummyTable=objByName(`inputSt), outputTable=objByName(`outputSt1), keyColumn="tag", filter=<value!=prev(value) && prev(value)!=NULL>)

	// Per-tag session windows closed after a 30000 ms gap; outputs last(value) of each session.
	swEngine = createSessionWindowEngine(name="swEngine", sessionGap=30000, metrics=<last(value)>, dummyTable=objByName(`inputSt), outputTable=objByName(`outputSt2), timeColumn=`ts, keyColumn=`tag, useSessionStartTime=false)

	subscribeTable(tableName="inputSt", actionName="monitor", offset=0, handler=process{swEngine,reactivEngine}, msgAsTable=true)
}

/*
 * Subscribe to an MQTT topic and write the parsed JSON messages into inputSt.
 *   host, port - MQTT broker address
 *   topic      - topic to subscribe to
 */
def writeStreamTable(host, port, topic){
	// CSV alternative:
	// sp = mqtt::createCsvParser([SYMBOL,TIMESTAMP,INT])
	sp = mqtt::createJsonParser([SYMBOL,TIMESTAMP,INT], `tag`ts`value)
	mqtt::subscribe(host, port, topic, sp, objByName(`inputSt))
}

/*
 * Publish table t to an MQTT topic and close the connection.
 *   server    - MQTT broker host
 *   topic     - topic to publish to
 *   f         - formatter created by the mqtt plugin
 *   batchsize - batch size passed to mqtt::connect
 *   t         - table whose rows are published
 * NOTE(review): the broker port is fixed at 1883 here, independent of the port used in main().
 */
def publishTableData(server, topic, f, batchsize, t){
	conn = mqtt::connect(server, 1883, 0, f, batchsize)
	mqtt::publish(conn, topic, t)
	mqtt::close(conn)
}

/*
 * Entry point: reset the environment, build the pipeline, start MQTT ingestion,
 * then publish deviceState.csv to the broker in a background job.
 */
def main(){
	clearEnv()
	createInOutTable()
	consume()

	host = "127.0.0.1"
	port = 1883
	topic = "sensor/test"
	writeStreamTable(host, port, topic)

	t = loadText(getHomeDir()+"/deviceState.csv")
	// CSV alternative (both lines belong together; myFormat is only defined here):
	// myFormat = take("", 3)
	// f = mqtt::createCsvFormatter(myFormat, ',', ';')
	f = mqtt::createJsonFormatter()
	batchsize = 100
	submitJob("submit_pub1", "submit_p1", publishTableData{host,topic,f,batchsize,t})
}

main()

// Monitoring / inspection queries.
getAggregatorStat().ReactiveStreamEngine
getAggregatorStat().SessionWindowEngine
getStreamingStat().pubTables
getStreamingStat().pubConns
getStreamingStat().subWorkers
mqtt::getSubscriberStat()
getRecentJobs()
select * from outputSt1 where tag=`motor.C17156B.m1
select * from outputSt2 where tag=`motor.C17156B.m1