N 对 1 多表回放

N 对 1 多表回放包含同构回放和异构回放两种模式:

  • 2.00.5 版本前,N 对 1 回放要求输入表结构必须相同,称为同构回放

  • 自 2.00.5 版本起,N 对 1 回放开始支持输入结构不同的表,称为异构回放

本文档介绍 N 对 1 多表回放的两种模式使用方法。

多表回放功能主要由 replay 函数提供。其语法如下:

replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1], [sortColumns])

其参数的详细含义可以参考:replay 函数

应用例子 1

按不同回放模式进行回放:

  • 模拟回放使用的输入表和输出表:

    //创建输入表1并写入模拟数据
    n = 1000
    sym = take(`IBM`GS,n)
    myDate = take(2021.01.02..2021.01.06, n).sort!()
    myTime = take(13:00:00..16:59:59,n)
    vol = array(INT[], 0, 10)
    for(i in 0:n){vol.append!([rand(100,3)])}
    t1 = table(sym,myDate,myTime,vol).sortBy!([`myDate, `myTime])
    //创建输入表2并写入模拟数据
    sym = take(`IBM`GS,n)
    date = take(2021.01.02..2021.01.06, n).sort!()
    time = take(13:00:00..16:59:59,n)
    vol = array(INT[], 0, 10)
    for(i in 0:n){vol.append!([rand(100,3)])}
    price = array(DOUBLE[], 0, 10)
    for(i in 0:n){price.append!([rand(10.0,3)])}
    t2 = table(sym, date,time,vol,price).sortBy!([`date, `time])
    //创建输出表
    share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT[]]) as opt
    //输入表和表对象的映射字典
    input_dict  = dict(["msg1", "msg2"], [t1, t2])

    以下是输入表 t1t2 的结构以及数据预览

    t1.schema().colDefs

    name

    typeString

    typeInt

    sym STRING 18
    myDate DATE 6
    myTime SECOND 10
    vol INT[] 68
    select * from t1 limit 5
    sym myDate myTime vol
    IBM 2021.01.02 13:00:00 [89,26,10]
    GS 2021.01.02 13:00:01 [52,30,59]
    IBM 2021.01.02 13:00:02 [45,11,87]
    GS 2021.01.02 13:00:03 [92,0,36]
    IBM 2021.01.02 13:00:04 [85,98,47]
  • 每秒回放 1000 条数据:

    timer replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict,  replayRate=1000, absoluteRate=true)
    Time elapsed: 2010.107 ms

    两张输入表中一共有 2000 条数据,每秒回放 1000 条耗时大约 2 秒。

  • 加速 100,000 倍时间回放:

    timer replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict,  replayRate=100000, absoluteRate=false)
    Time elapsed: 3485.393 ms

    两张输入表中的最大时间与最小时间相差 346,600 秒,加速 100,000 倍时间回放耗时大约 3.5 秒。

  • 以最快的速率回放:

    timer replay(inputTables=trades,outputTables=st,dateColumn=`date,timeColumn=`time)
    Time elapsed: 1.996 ms

    以下是异构回放输出表 opt 的结构以及数据预览

    opt.schema().colDefs

    name

    typeString

    typeInt

    timestamp DATETIME 11
    sym SYMBOL 17
    blob BLOB 32
    vol INT[] 68
    select * from opt limit 5
    timestamp sym blob vol
    2021.01.02T13:00:00 msg2 IBM�Hж (X %cx�?�Q�� @��_w�? [19,40,88]
    2021.01.02T13:00:00 msg1 IBM�Hж Y [89,26,10]
    2021.01.02T13:00:01 msg2 GS�HѶ M 8 e�Q@pƈ6@x �\ @ [77,4,56]
    2021.01.02T13:00:01 msg1 GS�HѶ 4; [52,30,59]
    2021.01.02T13:00:02 msg2 IBM�HҶ : ��V~@���@qi#@ [58,22,32]

应用例子 2

结合 replayDS 回放磁盘分区表数据:

  • 将输入表写入数据库中:

    //将输入表写入数据库中
    if(existsDatabase("dfs://test_stock1")){
    dropDatabase("dfs://test_stock1")
    }
    db1=database("",RANGE, 2021.01.02..2021.01.07)
    db2=database("",VALUE,`IBM`GS)
    db=database("dfs://test_stock1",COMPO,[db1, db2], engine="TSDB")
    orders=db.createPartitionedTable(t1,`orders,`myDate`sym, sortColumns=`sym`myDate`myTime)
    orders.append!(t1)
    trades=db.createPartitionedTable(t2,`trades,`date`sym, sortColumns=`sym`date`time)
    trades.append!(t2)
  • 使用 replayDS 对数据源进行划分:

    //获取数据源
    ds1 = replayDS(sqlObj=<select * from loadTable(db, `orders)>, dateColumn=`myDate, timeColumn=`myTime)
    ds1.size()
    ds2 = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`time)
    ds2.size()
    //输入表的表名及时间列映射字典
    input_dict  = dict(["msg1", "msg2"], [ds1, ds2])
    date_dict = dict(["msg1", "msg2"], [`myDate, `date])
    time_dict = dict(["msg1", "msg2"], [`myTime, `time])
    
    //查看划分出的数据源个数
    ds.size()
    
    // output
    5
  • 使用 replay 对划分好的数据源进行全速回放:

    timer replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict)
    Time elapsed: 9.972 ms

应用例子 3

异构回放:回放的输出表注入 streamFilter 引擎,进一步过滤分发处理

  • 定义 streamFilter:

    //定义 streamFilter 输入表
    share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT[]]) as streamFilter_input
    
    //定义 streamFilter 输出表
    filterOrder=table(100:0, `sym`date`time`volume, [SYMBOL, DATE, SECOND, INT[]])
    filterTrades=table(100:0, `sym`date`time`volume`price, [SYMBOL, DATE, SECOND, INT[], DOUBLE[]])
    //设置 streamFilter 的过滤和处理条件
    filter1=dict(STRING,ANY)
    filter1['condition']=`msg1
    filter1['handler']=filterOrder
    
    filter2=dict(STRING,ANY)
    filter2['condition']=`msg2
    filter2['handler']=filterTrades
    schema=dict(["msg1","msg2"], [filterOrder, filterTrades])
    
    //定义 streamFilter,对接收的数据进行处理,分别分发到表 filterOrder 和 filterTrades
    stEngine=streamFilter(name=`streamFilter, dummyTable=streamFilter_input, filter=[filter1,filter2], msgSchema=schema)
  • 订阅表 opt,将结果注入 streamFilter

    subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=stEngine, msgAsTable=true)
  • 使用 replay 对划分好的数据源进行全速回放:

    timer replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict)
    Time elapsed: 9.012 ms

    以下是 streamFilter 输出表 filterOrderfilterTrades 的结构以及数据预览

    filterOrder.schema().colDefs

    name

    typeString

    typeInt

    sym SYMBOL 17
    date DATE 6
    time SECOND 10
    volume INT[] 68
    select * from filterOrder limit 5
    sym myDate myTime vol
    IBM 2021.01.02 13:00:00 [89,26,10]
    GS 2021.01.02 13:00:01 [52,30,59]
    IBM 2021.01.02 13:00:02 [45,11,87]
    GS 2021.01.02 13:00:03 [92,0,36]
    IBM 2021.01.02 13:00:04 [85,98,47]