def dsTb(timeRS, startDate, endDate, stkList, replayName) { if(replayName == "snapshot"){ tab = loadTable("dfs://Test_snapshot", "snapshot") } else if(replayName == "order") { tab = loadTable("dfs://Test_order", "order") } else if(replayName == "transaction") { tab = loadTable("dfs://Test_transaction", "transaction") } else { return NULL } ds = replayDS(sqlObj=, dateColumn='DateTime', timeColumn='DateTime') inputEnd = dict(["end"], [ds]) dateEnd = dict(["end"], [`DateTime]) timeEnd = dict(["end"], [`DateTime]) if(sortColumn == "NULL") { replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1) } else { replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1, sortColumns=sortColumn) } } def replayJob(inputDict, tabName, dateDict, timeDict, replayRate, sortColumn) { if(sortColumn == "NULL") { replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23) } else { replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23, sortColumns=sortColumn) } createEnd(tabName, sortColumn) } def stkReplay(stkList, mutable startDate, mutable endDate, replayRate, replayUuid, replayName) { maxCnt = 50 returnBody = dict(STRING, STRING) startDate = datetimeParse(startDate, "yyyyMMdd") endDate = datetimeParse(endDate, "yyyyMMdd") + 1 sortColumn = "ApplSeqNum" if(stkList.size() > maxCnt) { returnBody["errorCode"] = "0" returnBody["errorMsg"] = "超过单次回放股票上限,最大回放上限:" + string(maxCnt) return returnBody } if(size(replayName) != 0) { for(name in replayName) { if(not name in ["snapshot", "order", "transaction"]) { returnBody["errorCode"] = "0" returnBody["errorMsg"] = "请输入正确的数据源名称,不能识别的数据源名称:" + name return returnBody } } } else { returnBody["errorCode"] = "0" returnBody["errorMsg"] = "缺少回放数据源,请输入正确的数据源名称" return returnBody } try { if(size(replayName) == 1 && replayName[0] == "snapshot") { colName = ["timestamp", "biz_type", "biz_data"] colType = [TIMESTAMP, SYMBOL, BLOB] sortColumn = "NULL" } else { colName = ["timestamp", "biz_type", "biz_data", sortColumn] colType = [TIMESTAMP, SYMBOL, BLOB, LONG] } msgTmp = streamTable(10000000:0, colName, colType) tabName = "replay_" + replayUuid enableTableShareAndPersistence(table=msgTmp, tableName=tabName, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=60, flushMode=0, preCache=1000000) timeRS = cutPoints(09:30:00.000..15:00:00.000, 23) inputDict = dict(replayName, each(dsTb{timeRS, startDate, endDate, stkList}, replayName)) dateDict = dict(replayName, take(`MDDate, replayName.size())) timeDict = dict(replayName, take(`MDTime, replayName.size())) jobId = "replay_" + replayUuid jobDesc = "replay stock data" submitJob(jobId, jobDesc, replayJob{inputDict, tabName, dateDict, timeDict, replayRate, sortColumn}) returnBody["errorCode"] = "1" returnBody["errorMsg"] = "后台回放成功" return returnBody } catch(ex) { returnBody["errorCode"] = "0" returnBody["errorMsg"] = "回放行情数据异常,异常信息:" + ex return returnBody } } addFunctionView(dsTb) addFunctionView(createEnd) addFunctionView(replayJob) addFunctionView(stkReplay)