login("admin","123456") st=streamTable(1000000:0,`hardwareId`ts`temp1`temp2`temp3,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE]) enableTableShareAndPersistence(table=st, tableName=`sensorTemp, asynWrite=true, compress=false, cacheSize=1000000) go def writeData(){ hardwareNumber = 1000 for (i in 0:10000) { data = table(take(1..hardwareNumber,hardwareNumber) as hardwareId ,take(now(),hardwareNumber) as ts,rand(20..41,hardwareNumber) as temp1,rand(30..71,hardwareNumber) as temp2,rand(70..151,hardwareNumber) as temp3) objByName("sensorTemp").append!(data) sleep(10) } } share streamTable(1000000:0, `time`hardwareId`tempavg1`tempavg2`tempavg3, [TIMESTAMP,INT,DOUBLE,DOUBLE,DOUBLE]) as sensorTempAvg demoAgg = createTimeSeriesAggregator(name="demoAgg", windowSize=60000, step=2000, metrics=<[avg(temp1),avg(temp2),avg(temp3)]>, dummyTable=sensorTemp, outputTable=sensorTempAvg, timeColumn=`ts, keyColumn=`hardwareId, garbageSize=2000) subscribeTable( tableName="sensorTemp", actionName="demoAgg", offset=-1, handler=append!{demoAgg}, msgAsTable=true) if(exists("dfs://iotDemoDB")){ dropDatabase("dfs://iotDemoDB") } db1 = database("",VALUE, today()..(today()+30)) db2 = database("",RANGE, 0..10*100) db = database("dfs://iotDemoDB",COMPO,[db1,db2]) tableSchema=table(1:0,`hardwareId`ts`temp1`temp2`temp3,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE]) dfsTable = db.createPartitionedTable(tableSchema,"sensorTemp",`ts`hardwareId) subscribeTable(tableName="sensorTemp", actionName="sensorTemp", offset=-1, handler=append!{dfsTable}, msgAsTable=true, batchSize=1000000, throttle=10) jobId = submitJob("simulateData", "simulate sensor data", writeData)