N to 1 Multi-Table Replay
The N to 1 type replays multiple tables into one output table. It supports the following replay modes:
- Homogeneous replay: Prior to version 2.00.5, only input tables with the same schema can be replayed together.
- Heterogeneous replay: Starting from version 2.00.5, multiple data sources with different schemata can be replayed to the same output table.
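In heterogeneous replay the inputs no longer share a schema, so the replay parameters are passed as dictionaries keyed by a unique identifier per data source, and each input record is serialized into a BLOB column of the output table. The following is a minimal sketch of the parameter shape only (the tables t1/t2 and the keys "msg1"/"msg2" are placeholders); complete working code appears in the examples of this section.

```dolphindb
// Each input table gets a unique identifier as its key
input_dict = dict(["msg1", "msg2"], [t1, t2])
// The date and time columns may differ per table, so they are also passed as dictionaries
date_dict = dict(["msg1", "msg2"], [`myDate, `date])
time_dict = dict(["msg1", "msg2"], [`myTime, `time])
// The output table of a heterogeneous replay holds each source record serialized as a BLOB
share streamTable(100:0, `timestamp`sym`blob, [DATETIME, SYMBOL, BLOB]) as opt
replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict)
```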
Example 1
The following example demonstrates how to replay multiple tables to one table at different rates.
- Create input and output tables for replaying, and write simulated data to the input tables:

// Create the input table t1 and insert simulated data
n = 1000
sym = take(`IBM`GS, n)
myDate = take(2021.01.02..2021.01.06, n).sort!()
myTime = take(13:00:00..16:59:59, n)
vol = array(INT[], 0, 10)
for(i in 0:n){vol.append!([rand(100, 3)])}
t1 = table(sym, myDate, myTime, vol).sortBy!([`myDate, `myTime])

// Create the input table t2 and insert simulated data
sym = take(`IBM`GS, n)
date = take(2021.01.02..2021.01.06, n).sort!()
time = take(13:00:00..16:59:59, n)
vol = array(INT[], 0, 10)
for(i in 0:n){vol.append!([rand(100, 3)])}
price = array(DOUBLE[], 0, 10)
for(i in 0:n){price.append!([rand(10.0, 3)])}
t2 = table(sym, date, time, vol, price).sortBy!([`date, `time])

// Create an output table opt
share streamTable(100:0, `timestamp`sym`blob`vol, [DATETIME, SYMBOL, BLOB, INT[]]) as opt

// Create a dictionary with the unique identifiers of the input tables as keys and the table objects as values
input_dict = dict(["msg1", "msg2"], [t1, t2])
// Create dictionaries mapping the identifiers to the date and time columns of each input table
date_dict = dict(["msg1", "msg2"], [`myDate, `date])
time_dict = dict(["msg1", "msg2"], [`myTime, `time])
The schema and data of table t1 are as follows:
t1.schema().colDefs

name   typeString typeInt
sym    STRING     18
myDate DATE       6
myTime SECOND     10
vol    INT[]      68

select * from t1 limit 5

sym myDate     myTime   vol
IBM 2021.01.02 13:00:00 [89,26,10]
GS  2021.01.02 13:00:01 [52,30,59]
IBM 2021.01.02 13:00:02 [45,11,87]
GS  2021.01.02 13:00:03 [92,0,36]
IBM 2021.01.02 13:00:04 [85,98,47]

- Replay 1,000 records per second. With 2,000 records across the two input tables, the replay takes about 2 seconds.
timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict, replayRate=1000, absoluteRate=true)
// Time elapsed: 2010.107 ms
- Replay at 100,000 times the speed of the original data. The difference between the start and end timestamps in the input tables is 346,600 seconds, so the replay takes about 3.5 seconds.
timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict, replayRate=100000, absoluteRate=false)
// Time elapsed: 3485.393 ms
- Replay at the maximum speed:

timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict)
// Time elapsed: 1.996 ms

The schema and data of the output table opt are as follows:
opt.schema().colDefs
name      typeString typeInt
timestamp DATETIME   11
sym       SYMBOL     17
blob      BLOB       32
vol       INT[]      68

select * from opt limit 5

timestamp           sym  blob            vol
2021.01.02T13:00:00 msg2 IBM... (binary) [19,40,88]
2021.01.02T13:00:00 msg1 IBM... (binary) [89,26,10]
2021.01.02T13:00:01 msg2 GS... (binary)  [77,4,56]
2021.01.02T13:00:01 msg1 GS... (binary)  [52,30,59]
2021.01.02T13:00:02 msg2 IBM... (binary) [58,22,32]

The blob column stores each original record serialized in binary form, so it is shown abbreviated here rather than as raw bytes.
Example 2
The following example uses the replayDS function to replay data from a DFS table.
- Write the input tables to a database:

if(existsDatabase("dfs://test_stock1")){
    dropDatabase("dfs://test_stock1")
}
db1 = database("", RANGE, 2021.01.02..2021.01.07)
db2 = database("", VALUE, `IBM`GS)
db = database("dfs://test_stock1", COMPO, [db1, db2], engine="TSDB")
orders = db.createPartitionedTable(t1, `orders, `myDate`sym, sortColumns=`sym`myDate`myTime)
orders.append!(t1)
trades = db.createPartitionedTable(t2, `trades, `date`sym, sortColumns=`sym`date`time)
trades.append!(t2)
- Use the replayDS function to split the data sources:

ds1 = replayDS(sqlObj=<select * from loadTable(db, `orders)>, dateColumn=`myDate, timeColumn=`myTime)
ds2 = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`time)
// View the number of data sources after splitting
ds1.size()
ds2.size()

// Create a dictionary with the unique identifiers of the input tables as keys and the data sources as values
input_dict = dict(["msg1", "msg2"], [ds1, ds2])
// Create a dictionary with the unique identifiers of the input tables as keys and the date columns as values
date_dict = dict(["msg1", "msg2"], [`myDate, `date])
// Create a dictionary with the unique identifiers of the input tables as keys and the time columns as values
time_dict = dict(["msg1", "msg2"], [`myTime, `time])
- Use the replay function to replay the split data sources at the maximum speed:

timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict)
// Time elapsed: 9.972 ms
Example 3
The following example demonstrates the process of heterogeneous replay.
- Create the streamFilter engine:

// Create an input (dummy) table for the engine
share streamTable(100:0, `timestamp`sym`blob`vol, [DATETIME, SYMBOL, BLOB, INT[]]) as streamFilter_input
// Create the output tables for the engine
filterOrder = table(100:0, `sym`date`time`volume, [SYMBOL, DATE, SECOND, INT[]])
filterTrades = table(100:0, `sym`date`time`volume`price, [SYMBOL, DATE, SECOND, INT[], DOUBLE[]])
// Set the filtering conditions for the engine
filter1 = dict(STRING, ANY)
filter1['condition'] = `msg1
filter1['handler'] = filterOrder
filter2 = dict(STRING, ANY)
filter2['condition'] = `msg2
filter2['handler'] = filterTrades
schema = dict(["msg1", "msg2"], [filterOrder, filterTrades])
// Create the streamFilter engine to process the received data and send the results to tables filterOrder and filterTrades
stEngine = streamFilter(name=`streamFilter, dummyTable=streamFilter_input, filter=[filter1, filter2], msgSchema=schema)
- Subscribe to table opt and use the streamFilter engine to process the subscribed data:

subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=stEngine, msgAsTable=true)
- Use the replay function to replay the data sources at the maximum speed:

timer replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict)
// Time elapsed: 9.012 ms

The schemata and data of the output tables filterOrder and filterTrades are as follows:
filterOrder.schema().colDefs
name   typeString typeInt
sym    SYMBOL     17
date   DATE       6
time   SECOND     10
volume INT[]      68

select * from filterOrder limit 5

sym date       time     volume
IBM 2021.01.02 13:00:00 [89,26,10]
GS  2021.01.02 13:00:01 [52,30,59]
IBM 2021.01.02 13:00:02 [45,11,87]
GS  2021.01.02 13:00:03 [92,0,36]
IBM 2021.01.02 13:00:04 [85,98,47]
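After finishing the walkthrough, you may want to release the resources it created. A sketch of the cleanup, assuming the subscription, engine, and shared table names used above:

```dolphindb
// Cancel the subscription created in Example 3
unsubscribeTable(tableName="opt", actionName="sub1")
// Release the streamFilter engine
dropStreamEngine("streamFilter")
// Undefine the shared stream tables
undef("opt", SHARED)
undef("streamFilter_input", SHARED)
```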