login("admin", "123456") //输入流数据表 st = streamTable(1000000:0, `deviceID`ts`temperature, [INT,DATETIME,FLOAT]) enableTableShareAndPersistence(st, `sensor, false, true, 1000000) //创建报警表 share streamTable(1000:0, `time`deviceID`anomalyType`anomalyString, [DATETIME,INT,INT, SYMBOL]) as warningTable //创建异常检测引擎,2分钟内,温度传感器出现2次40度以上,一次30度以上,报警 engine = createAnomalyDetectionEngine(name="engine1", metrics=<[sum(temperature > 40) > 2 && sum(temperature > 30) > 3 ]>, dummyTable=sensor, outputTable=warningTable, timeColumn=`ts,keyColumn=`deviceID ,windowSize= 120,step=30) subscribeTable(tableName="sensor", actionName="sensorAnomalyDetection", offset=0, handler= append!{engine}, msgAsTable=true) //5分钟无数据,报警 t=keyedTable(`deviceID,100:0, `deviceID`time, [INT,DATETIME]) deviceNum = 3 insert into t values(1..deviceNum, take(now().datetime(), deviceNum)) def checkNoData (mutable keyedTable, mutable outputTable, msg) { keyedTable.append!(select deviceID, ts from msg) warning = select now().datetime(), deviceID, 1 as anomalyType, "" as anomalyString from keyedTable where time < datetimeAdd(now().datetime(), -5, "m") outputTable.append!(warning) } subscribeTable(tableName="sensor", actionName="noData", offset=0, handler=checkNoData{t, warningTable}, msgAsTable=true, batchSize=1000000, throttle=1) //模拟写入流表 def writeData(){ deviceNum = 3 for (i in 0:60) { data = table(take(1..deviceNum,deviceNum) as deviceID ,take(now().datetime(),deviceNum) as ts,rand(10..41,deviceNum) as temperature) sensor.append!(data) sleep(1000) } deviceNum = 2 for (i in 0:600) { data = table(take(1..deviceNum,deviceNum) as deviceID ,take(now().datetime(),deviceNum) as ts,rand(10..45,deviceNum) as temperature) sensor.append!(data) sleep(1000) } } submitJob("simulateData", "simulate sensor data", writeData) //取消流表和订阅 dropAggregator("engine1") unsubscribeTable(, "sensor", "sensorAnomalyDetection") unsubscribeTable(, "sensor", "noData") dropStreamTable(`sensor) undef(`st, VAR)