上传数据对象

API 支持通过 DBConnection::upload 方法将本地对象上传到 DolphinDB。本节将介绍 DBConnection::upload 的参数和使用示例。

DBConnection::upload方法的声明如下:

ConstantSP upload(const string& name, const ConstantSP& obj);

参数

  • name:上传后对象的名字。
  • obj:待上传的对象。

示例1:上传表对象

//定义一个函数创建一个表
TableSP createDemoTable(){
    vector<string> colNames = {"name", "date", "price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 10000, indexCapacity=10000;
    ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));

    int array_dt_buf[Util::BUF_SIZE];                                          //定义date列缓冲区数组
    double array_db_buf[Util::BUF_SIZE];                                       //定义price列缓冲区数组

    int start = 0;
    int no=0;
    while (start < rowNum) {
        size_t len = std::min(Util::BUF_SIZE, rowNum - start);
        int *dtp = columnVecs[1]->getIntBuffer(start, len, array_dt_buf);       //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部
        double *dbp = columnVecs[2]->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部
        for (size_t i = 0; i < len; ++i) {
            columnVecs[0]->setString(i+start, "name_"+std::to_string(++no));    //对string类型的name列直接进行赋值,不采用getbuffer的方式
            dtp[i] = 17898+i;
            dbp[i] = (rand()%100)/3.0;
        }
        columnVecs[1]->setInt(start, len, dtp);                                 //写完后使用 `setInt` 将缓冲区写回数组
        columnVecs[2]->setDouble(start, len, dbp);                              //写完后使用 `setDouble` 将缓冲区写回数组
        start += len;
    }
    return table;
}

int main(int argc, const char **argv)
{
    DBConnection conn(false, false, 30, false);
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    
    ConstantSP t = createDemoTable();
    ConstantSP result = conn.upload("t", t);                                   //上传表
    std::cout << conn.run("t")->size() << std::endl;                           //输出10000
}

示例2:上传压缩后的表对象

自 1.30.19 版本开始,为减少网络传输的开销,C++ API 支持通过 setColumnCompressMethods 方法对表数据压缩后上传。setColumnCompressMethods 方法可以灵活地为表中每列数据分别指定不同的压缩方式。目前支持 lz4 和 delta 两种压缩算法。其中 delta 算法仅可用于 SHORT, INT, LONG 与时间或日期类型数据。在网络速度较慢的情况下,推荐对表数据进行压缩。

使用方法如下:

  1. 创建 DBConnection 对象时,需指定 compress=true 以开启压缩下载功能。
  2. 上传数据前,通过 setColumnCompressMethods 指定 Table 中每一列的压缩方式。
int main(int argc, const char **argv)
{
    //第四个参数指定为 true 表示开启下载压缩
    DBConnection conn_compress(false, false, 7200, true);
    //连接服务器
    conn_compress.connect("127.0.0.1", 8848);
    //创建一个共享流表,包含 DATE 和 LONG 两种类型的列
    conn_compress.run("share streamTable(1:0, `time`value,[DATE,LONG]) as table1");
    //构造上传的数据,600000条
    const int count = 600000;
    vector<int> time(count);
    vector<long long>value(count);
    int basetime = Util::countDays(2012, 1, 1);
    for (int i = 0; i<count; i++) {
        time[i] = basetime + (i % 15);
        value[i] = i;
    }
    VectorSP timeVector = Util::createVector(DT_DATE, count, count);
    VectorSP valueVector = Util::createVector(DT_LONG, count, count);
    timeVector->setInt(0, count, time.data());
    valueVector->setLong(0, count, value.data());
    vector<ConstantSP> colVector{ timeVector,valueVector };
    //创建Table
    vector<string> colName = { "time","value" };
    TableSP table = Util::createTable(colName, colVector);
    //指定每一列的压缩方式
    vector<COMPRESS_METHOD> typeVec{ COMPRESS_DELTA,COMPRESS_LZ4 };
    table->setColumnCompressMethods(typeVec);
    //压缩上传Table到服务器的流表(table1)
    vector<ConstantSP> args{ table };
    int insertCount = conn_compress.run("tableInsert{table1}", args)->getInt();
    //输出600000
    std::cout << insertCount << std::endl;
}