上传数据对象
API 支持通过 DBConnection::upload
方法将本地对象上传到 DolphinDB。本节将介绍
DBConnection::upload
的参数和使用示例。
DBConnection::upload
方法的声明如下:
ConstantSP upload(const string& name, const ConstantSP& obj);
参数
-
name
:上传后对象的名字。 -
obj
:待上传的对象。
示例1:上传表对象
//定义一个函数创建一个表
TableSP createDemoTable(){
vector<string> colNames = {"name", "date", "price"};
vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
int colNum = 3, rowNum = 10000, indexCapacity=10000;
ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
vector<VectorSP> columnVecs;
for(int i = 0; i < colNum; ++i)
columnVecs.push_back(table->getColumn(i));
int array_dt_buf[Util::BUF_SIZE]; //定义date列缓冲区数组
double array_db_buf[Util::BUF_SIZE]; //定义price列缓冲区数组
int start = 0;
int no=0;
while (start < rowNum) {
size_t len = std::min(Util::BUF_SIZE, rowNum - start);
int *dtp = columnVecs[1]->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部
double *dbp = columnVecs[2]->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部
for (size_t i = 0; i < len; ++i) {
columnVecs[0]->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式
dtp[i] = 17898+i;
dbp[i] = (rand()%100)/3.0;
}
columnVecs[1]->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组
columnVecs[2]->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组
start += len;
}
return table;
}
int main(int argc, const char **argv)
{
DBConnection conn(false, false, 30, false);
conn.connect("127.0.0.1", 8848, "admin", "123456");
ConstantSP t = createDemoTable();
ConstantSP result = conn.upload("t", t); //上传表
std::cout << conn.run("t")->size() << std::endl; //输出10000
}
示例2:上传压缩后的表对象
自 1.30.19 版本开始,为减少网络传输的开销,C++ API 支持通过 setColumnCompressMethods
方法对表数据压缩后上传。setColumnCompressMethods
方法可以灵活地为表中每列数据分别指定不同的压缩方式。目前支持 lz4 和 delta 两种压缩算法。其中 delta 算法仅可用于 SHORT, INT, LONG
与时间或日期类型数据。在网络速度较慢的情况下,推荐对表数据进行压缩。
使用方法如下:
-
创建 DBConnection 对象时,需指定
compress=true
以开启压缩下载功能。 -
上传数据前,通过
setColumnCompressMethods
指定 Table 中每一列的压缩方式。
int main(int argc, const char **argv)
{
//第四个参数指定为 true 表示开启下载压缩
DBConnection conn_compress(false, false, 7200, true);
//连接服务器
conn_compress.connect("127.0.0.1", 8848);
//创建一个共享流表,包含 DATE 和 LONG 两种类型的列
conn_compress.run("share streamTable(1:0, `time`value,[DATE,LONG]) as table1");
//构造上传的数据,600000条
const int count = 600000;
vector<int> time(count);
vector<long long>value(count);
int basetime = Util::countDays(2012, 1, 1);
for (int i = 0; i<count; i++) {
time[i] = basetime + (i % 15);
value[i] = i;
}
VectorSP timeVector = Util::createVector(DT_DATE, count, count);
VectorSP valueVector = Util::createVector(DT_LONG, count, count);
timeVector->setInt(0, count, time.data());
valueVector->setLong(0, count, value.data());
vector<ConstantSP> colVector{ timeVector,valueVector };
//创建Table
vector<string> colName = { "time","value" };
TableSP table = Util::createTable(colName, colVector);
//指定每一列的压缩方式
vector<COMPRESS_METHOD> typeVec{ COMPRESS_DELTA,COMPRESS_LZ4 };
table->setColumnCompressMethods(typeVec);
//压缩上传Table到服务器的流表(table1)
vector<ConstantSP> args{ table };
int insertCount = conn_compress.run("tableInsert{table1}", args)->getInt();
//输出600000
std::cout << insertCount << std::endl;
}