PartitionedTableAppender
利用 tableInsert
来并发写入分布式表的方法实现起来比较复杂,C++ API 提供了PartitionedTableAppender
(PTA) 类来更简便的实现自动按分区将数据并行写入的方法。
构造方法
PartitionedTableAppender(string dbUrl, string tableName, string partitionColName, DBConnectionPool& pool);
参数:
-
dbUrl: 分布式数据库地址
-
tableName: 表名。
-
partitionColName: 分区字段
-
DBConnectionPool: 连接池
函数说明:
其基本原理是设计一个连接池,然后获取分布式表的分区信息,将分区分配给连接池来并行写入,一个分区在同一时间只能由一个连接写入。
数据插入方法
int append(TableSP table);
参数 table
表示由待插入数据组成的表:
返回值:插入数据的条数。
函数说明:该方法会阻塞直至数据插入完成
环境准备
执行下面的脚本创建一个分布式表:
dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
if(existsDatabase(dbPath)){
dropDatabase(dbPath)
}
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
pt=db.createPartitionedTable(table(1000000:0, `name`date`price, [STRING,DATE,DOUBLE]), tableName, `date)
C++ 代码实现
TableSP createDemoTable(){
vector<string> colNames = {"name", "date","price"};
vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
int colNum = 3, rowNum = 10, indexCapacity = 10;
ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
vector<VectorSP> columnVecs;
for(int i = 0; i < colNum; ++i)
columnVecs.push_back(table->getColumn(i));
for(int i = 0; i < rowNum; ++i){
columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
columnVecs[1]->set(i, Util::createDate(2010, 1, i+1));
columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
}
return table;
}
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
DBConnectionPool pool("127.0.0.1", 8848, 20, "admin", "123456");
PartitionedTableAppender appender("dfs://SAMPLE_TRDDB", "demoTable", "date", pool);
TableSP table = createDemoTable();
appender.append(table);
ConstantSP result = conn.run("select * from loadTable('dfs://SAMPLE_TRDDB', `demoTable)");
std::cout << result->getString() << std::cout;
}