// filter def snapshotFilter(engineName, mutable data){ t = select * from data where left(SecurityID, 2)="60" and time(DateTime)>=09:25:00.000 getStreamEngine(engineName).append!(t) } subscribeTable(tableName="snapshotStreamTable", actionName="snapshotFilter", offset=-1, handler=snapshotFilter{"calChange"}, msgAsTable=true, hash=0) // calculate @state def calculateChange(DateTime, LastPx, lag){ windowFirstPx = tmfirst(DateTime, LastPx, lag) preMinutePx = tmove(DateTime, LastPx, lag) prevLastPx = iif(preMinutePx == NULL, windowFirstPx, preMinutePx) return 100 * (LastPx - prevLastPx) \ prevLastPx } // rank engine schemaTB = table(1:0, `SecurityID`DateTime`factor_1min`factor_5min`factor_10min, [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE]) createCrossSectionalEngine(name="crossSectionalEngine", metrics=<[SecurityID, factor_1min, rank(factor_1min, ascending=false), factor_5min, rank(factor_5min, ascending=false), factor_10min, rank(factor_10min, ascending=false)]>, dummyTable=schemaTB, outputTable=objByName("changeCrossSectionalTable"), keyColumn=`SecurityID, triggeringPattern='perBatch', useSystemTime=false, timeColumn=`DateTime) // calculate engine createReactiveStateEngine(name="calChange", metrics=<[DateTime, calculateChange(DateTime, LastPx, lag=1m), calculateChange(DateTime, LastPx, lag=5m), calculateChange(DateTime, LastPx, lag=10m)]>, dummyTable=objByName("snapshotStreamTable"), outputTable=getStreamEngine("crossSectionalEngine"), keyColumn=`SecurityID, filter== 09:30:00.000>)