def dsTb(timeRS, startDate, endDate, stkList, replayName)
{
    if(replayName == "snapshot"){
        tab = loadTable("dfs://Test_snapshot", "snapshot")
	}
	else if(replayName == "order") {
		tab = loadTable("dfs://Test_order", "order")
	}
	else if(replayName == "transaction") {
		tab = loadTable("dfs://Test_transaction", "transaction")
	}
	else {
		return NULL
	}
    ds = replayDS(sqlObj=<select * from tab where MDDate>=startDate and MDDate<endDate and HTSCSecurityID in stkList>, dateColumn='MDDate', timeColumn='MDTime', timeRepartitionSchema=timeRS)
    return ds
}

def createEnd(tabName, sortColumn)
{
    dbName = "dfs://End"
    tbName = "endline"
    if(not existsDatabase(dbName))
    {
        db = database(directory=dbName, partitionType=VALUE, partitionScheme=2023.04.03..2023.04.04)
        endTb = table(2200.01.01T23:59:59.000 as DateTime, `END as point, long(0) as ApplSeqNum)
        endLine = db.createPartitionedTable(table=endTb, tableName=tbName, partitionColumns=`DateTime)
        endLine.append!(endTb)
    }
     
    ds = replayDS(sqlObj=<select * from loadTable(dbName, tbName)>, dateColumn='DateTime', timeColumn='DateTime')
    
    inputEnd = dict(["end"], [ds])
    dateEnd = dict(["end"], [`DateTime])
    timeEnd = dict(["end"], [`DateTime])
    if(sortColumn == "NULL")
    {
        replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1)
    }
    else
    {
        replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1, sortColumns=sortColumn)
    }
}

def replayJob(inputDict, tabName, dateDict, timeDict, replayRate, sortColumn)
{
    if(sortColumn == "NULL")
    {
        replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23)
    }
    else
    {
        replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23, sortColumns=sortColumn)    
    }
    createEnd(tabName, sortColumn)
}

def stkReplay(stkList, mutable startDate, mutable endDate, replayRate, replayUuid, replayName)
{
    maxCnt = 50
    returnBody = dict(STRING, STRING)
    startDate = datetimeParse(startDate, "yyyyMMdd")
    endDate = datetimeParse(endDate, "yyyyMMdd") + 1
    sortColumn = "ApplSeqNum"
    if(stkList.size() > maxCnt)
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "Exceeds the limits for a single replay. The maximum is: " + string(maxCnt)
        return returnBody
    }
    if(size(replayName) != 0)
    { 
        for(name in replayName)
        {
            if(not name in ["snapshot", "order", "transaction"])
            {
                returnBody["errorCode"] = "0"
                returnBody["errorMsg"] = "Input the correct name of the data source. Cannot recognize: " + name
                return returnBody
            }
        }
    }
    else
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "Missing data source. Input the correct name of the data source."
        return returnBody
    }
    try 
    {
        if(size(replayName) == 1 && replayName[0] == "snapshot")
        {
            colName = ["timestamp", "biz_type", "biz_data"]
            colType = [TIMESTAMP, SYMBOL, BLOB]
            sortColumn = "NULL"
        }
        else
        {
            colName = ["timestamp", "biz_type", "biz_data", sortColumn]
            colType = [TIMESTAMP, SYMBOL, BLOB, LONG]
        }
        msgTmp = streamTable(10000000:0, colName, colType)
        tabName = "replay_" + replayUuid
        enableTableShareAndPersistence(table=msgTmp, tableName=tabName, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=60, flushMode=0, preCache=1000000)
        
        timeRS = cutPoints(09:30:00.000..15:00:00.000, 23)
        
        inputDict = dict(replayName, each(dsTb{timeRS, startDate, endDate, stkList}, replayName))
        dateDict = dict(replayName, take(`MDDate, replayName.size()))
        timeDict = dict(replayName, take(`MDTime, replayName.size()))
        
        jobId = "replay_" + replayUuid
        jobDesc = "replay stock data"
        submitJob(jobId, jobDesc, replayJob{inputDict, tabName, dateDict, timeDict, replayRate, sortColumn})
        returnBody["errorCode"] = "1"
        returnBody["errorMsg"] = "Replay successfully."
        return returnBody
    }
    catch(ex)
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "Exception occurred when replaying: " + ex
        return returnBody
    }
}

addFunctionView(dsTb)
addFunctionView(createEnd)
addFunctionView(replayJob)
addFunctionView(stkReplay)