N to 1 Multi-Table Replay

The N to 1 type replays multiple tables to a single output table. It supports the following replay modes:

  • Homogeneous replay: Prior to version 2.00.5, replay only supports input tables that share the same schema.
  • Heterogeneous replay: Starting from version 2.00.5, multiple data sources with different schemata can be replayed to the same output table.
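
The call shape used throughout the examples below can be sketched as follows (the table and key names here are hypothetical): an N to 1 replay passes a dictionary mapping a unique key for each input table to the table itself, and writes all records, ordered by time, into one output table.

    // Hypothetical sketch of an N to 1 replay call:
    // each key in the dictionary uniquely identifies one input table
    inputs = dict(["srcA", "srcB"], [tableA, tableB])
    replay(inputTables=inputs, outputTables=out, dateColumn=`date, timeColumn=`time)

When the date or time column names differ across the input tables, dateColumn and timeColumn likewise take dictionaries keyed by the same identifiers, as the examples below show.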

Example 1

The following example demonstrates how to replay multiple tables to one table at different rates.

  • Create input and output tables for replaying and writing simulated data to the input tables:
    // Create the input table t1 and insert simulated data
    n = 1000
    sym = take(`IBM`GS,n)
    myDate = take(2021.01.02..2021.01.06, n).sort!()
    myTime = take(13:00:00..16:59:59,n)
    vol = array(INT[], 0, 10)
    for(i in 0:n){vol.append!([rand(100,3)])}
    t1 = table(sym,myDate,myTime,vol).sortBy!([`myDate, `myTime])
    // Create the input table t2 and insert simulated data
    sym = take(`IBM`GS,n)
    date = take(2021.01.02..2021.01.06, n).sort!()
    time = take(13:00:00..16:59:59,n)
    vol = array(INT[], 0, 10)
    for(i in 0:n){vol.append!([rand(100,3)])}
    price = array(DOUBLE[], 0, 10)
    for(i in 0:n){price.append!([rand(10.0,3)])}
    t2 = table(sym, date,time,vol,price).sortBy!([`date, `time])
    // Create an output table opt
    share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT[]]) as opt
    // Create dictionaries keyed by the unique identifier of each input table,
    // mapping to the table objects, date columns, and time columns respectively
    input_dict = dict(["msg1", "msg2"], [t1, t2])
    date_dict = dict(["msg1", "msg2"], [`myDate, `date])
    time_dict = dict(["msg1", "msg2"], [`myTime, `time])

    The schema and data of table t1 are as follows:

    t1.schema().colDefs
    name   typeString typeInt
    sym    STRING     18
    myDate DATE       6
    myTime SECOND     10
    vol    INT[]      68

    select * from t1 limit 5
    sym myDate     myTime   vol
    IBM 2021.01.02 13:00:00 [89,26,10]
    GS  2021.01.02 13:00:01 [52,30,59]
    IBM 2021.01.02 13:00:02 [45,11,87]
    GS  2021.01.02 13:00:03 [92,0,36]
    IBM 2021.01.02 13:00:04 [85,98,47]
  • Replay 1,000 records per second. With 1,000 records in each input table (2,000 in total), the replay takes about 2 seconds (2,000 ÷ 1,000 = 2 s).
    timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict, replayRate=1000, absoluteRate=true)
    // Time elapsed: 2010.107 ms
  • Replay at 100,000 times the speed of the original data. The difference between the earliest and latest timestamps across both input tables is 346,600 seconds, so the replay takes about 3.5 seconds (346,600 ÷ 100,000 ≈ 3.5 s).
    timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict, replayRate=100000, absoluteRate=false)
    // Time elapsed: 3485.393 ms
  • Replay at the maximum speed:
    timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict)
    // Time elapsed: 1.996 ms
    The schema and data of the output table opt are as follows:
    opt.schema().colDefs
    name      typeString typeInt
    timestamp DATETIME   11
    sym       SYMBOL     17
    blob      BLOB       32
    vol       INT[]      68
    select * from opt limit 5
    timestamp sym blob vol
    2021.01.02T13:00:00 msg2 IBM�Hж (X %cx�?�Q�� @��_w�? [19,40,88]
    2021.01.02T13:00:00 msg1 IBM�Hж Y [89,26,10]
    2021.01.02T13:00:01 msg2 GS�HѶ M 8 e�Q@pƈ6@x �\ @ [77,4,56]
    2021.01.02T13:00:01 msg1 GS�HѶ 4; [52,30,59]
    2021.01.02T13:00:02 msg2 IBM�HҶ : ��V~@���@qi#@ [58,22,32]

Example 2

The following example uses the replayDS function to replay data from DFS tables.

  • Write the input tables to a database:
    if(existsDatabase("dfs://test_stock1")){
        dropDatabase("dfs://test_stock1")
    }
    db1=database("",RANGE, 2021.01.02..2021.01.07)
    db2=database("",VALUE,`IBM`GS)
    db=database("dfs://test_stock1",COMPO,[db1, db2], engine="TSDB")
    orders=db.createPartitionedTable(t1,`orders,`myDate`sym, sortColumns=`sym`myDate`myTime)
    orders.append!(t1)
    trades=db.createPartitionedTable(t2,`trades,`date`sym, sortColumns=`sym`date`time)
    trades.append!(t2)
  • Use the replayDS function to split the data source:
    // Split each DFS table into data sources; size() returns the number of sources produced by each split
    ds1 = replayDS(sqlObj=<select * from loadTable(db, `orders)>, dateColumn=`myDate, timeColumn=`myTime)
    ds1.size()
    ds2 = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`time)
    ds2.size()
    
    input_dict  = dict(["msg1", "msg2"], [ds1, ds2]) // Create a dictionary with the unique identifiers of the input tables as keys and data sources as values
    date_dict = dict(["msg1", "msg2"], [`myDate, `date]) // Create a dictionary with the unique identifiers of the input tables as keys and date columns as values
    time_dict = dict(["msg1", "msg2"], [`myTime, `time]) // Create a dictionary with the unique identifiers of the input tables as keys and time columns as values
  • Use the replay function to replay the split data sources at the maximum speed:
    timer replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict)
    // Time elapsed: 9.972 ms
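
    By default, replayDS splits the data by date partition. To load smaller chunks at a time, each day can be split further with the timeRepartitionSchema parameter, a vector of time points that defines intraday ranges. A minimal sketch (the time points below are hypothetical):

    // Hypothetical sketch: split each day's data into intraday ranges
    ds3 = replayDS(sqlObj=<select * from loadTable(db, `orders)>, dateColumn=`myDate, timeColumn=`myTime, timeRepartitionSchema=[13:00:00.000, 15:00:00.000, 17:00:00.000])
    ds3.size()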

Example 3

The following example demonstrates the process of heterogeneous replay.

  • Create the stream filter engine:
    // Create an input table for the engine
    share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT[]]) as streamFilter_input
    
    // Create an output table for the engine
    filterOrder=table(100:0, `sym`date`time`volume, [SYMBOL, DATE, SECOND, INT[]])
    filterTrades=table(100:0, `sym`date`time`volume`price, [SYMBOL, DATE, SECOND, INT[], DOUBLE[]])
    // Set the filtering conditions for the engine
    filter1=dict(STRING,ANY)
    filter1['condition']=`msg1
    filter1['handler']=filterOrder
    
    filter2=dict(STRING,ANY)
    filter2['condition']=`msg2
    filter2['handler']=filterTrades
    // Map each source key to the table whose schema is used to deserialize its messages
    schema=dict(["msg1","msg2"], [filterOrder, filterTrades])
    
    // Create the streamFilter engine to process the received data and send the results to tables filterOrder and filterTrades
    stEngine=streamFilter(name=`streamFilter, dummyTable=streamFilter_input, filter=[filter1,filter2], msgSchema=schema)
  • Subscribe to table opt and use the streamFilter engine to process the subscribed data:
    subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=stEngine, msgAsTable=true)
  • Use the replay function to replay the split data sources at the maximum speed:
    timer replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict)
    // Time elapsed: 9.012 ms
    The schemata and data of output tables filterOrder and filterTrades are as follows:
    filterOrder.schema().colDefs
    name   typeString typeInt
    sym    SYMBOL     17
    date   DATE       6
    time   SECOND     10
    volume INT[]      68

    select * from filterOrder limit 5
    sym date       time     volume
    IBM 2021.01.02 13:00:00 [89,26,10]
    GS  2021.01.02 13:00:01 [52,30,59]
    IBM 2021.01.02 13:00:02 [45,11,87]
    GS  2021.01.02 13:00:03 [92,0,36]
    IBM 2021.01.02 13:00:04 [85,98,47]
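
When the replay is finished, the subscription, the engine, and the shared tables can be released. A minimal cleanup sketch, using the names created above:

    // Release the subscription, drop the engine, and undefine the shared tables
    unsubscribeTable(tableName="opt", actionName="sub1")
    dropStreamEngine("streamFilter")
    undef("opt", SHARED)
    undef("streamFilter_input", SHARED)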