AutoFitTableUpsert

AutoFitTableUpsert (AFTU) 是 AFTA 更新写的版本。在创建 AFTU 对象时,需要指定一个列名 keyColName。新插入数据时,如果该列的值之前不存在于数据库中,则 AFTU 直接将数据插入;如果该列的值已存在于数据库中,则 AFTU 对该条数据进行更新。AFTU 更适合于由重复数据写入的场景。

构造方法

AutoFitTableUpsert(string dbUrl, string tableName, DBConnection& conn,
                   bool ignoreNull=false,
                   vector<string> *pkeyColNames=nullptr,
                   vector<string> *psortColumns=nullptr);

参数:

  • dbUrl:分布式数据库的地址。更新内存表时此处填空字符串即可。

  • tableName: 表名。

  • DBConnection: 一个已经连接成功的DBConnection

  • ignoreNull: 表示如果新数据中某行存在空值元素时,是否对目标表中的相应数据进行更新。默认值为 false

  • pkeyColNames:由于 DFS 表没有键值列,对 DFS 表进行更新时,将该参数指定的列视为键值列

  • psortColumns:设置该参数,更新的分区内的所有数据会根据指定的列进行排序。排序在每个分区内部进行,不会跨分区排序。对于后三个参数更详细的说明,参考:upsert!

数据插入方法

int upsert(TableSP table);

参数 table 表示由待插入数据组成的表。

返回值:插入数据的条数

函数说明:该方法会阻塞直至数据插入完成

环境准备

执行下面的脚本创建一个分布式表:

dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
if(existsDatabase(dbPath)){
	dropDatabase(dbPath)
}
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
pt=db.createPartitionedTable(table(1000000:0, `name`date`price, [STRING,DATE,DOUBLE]), tableName, `date)

C++ 代码实现

TableSP createDemoTable(){
    vector<string> colNames = {"name", "date","price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 10, indexCapacity = 10;
    ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));

    for(int i = 0; i < rowNum; ++i){
        columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
        columnVecs[1]->set(i, Util::createDate(2010, 1, i+1));
        columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
    }
    return table;
}

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    std::vector<std::string> keyNames{"name"};
    AutoFitTableUpsert upserter("dfs://SAMPLE_TRDDB", "demoTable", conn, false, &keyNames);
    TableSP table = createDemoTable();
    upserter.upsert(table);
    ConstantSP result = conn.run("select * from loadTable('dfs://SAMPLE_TRDDB', `demoTable)");
    std::cout <<  result->getString() << std::cout;
}