replay
Syntax
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate],
[absoluteRate=true], [parallelLevel=1], [sortColumns],
[preciseRate=false])
Details
Replay one or more tables or data sources (generated by replayDS) to table(s) in chronological order to simulate real-time ingestion of streaming data. It is commonly used for backtesting of high-frequency trading strategies.
Replay Types
Based on the mapping between the input table(s) and output table(s), there are three replay types: 1-to-1, N-to-1, and N-to-N.
N-to-N replay is not simply N separate 1-to-1 replay tasks. With N separate 1-to-1 replays, each of the N source tables is replayed to its own target table, and the N tasks are executed in parallel. With N-to-N replay, when inputTables is specified as a tuple of data sources, the sources are replayed sequentially: the first source is replayed in full before replay of the second begins, and so on.
In contrast to N-to-N replay, N-to-1 replay provides ordered replay of multiple data sources into a single target table. With N-to-1 replay, the timestamp ordering across different input sources is preserved when replaying them into the target table.
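The ordering guarantee of N-to-1 replay can be pictured with a small Python model. This is only a sketch of the idea, not DolphinDB code: the two lists are hypothetical stand-ins for input sources that are already sorted by time, and `n_to_1` and `n_to_n` are illustrative helper names.

```python
import heapq

# Hypothetical stand-ins for two input sources, each already sorted by timestamp.
# Each row is (timestamp, source name, payload).
orders = [(1, "orders", "o1"), (4, "orders", "o2"), (6, "orders", "o3")]
trades = [(2, "trades", "t1"), (3, "trades", "t2"), (5, "trades", "t3")]

def n_to_1(*sources):
    """Interleave all sources into a single stream ordered by timestamp."""
    return list(heapq.merge(*sources, key=lambda row: row[0]))

def n_to_n(*sources):
    """Keep one output per source; sources are handled one after another."""
    return [list(src) for src in sources]

merged = n_to_1(orders, trades)
print([row[2] for row in merged])  # ['o1', 't1', 't2', 'o2', 't3', 'o3']
```

In the N-to-1 model the single output interleaves both sources by timestamp, while the N-to-N model keeps one output per source.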
The output of N-to-1 heterogeneous replay needs to be deserialized, filtered and processed by streamFilter.
Replay Rate
- If replayRate is a positive integer and absoluteRate = true, replayRate records are replayed per second.
- If replayRate is a positive integer and absoluteRate = false, data is replayed at replayRate times the original speed, i.e., the time span of the data is compressed by a factor of replayRate. The same number of records is replayed in every second.
- If replayRate is negative or unspecified, data is replayed at the maximum speed.
- If replayRate is a positive integer and preciseRate = true, the interval between two adjacent records is divided by replayRate. Suppose two adjacent rows have a timestamp difference of t. If replayRate is 2, the two rows are spaced t/2 apart during replay, i.e., they are replayed at twice the speed.
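The preciseRate rule amounts to dividing every gap between adjacent records by replayRate. The Python sketch below models only that arithmetic. The function name `precise_offsets` and the use of plain seconds as timestamps are assumptions made for illustration, and the few milliseconds of deviation that the real function may show are ignored.

```python
def precise_offsets(timestamps_s, replay_rate):
    """Return emit offsets (seconds from replay start) when each gap
    between adjacent records is divided by replay_rate."""
    if replay_rate <= 0:
        raise ValueError("replay_rate must be positive for precise-rate replay")
    first = timestamps_s[0]
    # Dividing every gap by replay_rate equals dividing the distance
    # from the first record by replay_rate.
    return [(ts - first) / replay_rate for ts in timestamps_s]

offsets = precise_offsets([0.0, 10.0, 14.0], replay_rate=2)
print(offsets)  # [0.0, 5.0, 7.0]
```

Records that were 10 s and 4 s apart are emitted 5 s and 2 s apart, matching the t/2 spacing described for replayRate = 2.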
Replay Process
- Loading data. When inputTables is specified as a tuple of data sources, the data is first loaded from disk into memory.
  Before version 1.30.21/2.00.9, only data sources with the same index are loaded and then replayed in order. Starting from version 1.30.21/2.00.9, the loaded data is automatically sorted by timestamp before it is replayed.
  Note:
  - To improve performance, specify the parallelLevel parameter to load data in parallel.
  - Data is loaded and replayed asynchronously.
- Replaying data in batches.
  Data to be replayed is read from memory. Only data in the same batch is replayed in order. Therefore, please sort the input tables by the specified time column (as determined by the parameters dateColumn and timeColumn) before calling the replay function.
  Replay one or more tables containing n records with a time span of t seconds:

  | Replay Rate | Batch Size | Elapsed Time (s) | Note |
  | --- | --- | --- | --- |
  | replays the specified number of records per second | replayRate records from one or multiple table(s), sorted by timestamp | n/replayRate | If the number of records loaded in one second is less than replayRate, all loaded data is replayed as a batch. |
  | replays at the specified multiple of the time span of the data | replayRate*n/t records from one or multiple table(s), sorted by timestamp | t/replayRate | If replayRate*n/t < 1, take 1. If the number of records loaded in one second is less than replayRate*n/t, all loaded data is replayed as a batch. |
  | replays at the maximum speed | all loaded data | determined by the performance of the system | For N-to-N replay, each table is replayed one by one in this case. |

  Note:
  - When inputTables is specified as data sources (i.e., loaded from disk), the replay speed is affected by disk I/O.
  - The elapsed time of heterogeneous replay is slightly longer than that of homogeneous replay.
- Writing data. Currently the system only supports writing to the output tables in a single thread.
- Terminating the replay. Use the command cancelJob or cancelConsoleJob from a new web or console session.
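The batch sizes and elapsed times given for the batch replay step can be checked with a short Python model. It is a sketch of the documented formulas only: `replay_plan` is a hypothetical helper, no rounding of the batch size is assumed, and `None` stands for values that depend on system performance.

```python
def replay_plan(n, t, replay_rate=None, absolute_rate=True):
    """Return (batch_size, elapsed_seconds) for n records spanning t seconds.

    None means the value is not fixed by a formula (maximum-speed replay).
    """
    if replay_rate is None or replay_rate <= 0:
        return (None, None)  # maximum speed: all loaded data, system-dependent time
    if absolute_rate:
        return (replay_rate, n / replay_rate)
    # Relative rate: replayRate*n/t records per batch, but at least 1.
    batch = max(1, replay_rate * n / t)
    return (batch, t / replay_rate)

print(replay_plan(1000, 500, 100, absolute_rate=True))   # (100, 10.0)
print(replay_plan(1000, 500, 100, absolute_rate=False))  # (200.0, 5.0)
```

With n = 1000, t = 500 and replayRate = 100, the model gives about 10 seconds for absolute-rate replay and about 5 seconds for relative-rate replay.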
Arguments
inputTables can be:
- for 1-to-1 replay, a non-partitioned in-memory table or data source;
- for N-to-N/N-to-1 homogeneous replay, multiple non-partitioned in-memory tables or a tuple of data sources;
- for N-to-1 heterogeneous replay, a dictionary. The key of the dictionary can be of any data type and is the unique identifier of the input table; the value is the table object or data source.
outputTables can be:
- for 1-to-1/N-to-1 homogeneous replay, a table object (a non-partitioned in-memory table/stream table) or a string scalar, with the same schema as the input table.
- for N-to-N replay, a string vector or tuple of table objects (non-partitioned in-memory tables/stream tables) with the same length as inputTables. The outputTables and inputTables are mapped one-to-one, and each pair has the same schema.
- for N-to-1 heterogeneous replay, a table object (a non-partitioned in-memory table/stream table) containing at least three columns:
  - The first column is of TIMESTAMP type and holds the timestamp specified by dateColumn/timeColumn;
  - The second column is of SYMBOL or STRING type and holds the key of the dictionary specified in inputTables;
  - The third column must be of BLOB type and stores the serialized result of each replayed record.
  - In addition, the output table can contain columns that have the same column names and data types in the input tables.
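The layout of the heterogeneous output table can be illustrated with a Python model. This is a sketch under loud assumptions: JSON is used as a stand-in for the serialization stored in the BLOB column (the actual format is internal to DolphinDB), and `to_output_row` and `split_by_key` are hypothetical helpers, the latter only mimicking the role that streamFilter plays.

```python
import json

def to_output_row(key, record, time_col, common_cols=()):
    """Build one output row: timestamp, source key, serialized record,
    followed by any columns shared by all input tables."""
    blob = json.dumps(record).encode("utf-8")  # assumption: JSON stands in for the BLOB
    return (record[time_col], key, blob) + tuple(record[c] for c in common_cols)

def split_by_key(rows):
    """Group rows by the key column and decode each blob back into a record."""
    out = {}
    for _ts, key, blob, *_rest in rows:
        out.setdefault(key, []).append(json.loads(blob.decode("utf-8")))
    return out

rows = [
    to_output_row("msg1", {"ts": 1, "sym": "IBM", "vol": 10}, "ts", ["vol"]),
    to_output_row("msg2", {"ts": 2, "sym": "GS", "vol": 5, "price": 10.5}, "ts", ["vol"]),
]
decoded = split_by_key(rows)
print(sorted(decoded))  # ['msg1', 'msg2']
```

Because records with different schemas share one table, the key in the second column is what tells a consumer how to decode each blob.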
dateColumn and timeColumn (optional) are the names of the time columns. At least one of them must be specified.
- for 1-to-1/N-to-1 homogeneous replay: it is a string scalar, and the time columns in inputTables and outputTables must use the same name.
- for N-to-N replay: it is a string scalar if the time columns of the input tables have the same column name; otherwise, it is a string vector.
- for N-to-1 heterogeneous replay: it is a string scalar if the time columns of the input tables have the same column name; otherwise, it is a dictionary. The key of the dictionary is a user-defined string that uniquely identifies the input table, and the value is dateColumn/timeColumn.
If dateColumn and timeColumn are specified as the same column, or only one of them is specified, there is no restriction on the type of the specified time column.
If dateColumn and timeColumn are specified as different columns, dateColumn must be DATE and timeColumn can only be SECOND, TIME or NANOTIME.
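The type rules for dateColumn and timeColumn can be written down as a small check. The Python sketch below models the rule only; `check_time_columns` and the schema dictionary (column name mapped to type name) are hypothetical and not part of any DolphinDB API.

```python
ALLOWED_TIME_TYPES = {"SECOND", "TIME", "NANOTIME"}

def check_time_columns(schema, date_column=None, time_column=None):
    """Validate the dateColumn/timeColumn combination against a schema
    given as {column name: type name}."""
    if date_column is None and time_column is None:
        raise ValueError("at least one of dateColumn and timeColumn must be specified")
    if date_column is None or time_column is None or date_column == time_column:
        return True  # a single time column: no restriction on its type
    if schema[date_column] != "DATE":
        raise TypeError("dateColumn must be of DATE type")
    if schema[time_column] not in ALLOWED_TIME_TYPES:
        raise TypeError("timeColumn must be SECOND, TIME or NANOTIME")
    return True

schema = {"date": "DATE", "time": "TIME", "ts": "TIMESTAMP"}
print(check_time_columns(schema, "ts", "ts"))      # True
print(check_time_columns(schema, "date", "time"))  # True
```

Passing `"date"` and `"ts"` as different columns would raise `TypeError` in this model, because TIMESTAMP is not among the allowed timeColumn types.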
replayRate (optional) is an integer. Together with the parameter absoluteRate, it determines the speed of replaying data.
absoluteRate (optional) is a Boolean value. The default value is true, indicating that the system replays replayRate records per second. If set to false, data is replayed at replayRate times the time span of the data.
parallelLevel (optional) is a positive integer indicating the number of threads used to load data sources into memory concurrently. The default value is 1. If inputTables is not a data source, there is no need to specify it.
sortColumns (optional) is a STRING scalar or vector of length 2. Data with the same timestamp is sorted according to the specified sortColumns. It is supported only for heterogeneous replay.
Note that any column in either of the input tables can be specified as a sort column. If one of the input tables doesn't contain the specified sort column, it is filled with NULL values and treated as the minimum values when the data is sorted.
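The tie-breaking behavior of sortColumns can be modeled in Python as a sort on the timestamp followed by the sort columns, with a missing value ordered before any present value. This is a sketch of the rule only: the row dictionaries, the `ts` column name, and the `sort_key` helper are assumptions for illustration.

```python
def sort_key(row, sort_columns):
    """Order by timestamp, then by each sort column.
    A missing (None) value is treated as the minimum."""
    key = [row["ts"]]
    for col in sort_columns:
        value = row.get(col)
        # (False, 0) sorts before (True, anything), so NULLs come first.
        key.append((value is not None, value if value is not None else 0))
    return tuple(key)

rows = [
    {"ts": 1, "src": "trades", "seq": 7},
    {"ts": 1, "src": "orders"},            # no "seq" column: treated as NULL
    {"ts": 1, "src": "trades", "seq": 3},
    {"ts": 0, "src": "orders"},
]
ordered = sorted(rows, key=lambda r: sort_key(r, ["seq"]))
print([(r["ts"], r.get("seq")) for r in ordered])
# [(0, None), (1, None), (1, 3), (1, 7)]
```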
preciseRate (optional) is a Boolean value. The default value is false. If it is set to true, the data is replayed at replayRate times the time difference between two adjacent records. Note that deviation of a few milliseconds may exist.
Examples
Example 1. 1-to-1 replay:
n=1000
sym = take(`IBM,n)
timestamp= take(temporalAdd(2012.12.06T09:30:12.000,1..500,'s'),n)
volume = rand(100,n)
trades=table(sym,timestamp,volume)
trades.sortBy!(`timestamp)
share streamTable(100:0,`sym`timestamp`volume,[SYMBOL,TIMESTAMP,INT]) as st
Replay 100 records per second. For 1000 records in table "trades", it takes about 10 seconds.
timer replay(inputTables=trades, outputTables=st, dateColumn=`timestamp, timeColumn=`timestamp,replayRate=100, absoluteRate=true);
// output
Time elapsed: 10001.195 ms
Replay at 100 times the original speed (absoluteRate = false). The difference between the start timestamp and the end timestamp in table "trades" is about 500 seconds, so it takes about 5 seconds to replay the table.
timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp,replayRate=100,absoluteRate=false);
// output
Time elapsed: 5001.909 ms
Replay at the maximum speed:
timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp);
// output
Time elapsed: 0.974 ms
Replay with preciseRate = true and replayRate = 100, so that the interval between two adjacent records is divided by 100. It takes about 4.99 seconds to replay the table.
timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp,replayRate=100,absoluteRate=false, preciseRate=true);
// output
Time elapsed: 4991.177 ms
Example 2. N-to-N replay.
The following script replays two data sources to the join engine for asof join.
n=50000
sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
date=take(2012.06.12..2012.06.16,n)
time=rand(13:00:00.000..16:59:59.999,n)
volume = rand(100,n)
t1=table(sym,date,time,volume).sortBy!([`date, `time])
sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
date=take(2012.06.12..2012.06.16,n)
time=rand(13:00:00.000..16:59:59.999,n)
price = 100 + rand(10.0,n)
t2=table(sym,date,time,price).sortBy!([`date, `time])
if(existsDatabase("dfs://test_stock")){
dropDatabase("dfs://test_stock")
}
db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16)
pt1=db.createPartitionedTable(t1,`pt1,`date).append!(t1)
pt2=db.createPartitionedTable(t2,`pt2,`date).append!(t2)
left = table(100:0,`sym`dt`volume,[SYMBOL,TIMESTAMP,INT])
right = table(100:0,`sym`dt`price,[SYMBOL,TIMESTAMP,DOUBLE])
opt=table(100:0, `dt`sym`volume`price`total, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE])
ajEngine=createAsofJoinEngine(name="ajEngine", leftTable=left, rightTable=right, outputTable=opt, metrics=<[volume, price, volume*price]>, matchingColumn=`sym, timeColumn=`dt, useSystemTime=false, delayedTime=1)
ds1=replayDS(sqlObj=<select sym, concatDateTime(date, time) as dt, volume from pt1>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:00:00.000, 14:00:00.000, 15:00:00.000, 16:00:00.000, 17:00:00.000])
ds2=replayDS(sqlObj=<select sym, concatDateTime(date, time) as dt, price from pt2>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:00:00.000, 14:00:00.000, 15:00:00.000, 16:00:00.000, 17:00:00.000])
replay(inputTables=[ds1,ds2], outputTables=[getLeftStream(ajEngine), getRightStream(ajEngine)], dateColumn=`dt);
select count(*) from opt
// output
50000
Example 3. N-to-1 heterogeneous replay. The output table needs to be deserialized, filtered and processed by streamFilter.
n=1000
sym = take(`IBM`GS,n)
myDate=take(2021.01.02..2021.01.06, n).sort!()
myTime=take(09:30:00..15:59:59,n)
vol = rand(100,n)
t=table(sym,myDate,myTime,vol)
sym = take(`IBM`GS,n)
date=take(2021.01.02..2021.01.06, n).sort!()
time=take(09:30:00..15:59:59,n)
vol = rand(100,n)
price = take(10,n)+rand(1.0,n)
t1=table(sym, date,time,vol,price)
if(existsDatabase("dfs://test_stock1")){
dropDatabase("dfs://test_stock1")
}
db1=database("",RANGE, 2021.01.02..2021.01.07)
db2=database("",VALUE,`IBM`GS)
db=database("dfs://test_stock1",COMPO,[db1, db2])
orders=db.createPartitionedTable(t,`orders,`myDate`sym)
orders.append!(t);
trades=db.createPartitionedTable(t1,`trades,`date`sym)
trades.append!(t1);
// load data sources
ds = replayDS(sqlObj=<select * from loadTable(db, `orders)>, dateColumn=`myDate, timeColumn=`myTime)
ds.size();
ds1 = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`time)
ds1.size();
input_dict = dict(["msg1", "msg2"], [ds, ds1])
date_dict = dict(["msg1", "msg2"], [`myDate, `date])
time_dict = dict(["msg1", "msg2"], [`myTime, `time])
//subscribe to the output table of replay to ingest the data to the stream filter
share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT]) as opt
filterOrder=table(100:0, `sym`date`time`volume, [SYMBOL, DATE, SECOND, INT])
filterTrades=table(100:0, `sym`date`time`volume`price, [SYMBOL, DATE, SECOND, INT, DOUBLE])
//define the input table of stream filter
share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT]) as streamFilter_input
// the stream filter splits the ingested data and distributes them to table "filterOrder" and "filterTrades"
filter1=dict(STRING,ANY)
filter1['condition']=`msg1
filter1['handler']=filterOrder
filter2=dict(STRING,ANY)
filter2['condition']=`msg2
filter2['handler']=filterTrades
schema=dict(["msg1","msg2"], [filterOrder, filterTrades])
stEngine=streamFilter(name=`streamFilter, dummyTable=streamFilter_input, filter=[filter1,filter2], msgSchema=schema)
subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=stEngine, msgAsTable=true)
replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict, replayRate=100, absoluteRate=false);
select count(*) from filterOrder
// output
1000