use ops

// ---------------------------------------------------------------------------
// Environment cleanup
// ---------------------------------------------------------------------------
// Tears down the streaming objects this script works with so it can be re-run.
// Each step after cancelJobEx() is wrapped in its own try/catch that prints
// the exception, so one failing step does not prevent the following steps
// from running.
// NOTE(review): cancelJobEx is not defined in this script; presumably it is
// provided by the `ops` module imported above -- confirm.
def cleanEnvironment(){
    cancelJobEx()

    // Remove the "snapshotFilter" subscription on the shared stream table.
    try {
        unsubscribeTable(tableName=`snapshotStreamTable, actionName="snapshotFilter")
    } catch(err) {
        print(err)
    }

    // Drop the two stream engines by name (they are created outside this
    // section of the script).
    try {
        dropStreamEngine("calChange")
    } catch(err) {
        print(err)
    }
    try {
        dropStreamEngine("crossSectionalEngine")
    } catch(err) {
        print(err)
    }

    // Release the shared tables that createStreamTable() publishes.
    try {
        undef("snapshotStreamTable", SHARED)
    } catch(err) {
        print(err)
    }
    try {
        undef("changeCrossSectionalTable", SHARED)
    } catch(err) {
        print(err)
    }
}

// ---------------------------------------------------------------------------
// Stream table creation
// ---------------------------------------------------------------------------
// Publishes two shared tables:
//   * snapshotStreamTable       - an empty stream table (capacity 40000) whose
//                                 columns mirror the schema of dbName/tbName.
//   * changeCrossSectionalTable - an empty keyed table (capacity 50), keyed on
//                                 SecurityID, with a factor/rank column pair
//                                 for each of the 1min, 5min and 10min suffixes.
// Parameters:
//   dbName - database path passed to loadTable
//   tbName - table name passed to loadTable
def createStreamTable(dbName, tbName){
    // Column definitions (name / typeString) of the source table.
    snapshotCols = loadTable(dbName, tbName).schema().colDefs
    snapshotStream = streamTable(40000:0, snapshotCols.name, snapshotCols.typeString)
    share(snapshotStream, `snapshotStreamTable)

    // Layout of the keyed result table.
    resultColNames = `DateTime`SecurityID`factor_1min`rank_1min`factor_5min`rank_5min`factor_10min`rank_10min
    resultColTypes = [TIMESTAMP, SYMBOL, DOUBLE, INT, DOUBLE, INT, DOUBLE, INT]
    resultTable = keyedTable(`SecurityID, 50:0, resultColNames, resultColTypes)
    share(resultTable, `changeCrossSectionalTable)
}

// ---------------------------------------------------------------------------
// Script entry: reset the environment, then build the shared tables
// ---------------------------------------------------------------------------
cleanEnvironment()
dbName = "dfs://SH_TSDB_snapshot_ArrayVector"
tbName = "snapshot"
createStreamTable(dbName, tbName)