C++ API
DolphinDB C++ API 支持以下开发环境:
- Linux
- Windows Visual Studio
- Windows GNU(MinGW)
编译 API 动态库
本小节讲解如何在 Windows 环境下使用 Visual Studio 2022、在 Linux 环境(以 RedHat 为例),以及在 Windows 环境下使用 MinGW 编译出 API 动态库。
Windows 环境下使用 Visual Studio 2022编译
环境准备
- 安装软件 Visual Studio 2022,Git,CMake。
- 下载 OpenSSL 库。本文中的下载目录是
D:/temp/openssl-1.0.2l-vs2017
。
代码准备
在 Gitee 的 api-cplusplus 项目中下载代码,并切换至预期分支。以 main 分支为例,在 Git Bash 中执行如下命令:
git clone git@gitee.com:dolphindb/api-cplusplus.git
git checkout -b main origin/main
编译
在 PowerShell 中执行如下命令,即可成功编译出 DolphinDBAPI.dll 和 DolphinDBAPI.lib:
cd path/to/api-cplusplus
mkdir build && cd build
cmake .. -G "Visual Studio 17 2022" -A x64 -DUSE_OPENSSL=1 -DOPENSSL_PATH=D:/temp/openssl-1.0.2l-vs2017 -DCMAKE_CONFIGURATION_TYPES="Release;Debug"
cmake --build . --config Release
cmake
参数说明:
- -G:用来指定编译器。如果安装为其它版本的 Visual Studio,则此处须对应修改。
- -A:x64指定生成64位的动态库,Win32 指定生成32位的动态库。
- -DUSE_OPENSSL=1:如果想使用 OpenSSL 来加密 API 和 DolphinDB 之间的通信,则必须指定该参数。指定后编译 API 动态库时会去链接 SSL 以及 Crypto 动态库。
- -DOPENSSL_PATH=/path/to/openssl:如果系统中没有安装 OpenSSL,则可以通过该参数来指定路径。
- -DCMAKE_CONFIGURATION_TYPES="Release;Debug":生成的 Visual Studio 项目文件支持编译 Release 或 Debug 类型的动态库。
- --config Release:生成 Release 类型的目标文件,也可以改为 Debug。
注意事项
如果使用的是从上述网址下载的 OpenSSL 库,并且想编译64位的动态库,那么需要在执行 cmake
命令之前,删除该下载的 OpenSSL 库目录中的 bin,include,以及 lib 文件夹,并将 bin64,include64以及 lib64文件夹重命名为 bin、include 以及 lib,因为 CMake 会在这些目录中寻找头文件以及库。
Linux 环境下编译(以 RedHat 为例)
环境准备
安装 gcc/g++(version >= 4.8.5),Git,CMake。
安装 Openssl-devel 以及 Uuid-devel。
yum install openssl-devel yum install libuuid-devel
代码准备
在 Gitee 的 api-cplusplus 项目中下载代码,并切换至预期分支。以 main 分支为例,在终端中执行如下命令:
git clone git@gitee.com:dolphindb/api-cplusplus.git
git checkout -b main origin/main
编译
在终端中执行如下命令,即可成功编译出 libDolphinDBAPI.so:
cd path/to/api-cplusplus
mkdir build && cd build
cmake .. -DABI=0 -DUUID_PATH=/home/uuid/ -DUSE_OPENSSL=1 -DOPENSSL_PATH=/home/openssl/
make -j4
cmake
参数说明:
- -DABI=0:用来指定编译时
_GLIBCXX_USE_CXX11_ABI
的值,可以设置为1或者0。 - -DUUID_PATH=/path/to/uuid:如果系统中没有安装 uuid,则可以通过该参数来指定 UUID 路径,编译时会在$/include目录下寻找头文件,在$/lib目录下寻找 uuid 库
- -DUSE_OPENSSL=1:如果想使用 OpenSSL 来加密 API 和 DolphinDB 之间的通信,则必须指定该参数。指定后编译 API 动态库时会去链接 SSL 以及 Crypto 动态库。
- -DOPENSSL_PATH=/path/to/openssl:如果系统中没有安装 OpenSSL,则可以通过该参数来指定 OPENSSL 路径。
Windows 环境下使用 MinGW 编译
环境准备
- 安装软件 MinGW,Git,CMake。
- OpenSSL 库使用的是 API 源码中 lib 目录下的 openssl-1.0.2u。
代码准备
在 Gitee 的 api-cplusplus 项目中下载代码,并切换至预期分支。以main分支为例,在 Git Bash 中执行如下命令:
git clone git@gitee.com:dolphindb/api-cplusplus.git
git checkout -b main origin/main
编译
在 Power Shell 中执行如下命令,即可成功编译出 libDolphinDBAPI.dll:
cd path/to/api-cplusplus
mkdir build && cd build
cmake .. -G "MinGW Makefiles" -DUSE_OPENSSL=1 -DOPENSSL_PATH=D:/workspace/testCode/api-cplusplus/lib/openssl-1.0.2u/openssl-1.0.2u/static
cmake --build .
cmake
参数说明:
- -G:用来指定编译器。
- -DUSE_OPENSSL=1:如果想使用 OpenSSL 来加密 API 和 DolphinDB 之间的通信,则必须指定该参数。指定后编译 API 动态库时会去链接 SSL 以及 Crypto 动态库。
- -DOPENSSL_PATH=/path/to/openssl:如果系统中没有安装 OpenSSL,则可以通过该参数来指定路径。
编译可执行文件
本小节主要讲解如何在 Windows 环境下使用 Visual Studio 2022、在 Linux 环境(以 RedHat 为例),以及在 Windows 环境下使用 MinGW 编译一个可执行文件,并链接上一对应小节生成的 API 动态库以连接数据库。
注:这部分的工程存在于 API 源码中的 demo 文件夹内。
Windows 环境下使用 Visual Studio 2022编译
将对应环境下生成的 DolphinDBAPI.lib 拷贝到
demo/lib
目录下。在 PowerShell 中执行下述命令:
cd path/to/api-cplusplus/demo mkdir build && cd build cmake .. -G "Visual Studio 17 2022" -A x64 -DUSE_OPENSSL=1 -DOPENSSL_PATH=D:/temp/openssl-1.0.2l-vs2017 -DCMAKE_CONFIGURATION_TYPES="Release;Debug" cmake --build . --config Release
cmake
各项参数含义与前述编译中的相同。生成的可执行文件 apiDemo.exe 在
demo\bin\Debug
目录下。将所依赖的动态库(DolphinDBAPI.dll libeay32MD.dll ssleay32MD.dll)拷贝到 .exe 文件同目录下即可执行。
Linux 环境下编译(以 RedHat 为例)
将对应环境下生成的 libDolphinDBAPI.so 拷贝到 demo/lib 目录下.
在终端中执行下述命令:
cd path/to/api-cplusplus/demo mkdir build && cd build cmake .. -DABI=0 -DUSE_OPENSSL=1 -DOPENSSL_PATH=/home/openssl/ cmake --build .
cmake
各项参数说明如下所示,注意 ABI 的值必须相同:- DABI=0:用来指定编译时 _GLIBCXX_USE_CXX11_ABI 的值,可以设置为1或者0。
- DUSE_OPENSSL=1:如果想使用 OpenSSL 来加密 API 和 DolphinDB 之间的通信,则必须指定该参数。指定后编译 API 动态库时会去链接 SSL 以及 Crypto 动态库。
- DOPENSSL_PATH=/path/to/openssl:如果系统中没有安装 OpenSSL,则可以通过该参数来指定路径。
生成的可执行文件 apiDemo 在
demo\bin
目录下。
Windows 环境下使用 MinGW 编译
将对应环境下生成的 libDolphinDBAPI.dll 拷贝到
demo/lib
目录下。在 PowerShell 中执行下述命令:
cd path/to/api-cplusplus/demo mkdir build && cd build cmake .. -G "MinGW Makefiles" -DUSE_OPENSSL=1 -DOPENSSL_PATH=D:/workspace/testCode/api-cplusplus/lib/openssl-1.0.2u/openssl-1.0.2u/static cmake --build .
cmake
各项参数含义与前述相同。生成的可执行文件 apiDemo.exe 在
demo\bin
目录下。将所依赖的动态库(libDolphinDBAPI.dll,libeay32.dll,ssleay32.dll)拷贝到 .exe文件同目录下即可执行。
建立 DolphinDB 连接
DolphinDB C++ API 提供的最核心的对象是 DBConnection。C++应用可以通过它在 DolphinDB 服务器上执行脚本和函数,并在两者之间双向传递数据。DBConnection 类提供如下主要方法:
方法 | 详情 |
---|---|
DBConnection([enableSSL, asynTask,keepAliveTime,compress]) | 构造对象 |
connect(host, port, [username, password,initialScript,highAvailability,highAvailabilitySites,keepAliveTime]) | 将会话连接到DolphinDB服务器 |
login(username,password,enableEncryption) | 登录服务器 |
run(script) | 将脚本在 DolphinDB 服务器运行 |
run(functionName,args) | 调用 DolphinDB 服务器上的函数 |
upload(variableObjectMap) | 将本地数据对象上传到 DolphinDB 服务器 |
initialize() | 初始化连接信息 |
close() | 关闭当前会话。若当前会话不再使用,会自动被释放,但存在释放延时,可以调用 close() 立即关闭会话。否则可能出现因连接数过多,导致其它会话无法连接服务器的问题。 |
C++ API 通过 TCP/IP 协议连接到 DolphinDB。使用 connect
方法创建连接时,需要提供 DolphinDB server 的 IP 和端口。
DBConnection conn; bool ret = conn.connect("127.0.0.1", 8848);
声明 connection 变量的时候,有两个可选参数:enableSSL(支持SSL),asynTask(支持一部分)。这两个参数默认值为 false。 目前只支持 linux, 稳定版>=1.10.17,最新版>=1.20.6。
下面例子是,建立支持 SSL 而非支持异步的 connection,每30秒做一次心跳检测,要求数据进行压缩。服务器端应该添加参数 enableHTTPS=true(单节点部署,需要添加到 dolphindb.cfg;集群部署需要添加到 cluster.cfg)。
DBConnection conn(true,false,30,true)
下面建立不支持 SSL,但支持异步的 connection。异步情况下,只能执行 DolphinDB 脚本和函数, 且不再有返回值。该功能适用于异步写入数据。
DBConnection conn(false,true)
创建连接时也可以使用用户名和密码登录,默认的管理员名称为"admin",密码是"123456"。
DBConnection conn; bool ret = conn.connect("127.0.0.1", 8848, "admin", "123456");
可通过 connect
函数的返回值来确认是否连接成功。若结果返回 Ture,则说明连接成功;若返回值为 False,则表示连接失败。
若未使用用户名及密码连接成功,则脚本在 Guest 权限下运行。后续运行中若需要提升权限,可以使用 conn.login('admin','123456',true) 登录获取权限。
请注意,DBConnection 类的所有函数都不是线程安全的,不可以并行调用,否则可能会导致程序崩溃。
运行 DolphinDB 脚本
通过 run
方法运行 DolphinDB 脚本:
ConstantSP v = conn.run("`IBM`GOOG`YHOO");
cout<<v->getString()<<endl;
输出结果为:
["IBM", "GOOG", "YHOO"]
注意:使用 run(script)
方法时,不建议用户在 script 中使用 DolphinDB 的 for 循环以进行表的写入操作。若使用,则可能因为去重操作而丢失数据。
建议操作方法如下:
- 以外部 for 循环调用
run("tableInsert(…)")
的方式一行行插入数据; - 把数据构造为一个 table 对象,然后调用 API 表写入接口(
AutoFitTableAppender
、AutoFitTableUpsert
、PartitionedTableAppender
等)进行写入; - 调用
MultithreadedTableWriter
接口的insert()
方法一行行写入数据。
运行 DolphinDB 函数
除了运行脚本之外,run 命令还可以直接在远程DolphinDB服务器上执行DolphinDB内置或用户自定义函数。若 run
方法只有一个参数,则该参数为脚本;若 run
方法有两个参数,则第一个参数为DolphinDB中的函数名,第二个参数是该函数的参数,为ConstantSP类型的向量。
下面的示例展示C++程序通过 run
调用DolphinDB内置的 add
函数。add
函数有两个参数 x 和 y。参数的存储位置不同,也会导致调用方式的不同。可能有以下三种情况:
所有参数都在DolphinDB server端
若变量 x 和 y 已经通过C++程序在服务器端生成,
conn.run("x = [1, 3, 5]; y = [2, 4, 6]");
那么在C++端要对这两个向量做加法运算,只需直接使用
run
即可。ConstantSP result = conn.run("add(x,y)"); cout<<result->getString()<<endl;
输出结果为:
[3, 7, 11]
仅有一个参数在DolphinDB server端存在
若变量 x 已经通过C++程序在服务器端生成,
conn.run("x = [1, 3, 5]");
而参数 y 要在 C++客户端生成,这时就需要使用“部分应用”方式,把参数 x 固化在
add
函数内。具体请参考部分应用文档。vector<ConstantSP> args; ConstantSP y = Util::createVector(DT_DOUBLE, 3); double array_y[] = {1.5, 2.5, 7}; y->setDouble(0, 3, array_y); args.push_back(y); ConstantSP result = conn.run("add{x,}", args); cout<<result->getString()<<endl;
输出结果为:
[2.5, 5.5, 12]
两个参数都待由 C++客户端赋值
vector<ConstantSP> args; ConstantSP x = Util::createVector(DT_DOUBLE, 3); double array_x[] = {1.5, 2.5, 7}; x->setDouble(0, 3, array_x); ConstantSP y = Util::createVector(DT_DOUBLE, 3); double array_y[] = {8.5, 7.5, 3}; y->setDouble(0, 3, array_y); args.push_back(x); args.push_back(y); ConstantSP result = conn.run("add", args); cout<<result->getString()<<endl;
输出结果为:
[10, 10, 10]
上传数据对象
C++ API 提供 upload
方法,将本地对象上传到 DolphinDB。
上传表对象
下面的例子在 C++ 端定义了一个 createDemoTable
函数,该函数创建了一个本地的表对象。
TableSP createDemoTable(){ vector<string> colNames = {"name", "date","price"}; vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE}; int colNum = 3, rowNum = 10000, indexCapacity=10000; ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity); vector<VectorSP> columnVecs; for(int i = 0; i < colNum; ++i) columnVecs.push_back(table->getColumn(i)); for(unsigned int i = 0; i < rowNum; ++i){ columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i))); columnVecs[1]->set(i, Util::createDate(2010, 1, i+1)); columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0)); } return table; }
需要注意的是,上述例子中采用的 set
方法作为一个虚函数,会产生较大的开销。调用 set
方法对表的列向量逐个赋值,在数据量很大的情况下会导致效率低下。此外,createString
, createDate
与 createDouble
等构造方法要求操作系统为其分配内存,反复调用同样会产生很大的内存开销。
相对合理的做法是定义一个相应类型的数组,通过诸如 setInt(INDEX start, int len, const int* buf) 的方式一次或者多次地将数据批量传给列向量。
当表对象的数据量较小时,可以采用上述例子中的方式生成 TableSP 对象的数据,但是当数据量较多时,建议采用如下方式来生成数据。
TableSP createDemoTable(){ vector<string> colNames = {"name", "date", "price"}; vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE}; int colNum = 3, rowNum = 10000, indexCapacity=10000; ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity); vector<VectorSP> columnVecs; for(int i = 0; i < colNum; ++i) columnVecs.push_back(table->getColumn(i)); int array_dt_buf[Util::BUF_SIZE]; //定义date列缓冲区数组 double array_db_buf[Util::BUF_SIZE]; //定义price列缓冲区数组 int start = 0; int no=0; while (start < rowNum) { size_t len = std::min(Util::BUF_SIZE, rowNum - start); int *dtp = columnVecs[1]->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部 double *dbp = columnVecs[2]->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部 for (int i = 0; i < len; ++i) { columnVecs[0]->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式 dtp[i] = 17898+i; dbp[i] = (rand()%100)/3.0; } columnVecs[1]->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组 columnVecs[2]->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组 start += len; } return table; }
上述例子采用的诸如 getIntBuffer
等方法能够直接获取一个可读写的缓冲区,写完后使用 setInt
将缓冲区写回数组。这类函数会检查给定的缓冲区地址和变量底层储存的地址是否一致,如果一致就不会发生数据拷贝。在多数情况下,用 getIntBuffer
获得的缓冲区就是变量实际的存储区域,这样能减少数据拷贝,提高性能。
利用上例中自定义的 createDemoTable
函数创建一个表对象。通过 upload
方法将它上传到DolphinDB,再从DolphinDB获取这个表的数据,保存到本地对象result并打印。
TableSP table = createDemoTable(); conn.upload("myTable", table); string script = "select * from myTable;"; ConstantSP result = conn.run(script); cout<<result->getString()<<endl;
输出结果为:
name date price
------- ---------- ---------
name_1 2019.01.02 27.666667
name_2 2019.01.03 28.666667
name_3 2019.01.04 25.666667
name_4 2019.01.05 5
name_5 2019.01.06 31
...
上传压缩后的表对象
自 1.30.19 版本开始,C++ API 支持通过 setColumnCompressMethods
方法,对表数据压缩后上传,以减少网络传输的开销。setColumnCompressMethods
方法可以灵活的为表中每列数据分别指定不同的压缩方式。目前支持 lz4 和 delta 两种压缩算法,但 delta 算法仅可用于 SHORT, INT, LONG 与时间或日期类型数据。
在网络速度较慢的情况下,推荐对表数据进行压缩。使用方法如下:
- 创建 DBConnection 对象时,需指定 compress=true 以开启压缩下载功能。
- 上传数据前,通过
setColumnCompressMethods
指定 Table 中每一列的压缩方式。
以下为范例代码:
//第四个参数指定为 true 表示开启下载压缩 DBConnection conn_compress(false, false, 7200, true); //连接服务器 conn_compress.connect(hostName, port); //创建一个共享流数据表,包含 DATE 和 LONG 两种类型的列 conn_compress.run("share streamTable(1:0, `time`value,[DATE,LONG]) as table1"); //构造上传的数据,约600000条 const int count = 600000; vector<int> time(count); vector<long long>value(count); int basetime = Util::countDays(2012, 1, 1); for (int i = 0; i<count; i++) { time[i] = basetime + (i % 15); value[i] = i; } VectorSP timeVector = Util::createVector(DT_DATE, count, count); VectorSP valueVector = Util::createVector(DT_LONG, count, count); timeVector->setInt(0, count, time.data()); valueVector->setLong(0, count, value.data()); vector<ConstantSP> colVector{ timeVector,valueVector }; //创建Table vector<string> colName = { "time","value" }; TableSP table = Util::createTable(colName, colVector); //指定每一列的压缩方式 vector<COMPRESS_METHOD> typeVec{ COMPRESS_DELTA,COMPRESS_LZ4 }; table->setColumnCompressMethods(typeVec); //压缩上传Table到服务器的流数据表(table1) vector<ConstantSP> args{ table }; int insertCount = conn_compress.run("tableInsert{table1}", args)->getInt(); std::cout << insertCount << std::endl;
读取数据示例
DolphinDB C++ API 不仅支持Int, Float, String, Date, DataTime等多种数据类型,也支持向量(VectorSP)、集合(SetSP)、矩阵(MatrixSP)、字典(DictionarySP)、表(TableSP)等多种数据形式。下面介绍如何通过DBConnection对象,读取并操作DolphinDB的各种形式的对象。
首先加上必要的头文件:
#include "DolphinDB.h" #include "Util.h"
向量
创建INT类型的向量:
VectorSP v = conn.run("1..10"); int size = v->size(); for(int i = 0; i < size; ++i) cout<<v->getInt(i)<<endl;
创建DATE类型的向量:
VectorSP v = conn.run("2010.10.01..2010.10.30"); int size = v->size(); for(int i = 0; i < size; ++i) cout<<v->getString(i)<<endl;
从 1.30.21/2.00.9 版本开始,C++ API 新增 DdbVector 类,可方便地创建 DolphinDB 各种数据类型的 Vector。使用时需要注意以下几点:
- DdbVector 为泛型类,在创建时需指定 C++ 原生类型,如std::string, int, double 等。
- 必须在创建时指明 Vector 的容量。不支持扩容。
- 可通过
add
,append
,set
等方法填充数据。 - 在填充数据后,通过
createVector
来创建 DDB Vector,须指定与 DdbVector 声明类型相匹配的 DolphinDB 的数据类型,如 DT_STRING, DT_INT 等。
举例说明:
//创建一个包含30000行元素的表
TableSP table;
int size = 30000;
DdbVector<double> doubleV(0, size);
DdbVector<float> floatV(0, size);
DdbVector<int> intV(0, size);
DdbVector<short> shortV(0, size);
DdbVector<char> boolV(0, size);
DdbVector<char> charV(0, size);
DdbVector<string> strV(0, size);
unsigned char int128[16];
DdbVector<Guid> int128V(0, size);
for (auto i = 0; i < size; i++) {
doubleV.add(i);
floatV.add(i);
intV.add(i);
shortV.add(i);
boolV.add(i%1);
charV.add(i%256);
strV.add("str"+std::to_string(i));
int128[i % 16] = i % 256;
int128V.add(int128);
}
table=Util::createTable({ "double","float","int","short","bool","char","str","int128" },
{ doubleV.createVector(DT_DOUBLE),floatV.createVector(DT_FLOAT),intV.createVector(DT_INT),
shortV.createVector(DT_SHORT),boolV.createVector(DT_BOOL),charV.createVector(DT_CHAR),
strV.createVector(DT_STRING),int128V.createVector(DT_INT128) });
cout << "table:" << endl << table->getString() << endl;
集合
创建一个集合:
SetSP set = conn.run("set(4 5 5 2 3 11 6)");
cout<<set->getString()<<endl;
矩阵
创建一个矩阵:
ConstantSP matrix = conn.run("1..6$2:3");
cout<<matrix->getString()<<endl;
字典
创建一个字典:
DictionarySP dict = conn.run("dict(1 2 3, `IBM`MSFT`GOOG)"); cout << dict->get(Util::createInt(1))->getString()<<endl;
上例通过 Util::createInt
创建Int类型的值,并使用 get
方法来获得key为1对应的值。
表
在C++客户端中执行以下脚本创建一个表:
string sb; sb.append("n=200\n"); sb.append("syms= `IBM`C`MS`MSFT`JPM`ORCL`BIDU`SOHU`GE`EBAY`GOOG`FORD`GS`PEP`USO`GLD`GDX`EEM`FXI`SLV`SINA`BAC`AAPL`PALL`YHOO`KOH`TSLA`CS`CISO`SUN\n"); sb.append("mytrades=table(09:30:00+rand(18000, n) as timestamp, rand(syms, n) as sym, 10*(1+rand(100, n)) as qty, 5.0+rand(100.0, n) as price); \n"); sb.append("select * from mytrades"); TableSP table = conn.run(sb);
getString()
方法获取表的内容
cout<<table->getString()<<endl;
getColumn()
方法按列获取表的内容
下面的脚本中,首先定义一个VectorSP类型的动态数组columnVecs,用于存放从表中获取的列,然后依次访问columnVecs处理数据。
对于表的各列,我们可以通过getString()
方法获得每一列的字符串类型数组,再通过C++的数据类型转换函数将数值类型的数据转换成对应的数据类型,从而进行计算。对于时间类型的数据,则需要以字符串的形式存储。
vector<VectorSP> columnVecs; for(int i = 0; i < table->columns(); ++i){ columnVecs.push_back(table->getColumn(i)); } int qty[200],sum[200]; double price[200]; for(int i=0; i<200;++i){ qty[i]=atoi(columnVecs[2]->getString(i).c_str()); price[i]=atof(columnVecs[3]->getString(i).c_str()); sum[i]=qty[i]*price[i]; } for(int i = 0; i < 200; ++i){ cout<<columnVecs[0]->getString(i)<<", "<<columnVecs[1]->getString(i)<<", "<<sum[i]<<endl;
getRow()
方法按照行获取表的内容
例如,打印table的第一行,返回的结果是一个字典。
cout<<table->getRow(0)->getString()<<endl; // output price->37.811678 qty->410 sym->IBM timestamp->13:45:15
如果取某一行中的某一列数据可以通过先调用getRow
,再调用getMember
的方法,如下例所示。其中,getMember()
函数的参数不是 C++ 内置的string类型对象,而是DolphinDB C++ API的string类型Constant对象。
cout<<table->getRow(0)->getMember(Util::createString("price"))->getDouble()<<endl; // output 37.811678
需要注意的是,按行访问table并逐一进行计算非常低效。为了达到更好的性能,建议参考使用getcolumn方法按列获取表的内容的方式按列访问table并批量计算。
使用BlockReaderSP
对象分段读取表数据
对于大数据量的表,API提供了分段读取方法。(此方法仅适用于DolphinDB 1.20.5, 1.10.16及其以上版本)
在C++客户端中执行以下脚本创建一个大数据量的表:
string script; script.append("n=20000\n"); script.append("syms= `IBM`C`MS`MSFT`JPM`ORCL`BIDU`SOHU`GE`EBAY`GOOG`FORD`GS`PEP`USO`GLD`GDX`EEM`FXI`SLV`SINA`BAC`AAPL`PALL`YHOO`KOH`TSLA`CS`CISO`SUN\n"); script.append("mytrades=table(09:30:00+rand(18000, n) as timestamp, rand(syms, n) as sym, 10*(1+rand(100, n)) as qty, 5.0+rand(100.0, n) as price); \n"); conn.run(script);
分段读取数据并用getString()方法获取表的内容, 需要注意的是fetchSize必须不小于8192。
string sb = "select * from mytrades"; int fetchSize = 8192; BlockReaderSP reader = conn.run(sb,4,2,fetchSize);//priority=4, parallelism=2 ConstantSP table; int total = 0; while(reader->hasNext()){ table=reader->read(); total += table->size(); cout<< "read" <<table->size()<<endl; cout<<table->getString()<<endl; }
AnyVector
AnyVector是DolphinDB中一种特殊的数据形式,与常规的向量不同,它的每个元素可以是不同的数据类型或数据形式。
ConstantSP result = conn.run("[1, 2, [1,3,5], [0.9, 0.8]]");
cout<<result->getString()<<endl;
使用 get
方法获取第三个元素:
VectorSP v = result->get(2);
cout<<v->getString()<<endl;
结果是一个Int类型的向量[1,3,5]。
ArrayVector
数组向量(array vector)是 DolphinDB 一种特殊的数据形式。与常规的向量不同,它的每个元素是一个数组,具有相同的数据类型,但长度可以不同。目前支持的数据类型为 Logical, Integral(不包括 COMPRESS 类型), Floating, Temporal。
//创建可容纳2个元素的arrayVector,初始大小为1,可容纳2个。 VectorSP arrayVector = Util::createVector(DT_INT_ARRAY, 1, 2); //创建第一个元素 VectorSP value = Util::createVector(DT_INT, 3); value->setInt(0, 1); value->setInt(1, 2); value->setInt(2, 3); //设置第一个元素 arrayVector->set(0, value); //创建第二个元素 value = Util::createVector(DT_INT, 3); value->setInt(0, 4); value->setInt(1, 5); value->setInt(2, 6); //添加第二个元素 arrayVector->append(value); std::cout << arrayVector->getString() << std::endl;
结果是[[1,2,3],[4,5,6]]。
使用 get
方法获取第二个元素:
VectorSP v = result->get(1);
cout<<v->getString()<<endl;
结果是一个Int类型的向量[4,5,6]。
保存数据到DolphinDB数据表
DolphinDB数据表按存储方式分为两种:
- 内存表: 数据仅保存在内存中,存取速度最快,但是节点关闭后数据就不存在了。
- 分布式表(DFS表):数据可保存在不同的节点,亦可保存在同一节点,由分布式文件系统统一管理。路径以"dfs://"开头。
保存数据到DolphinDB内存表
DolphinDB提供多种方式来保存数据到内存表:
- 通过insert into语句保存单条数据
- 通过tableInsert函数批量保存多条数据
- 通过tableInsert函数保存数据表
下面分别介绍三种方式保存数据的实例,在例子中使用到的数据表有3列,分别是STRING, DATE, DOUBLE类型,列名分别为name, date和price。 在DolphinDB中执行以下脚本创建内存表:
t = table(100:0, `name` date`price, [STRING, DATE, DOUBLE]); share t as tglobal;
上面的例子中,我们通过table
函数来创建表,指定了表的容量和初始大小、列名和数据类型。由于内存表是会话隔离的,所以普通内存表只有当前会话可见。为了让多个客户端可以同时访问t,我们使用 share
在会话间共享内存表。
使用insert into语句保存数据
可以采用如下方式保存单条数据。
char script[100]; sprintf(script, "insert into tglobal values(%s, date(timestamp(%ld)), %lf)", "`a", 1546300800000, 1.5); conn.run(script);
也可以使用insert into语句保存多条数据:
string script; int rowNum=10000, indexCapacity=10000; VectorSP names = Util::createVector(DT_STRING, rowNum, indexCapacity); VectorSP dates = Util::createVector(DT_DATE, rowNum, indexCapacity); VectorSP prices = Util::createVector(DT_DOUBLE, rowNum, indexCapacity); int array_dt_buf[Util:: BUF_SIZE]; //定义date列缓冲区数组 double array_db_buf[Util:: BUF_SIZE]; //定义price列缓冲区数组 int start = 0; int no=0; while (start < rowNum) { size_t len = std::min(Util::BUF_SIZE, rowNum - start); int *dtp = dates->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部 double *dbp = prices->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部 for (int i = 0; i < len; i++) { names->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式 dtp[i] = 17898+i; dbp[i] = (rand()%100)/3.0; } dates->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组 prices->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组 start += len; } vector<string> allnames = {"names", "dates", "prices"}; vector<ConstantSP> allcols = {names, dates, prices}; conn.upload(allnames, allcols); script += "insert into tglobal values(names, dates, prices); tglobal"; TableSP table = conn.run(script);
使用tableInsert函数批量保存多条数据
在这个例子中,我们利用索引指定TableSP对象的多行数据,将它们批量保存到DolphinDB server上。
vector<ConstantSP> args; TableSP table = createDemoTable(); VectorSP range = Util::createPair(DT_INDEX); range->setIndex(0, 0); range->setIndex(1, 10); cout<<range->getString()<<endl; args.push_back(table->get(range)); conn.run("tableInsert{tglobal}", args);
使用tableInsert函数保存TableSP对象
vector<ConstantSP> args;
TableSP table = createDemoTable();
args.push_back(table);
conn.run("tableInsert{tglobal}", args);
把数据保存到内存表,还可以使用append!
函数,它可以把一张表追加到另一张表。但是,一般不建议通过该函数保存数据,因为它会返回一个空表,不必要地增加通信量。
vector<ConstantSP> args;
TableSP table = createDemoTable();
args.push_back(table);
conn.run("append!(tglobal);", args);
保存数据到分布式表
分布式表是DolphinDB推荐在生产环境下使用的数据存储方式,它支持快照级别的事务隔离,保证数据一致性。分布式表支持多副本机制,既提供了数据容错能力,又能作为数据访问的负载均衡。下面的例子通过C++ API把数据保存至分布式表。
使用tableInsert函数保存TableSP对象
在DolphinDB中使用以下脚本创建分布式表。database
函数用于创建数据库。分布式数据库地路径必须以"dfs://"
开头。createPartitionedTable
函数用于创建分区表。
login( `admin, ` 123456)
dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
pt=db.createPartitionedTable(table(1000000:0, `name` date `price, [STRING,DATE,DOUBLE]), tableName, ` date)
使用loadTable
方法加载分布式表,通过tableInsert
方式追加数据:
TableSP table = createDemoTable();
vector<ConstantSP> args;
args.push_back(table);
conn.run("tableInsert{loadTable('dfs://SAMPLE_TRDDB', `demoTable)}", args);
append!
函数也能向分布式表追加数据,但是性能与tableInsert
相比要差,建议不要轻易使用:
TableSP table = createDemoTable(); conn.upload("mt", table); conn.run("loadTable('dfs://SAMPLE_TRDDB', `demoTable).append!(mt);"); conn.run(script);
分布式表的并发写入
DolphinDB的分布式表支持并发读写,下面展示如何在C++客户端中将数据并发写入DolphinDB的分布式表。
首先,在DolphinDB服务端执行以下脚本,创建分布式数据库"dfs://natlog"和分布式表"natlogrecords"。其中,数据库按照VALUE-HASH-HASH的组合进行三级分区。
dbName="dfs://natlog" tableName="natlogrecords" db1 = database("", VALUE, datehour(2019.09.11T00:00:00)..datehour(2019.12.30T00:00:00) )//starttime, newValuePartitionPolicy=add db2 = database("", HASH, [IPADDR, 50]) //source_address db3 = database("", HASH, [IPADDR, 50]) //destination_address db = database(dbName, COMPO, [db1,db2,db3]) data = table(1:0, ["fwname","filename","source_address","source_port","destination_address","destination_port","nat_source_address","nat_source_port","starttime","stoptime","elapsed_time"], [SYMBOL,STRING,IPADDR,INT,IPADDR,INT,IPADDR,INT,DATETIME,DATETIME,INT]) db.createPartitionedTable(data,tableName,`starttime`source_address`destination_address)
DolphinDB不允许多个writer同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。
对于按哈希值进行分区的分布式表, DolphinDB C++ API 提供了getHash
函数来数据的hash值。在客户端设计多线程并发写入分布式表时,可根据哈希分区字段数据的哈希值分组,每组指定一个写线程。这样就能保证每个线程同时将数据写到不同的哈希分区。
ConstantSP spIP = Util::createConstant(DT_IP); int key = spIP->getHash(BUCKETS);
开启生产数据和消费数据的线程,下面的genData
用于生成模拟数据,writeData
用于写数据。
for (int i = 0; i < tLong; ++i) { arg[i].index = i; arg[i].count = tLong; arg[i].nLong = nLong; arg[i].cLong = cLong; arg[i].nTime = 0; arg[i].nStarttime = sLong; genThreads[i] = std::thread(genData, (void *)&arg[i]); writeThreads[i] = std::thread(writeData, (void *)&arg[i]); }
每个生产线程首先生成数据,其中createDemoTable
函数用于产生模拟数据,并返回一个TableSP对象。
void *genData(void *arg) { struct parameter *pParam; pParam = (struct parameter *)arg; long partitionCount = BUCKETS / pParam->count; for (unsigned int i = 0; i < pParam->nLong; i++) { TableSP table = createDemoTable(pParam->cLong, partitionCount * pParam->index, partitionCount, pParam->nStarttime, i * 5); tableQueue[pParam->index]->push(table); } return NULL; }
每个消费线程开始向DolphinDB并行写入数据。
void *writeData(void *arg) { struct parameter *pParam; pParam = (struct parameter *)arg; TableSP table; for (unsigned int i = 0; i < pParam->nLong; i++) { tableQueue[pParam->index]->pop(table); long long startTime = Util::getEpochTime(); vector<ConstantSP> args; args.push_back(table); conn[pParam->index].run("tableInsert{loadTable('dfs://natlog', `natlogrecords)}", args); pParam->nTime += Util::getEpochTime() - startTime; } printf("Thread %d,insert %ld rows %ld times, used %ld ms.\n", pParam->index, pParam->cLong, pParam->nLong, pParam->nTime); return NULL; }
更多分布式表的并发写入案例可以参考样例 MultiThreadDFSWriting.cpp。
利用PartitionedTableAppender并发写入分布式表
上述方法较为复杂,C++ API提供了更简便地自动按分区分流数据并行写入的方法:
PartitionedTableAppender(string dbUrl, string tableName, string partitionColName, DBConnectionPool& pool);
- dbUrl: 分布式数据库地址,若为内存表可设为“”
- tableName: 分布式表名
- partitionColName: 分区字段
- DBConnectionPool: 连接池
使用最新的1.30版本及以上的server,可以使用C++ API中的 PartitionedTableAppender对象来写入分布式表。其基本原理是设计一个连接池,然后获取分布式表的分区信息,将分区分配给连接池来并行写入,一个分区在同一时间只能由一个连接写入。
先在服务器端创建一个数据库 "dfs://SAMPLE_TRDDB" 以及一个分布式表 "demoTable":
login( `admin, `123456) dbPath = "dfs://SAMPLE_TRDDB"; tableName = `demoTable if(existsDatabase(dbPath)){ dropDatabase(dbPath) } db = database(dbPath, VALUE, 2010.01.01..2010.01.30) pt=db.createPartitionedTable(table(1000000:0, `name`date `price, [STRING,DATE,DOUBLE]), tableName, `date)
然后在C++客户端创建连接池pool并传入PartitionedTableAppender,使用append方法往分布式表并发写入本地数据:
DBConnectionPool pool("localhost", 8848, 20, "admin", "123456"); PartitionedTableAppender appender("dfs://SAMPLE_TRDDB", "demoTable", "date", pool); TableSP table = createDemoTable(); appender.append(table); ConstantSP result = conn.run("select * from loadTable('dfs://SAMPLE_TRDDB', `demoTable)"); cout << result->getString() << endl;
若当前连接池不再使用,会自动被释放,但存在释放延时,可以通过调用 shutDown()
等待线程任务执行结束后立即释放连接。
pool.shutDown()
批量异步写入数据
针对单条数据批量写入的场景,DolphinDB C++ API 提供 MultithreadedTableWriter
(推荐使用), BatchTableWrite
(已停止维护,不推荐使用)类对象用于批量异步追加数据,并在客户端维护了一个数据缓冲队列。当服务器端忙于网络 I/O 时,客户端写线程仍然可以将数据持续写入缓冲队列(该队列由客户端维护)。写入队列后即可返回,从而避免了写线程的忙等。目前,BatchTableWrite
支持批量写入数据到内存表、分区表(已停止维护,不推荐使用);而 MultithreadedTableWriter
支持批量写入数据到内存表、分区表和维度表。
注意对于异步写入:
- API 客户端提交任务到缓冲队列,缓冲队列接到任务后,客户端即认为任务已完成。
- 提供
getStatus
等接口查看状态。
MultithreadedTableWriter(推荐使用)
MultithreadedTableWriter
是对 BatchTableWriter
的升级,它的默认功能和 BatchTableWriter
一致,但 MultithreadedTableWriter
支持多线程的并发写入。
MultithreadedTableWriter
对象及主要方法介绍如下:
MultithreadedTableWriter(const std::string& host, int port, const std::string& userId, const std::string& password, const string& dbPath, const string& tableName, bool useSSL, bool enableHighAvailability = false, const vector<string> *pHighAvailabilitySites = nullptr, int batchSize = 1, float throttle = 0.01f,int threadCount = 1, const string& partitionCol ="", const vector<COMPRESS_METHOD> *pCompressMethods = nullptr, Mode mode = M_Append, vector<string> *pModeOption = nullptr);
参数说明:
- host 字符串,表示所连接的服务器的地址
- port 整数,表示服务器端口。
- userId / password: 字符串,登录时的用户名和密码。
- dbPath 字符串,表示分布式数据库地址。内存表时该参数为空。请注意,1.30.17及以下版本 API,向内存表写入数据时,该参数需填写内存表表名。
- tableName 字符串,表示分布式表或内存表的表名。请注意,1.30.17及以下版本 API,向内存表写入数据时,该参数需为空。
- useSSL 布尔值,默认值为 False。表示是否启用加密通讯。
- enableHighAvailability 布尔值,默认为 False。若要开启 API 高可用,则需要指定 enableHighAvailability 参数为 True。
- pHighAvailabilitySites 列表类型,表示所有可用节点的 ip:port 构成的 list。
- batchSize 整数,表示批处理的消息的数量,默认值是 1,表示客户端写入数据后就立即发送给服务器。如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
- throttle 大于 0 的数,单位为秒。若客户端有数据写入,但数据量不足 batchSize,则等待 throttle的时间再发送数据。
- threadCount 整数,表示创建的工作线程数量,默认为 1,表示单线程。对于维度表,其值必须为1。
- partitionCol 字符串类型,默认为空,仅在 threadCount 大于1时起效。对于分区表,必须指定为分区字段名;如果是流数据表,必须指定为表的字段名;对于维度表,该参数不起效。
- pCompressMethods 列表类型,用于指定每一列采用的压缩传输方式,为空表示不压缩。每一列可选的压缩方式包括:
- COMPRESS_LZ4: LZ4 压缩
- COMPRESS_DELTA: DELTAOFDELTA 压缩
- mode 表示数据写入的方式,可选值为:M_Append 或 M_Upsert。M_Upsert 表示以
upsert!
方式追加或更新表数据;M_Append 表示以append!
方式追加表数据。 - modeOption 字符串数组,表示不同模式下的扩展选项,目前,仅当 mode 指定为 M_Upsert 时有效,表示由
upsert!
可选参数组成的字符串数组。
以下是 MultithreadedTableWriter
对象包含的函数方法介绍:
bool insert(ErrorCodeInfo &errorInfo, TArgs... args)
函数说明:
插入单行数据。返回一个bool类型,true表示插入成功,false表示失败。
参数说明:
- errorInfo:是 ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当 errorCode 不为空时,表示 MTW 写入失败,此时,errorInfo 会显示失败的详细信息。之后的版本中会对错误信息进行详细说明,给出错误信息的代码、错误原因及解决办法。另外,ErrorCodeInfo 类提供了 hasError() 和 succeed() 方法用于获取数据插入的结果。hasError() 返回 true,则表示存在错误,否则表示无错误。succeed() 返回 true,则表示插入成功,否则表示插入失败。
- args:是变长参数,代表插入的一行数据。
void getUnwrittenData(std::vector<std::vector<ConstantSP>*> &unwrittenData);
函数说明:
返回一个嵌套列表,表示未写入服务器的数据。
注意:该方法获取到数据资源后, MultithreadedTableWriter
将释放这些数据资源。
参数说明:
- unwrittenData:嵌套列表,表示未写入服务器的数据,包含发送失败的数据以及待发送的数据两部分
bool insertUnwrittenData(std::vector<std::vector<ConstantSP>*> &records, ErrorCodeInfo &errorInfo)
函数说明:
将数据插入数据表。返回值同 insert 方法。与 insert 方法的区别在于,insert 只能插入单行数据,而 insertUnwrittenData 可以同时插入多行数据。
参数说明:
- records:需要再次写入的数据。可以通过方法 getUnwrittenData 获取该对象。
- errorInfo:是ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当 errorCode 不为空时,表示 MTW 写入失败,此时,errorInfo 会显示失败的详细信息。之后的版本中会对错误信息进行详细说明,给出错误信息的代码、错误原因及解决办法。另外,ErrorCodeInfo 类提供了 hasError() 和 succeed() 方法用于获取数据插入的结果。hasError() 返回 true,则表示存在错误,否则表示无错误。succeed() 返回 true,则表示插入成功,否则表示插入失败。
void getStatus(Status &status);
函数说明:
获取 MultithreadedTableWriter
对象当前的运行状态。
参数说明:
- status:是MultithreadedTableWriter::Status 类,具有以下属性和方法
属性:
- isExiting:写入线程是否正在退出。
- errorCode:错误码。
- errorInfo:错误信息。
- sentRows:成功发送的总记录数。
- unsentRows:待发送的总记录数。
- sendFailedRows:发送失败的总记录数。
- threadStatus:写入线程状态列表。
- threadId:线程 Id。
- sentRows:该线程成功发送的记录数。
- unsentRows:该线程待发送的记录数。
- sendFailedRows:该线程发送失败的记录数。
方法:
- hasError():true 表示数据写入存在错误;false 表示数据写入无错误。
- succeed():true 表示数据写入成功;false 表示数据写入失败。
waitForThreadCompletion()
函数说明:
调用此方法后,MTW 会进入等待状态,待后台工作线程全部完成后退出等待状态。
MultithreadedTableWriter
常规处理流程如下:
//创建连接,并初始化测试环境 DBConnection conn; conn.connect("192.168.1.182", 8848, "admin", "123456"); conn.run("dbName = 'dfs://valuedb3'\ if(exists(dbName)){\ dropDatabase(dbName);\ }\ datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,LONG]);\ db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);\ pt=db.createPartitionedTable(datetest,'pdatetest','id');"); //创建 MTW 对象前,先记录当前时间戳 mtwCreateTime conn.run("mtwCreateTime=now()"); vector<COMPRESS_METHOD> compress; compress.push_back(COMPRESS_LZ4); compress.push_back(COMPRESS_LZ4); compress.push_back(COMPRESS_DELTA); MultithreadedTableWriter writer("192.168.1.182", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, NULL, 10000, 1, 5, "id", &compress); thread t([&]() { try { ErrorCodeInfo errorInfo; //插入100行正确数据 (类型和列数都正确),MTW正常运行 for (int i = 0; i < 100; i++) { if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) { //此处不会执行到 cout << "insert failed: " << errorInfo.errorInfo << endl; break; } } //插入1行数据(类型不匹配),MTW 立刻发现待插入数据类型不匹配,立刻返回错误信息 if (writer.insert(errorInfo, rand() % 10000, 222, rand() % 10000) == false) { //数据错误,插入列数不匹配数据 cout << "insert failed: " << errorInfo.errorInfo << endl;// insert failed: Column counts don't match 2 //输出:insert failed: Cannot convert int to SYMBOL } //插入1行数据(列数不匹配),MTW 立刻发现待插入数据列数与待插入表的列数不匹配,立刻返回错误信息 if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB") == false) { cout << "insert failed: " << errorInfo.errorInfo << endl; //输出:insert failed: Column counts don't match 2 } //制造一次MTW内部无法处理的异常事件:删除在mtwCreateTime之后建立的连接 conn.run("id = exec sessionid from getSessionMemoryStat() where temporalAdd(gmtime(createTime), 16, 'h') > mtwCreateTime; for(closeid in id)closeSessions(closeid);"); Util::sleep(2000);//等待2秒,等待MTW检测到这个异常。此时 MTW 立刻终止所有工作线程,并修改状态为错误状态 //再插入1行正确数据,MTW 会因为工作线程终止而抛出异常,且不会写入该行数据 if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) { //这里不会执行 cout << "insert failed: " << errorInfo.errorInfo << endl; } //这里不会执行 cout << "Never run here."; } catch (exception &e) { //MTW 抛出异常 cerr << "MTW exit with exception: " << e.what() << endl; //输出:MTW exit with exception: Thread is exiting. } }); //检查目前MTW的状态 MultithreadedTableWriter::Status status; writer.getStatus(status); if (status.hasError()) { cout << "error in writing: " << status.errorInfo << endl; } //等待插入线程结束 t.join(); //等待MTW完全退出 writer.waitForThreadCompletion(); //再次检查完成后的MTW状态 writer.getStatus(status); if (status.hasError()) { cout << "error after write complete: " << status.errorInfo << endl; error after write complete: Failed to save the inserted data: Failed to read response header from the //获取未写入的数据 std::vector<std::vector<ConstantSP>*> unwrittenData; writer.getUnwrittenData(unwrittenData); cout << "unwriterdata length " << unwrittenData.size() << endl; unwriterdata length 100 if (!unwrittenData.empty()) { try { //重新写入这些数据,原有的MTW因为异常退出已经不能用了,需要创建新的MTW cout << "create new MTW and write again." << endl; MultithreadedTableWriter newWriter("192.168.1.182", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, NULL, 10000, 1, 2, "id", &compress); ErrorCodeInfo errorInfo; //插入未写入的数据 if (newWriter.insertUnwrittenData(unwrittenData, errorInfo)) { //等待写入完成后检查状态 newWriter.waitForThreadCompletion(); newWriter.getStatus(status); if (status.hasError()) { cout << "error in write again: " << status.errorInfo << endl; } } else { cout << "error in write again: " << errorInfo.errorInfo << endl; } } catch (exception &e) { cerr << "new MTW exit with exception: " << e.what() << endl; } } } //检查最后写入结果 cout << conn.run("select count(*) from pt")->getString() << endl; count ----- 100
调用 writer.insert() 方法向 writer 中写入数据,并通过 writer.getStatus() 获取 writer 的状态。 注意,使用 writer.waitForThreadCompletion() 方法等待 MTW 写入完毕,会终止 MTW 所有工作线程,保留最后一次写入信息。此时如果需要再次将数据写入 MTW,需要重新获取新的 MTW 对象,才能继续写入数据。
由上例可以看出,MTW 内部使用多线程完成数据转换和写入任务。但在 MTW 外部,API 客户端同样支持以多线程方式将数据写入 MTW,且保证了多线程安全。
BatchTableWriter(已停止维护,不推荐使用)
注意:目前已经停止维护 BatchTableWriter。推荐用户使用 MultithreadedTableWriter,详见 MultithreadedTableWriter 小节。
BatchTableWriter
对象及主要方法介绍如下:
BatchTableWriter(const std::string& hostName, int port, const std::string& userId, const std::string& password, bool acquireLock=true)
- hostName 连接服务器的IP地址。
- port 连接服务器的端口号。
- userId 是字符串,表示连接服务器的用户名。
- password 是字符串,表示连接服务器的密码。
- acquireLock 是布尔值,表示在使用过程中,API内部是否需要加锁。默认为true, 表示需要加锁。在并发调用API的场景下,建议加锁。
以下是BatchTableWriter对象包含的函数方法介绍:
addTable(const string& dbName, const string& tableName="", bool partitioned=true);
- dbName: 若为分布式表,需填写数据库名。若为内存表,填写表名。
- tableName: 需要写入的分布式表的表名。内存表时该值为空。
- partitioned: 表示添加的表是否为分区表。设置为true表示是分区表。如果添加的表是未分区表,必需设置partitioned为false.
注意:
- 如果添加的是内存表,需要share该表。
- 表名不可重复添加,需要先移除之前添加的表,否则会抛出异常。
insert(const string& dbName, const string& tableName, Fargs)
Fargs:是变长参数,代表插入的一行数据。写入的数据可以使用dolphindb的数据类型,也可以使用C++原生数据类型。数据类型和表中列的类型需要一一对应。数据类型对应关系见下表:
C++ 原生数据类型与 DolphinDB 数据类型对应关系表
DolphinDB类型 C++类型 BOOL char CHAR char SHORT short STRING const char* STRING string SYMBOL const char* SYMBOL string LONG long long NANOTIME long long NANOTIMESTAMP long long TIMESTAMP long long FLOAT float DOUBLE double DATE int MONTH int TIME int MINUTE int DATETIME int DATEHOUR int UUID unsigned char* IPADDR unsigned char* INT128 unsigned char* DECIMAL double
注意:
- 调用insert前需先调用addTable添加表,否则会抛出异常。
- 变长参数个数和数据类型需要与insert表的列数及类型匹配。
- 如果插入过程出现异常导致后台线程退出,再次调用insert会抛出异常,可以调用getUnwrittenData来获取之前所有写入缓冲队列但是没有成功写入服务器的数据(不包括本次insert的数据),然后再removeTable。如果需要再次插入数据,需要重新调用
addTable
. - 在移除该表的过程中调用本函数,仍然能够插入成功,但这些插入的数据并不会发送到服务器。移除该表的时候调用insert算是未定义行为,不建议这样写程序。
removeTable(const string& dbName, const string& tableName="")
释放由addTable添加的表所占用的资源。第一次调用该函数,该函数返回即表示后台线程已退出。
getUnwrittenData(const string& dbName, const string& tableName="")
获取还未写入的数据,主要是用于的时候获取写入出现错误时,剩下未写入的数据。该函数会取出剩下未写入的数据,这些数据将不会被继续写入,如若需要重新写入,需要再次调用插入函数。
getStatus(const string& dbName, const string& tableName="")
返回值是由一个整型和两个布尔型组合的元组,分别表示当前写入队列的深度、当前表是否被移除(true: 表示正在被移除),以及后台写入线程是否因为出错而退出。
getAllStatus()
获取所有当前存在的表的信息,不包含被移除的表。
返回值是一个表,共有六列,对应列的说明如下:
列名 | 详情 |
---|---|
DatabaseName | 数据库名称/内存表名称 |
TableName | 表名称/空字符串 |
WriteQueueDepth | 当前写入队列深度 |
SendedRows | 已成功发送到服务器的行数 |
Removing | 表是否正在被移除 |
Finished | 后台线程是否因为出错退出 |
示例:
#include "BatchTableWriter.h" using namespace dolphindb; using namespace std; int main(){ shared_ptr<BatchTableWriter> btw = make_shared<BatchTableWriter>(host, port, userId, password, true); btw->addTable("dfs://demoDB", "demoTable"); for(int i = 0; i < 1000; i+=3) btw->insert("dfs://demoDB", "demoTable", i,i+1,i+2); btw->removeTable("dfs://demoDB", "demoTable"); }
更多批量异步写入案例,请参考BatchTableWriterDemo.cpp。
C++ Streaming API
C++ API处理流数据的方式有三种:ThreadedClient, ThreadPooledClient 和 PollingClient。这三种实现方式的细节请见:
编译
Linux 64位
安装cmake:
sudo apt-get install cmake
编译:
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Release ../path_to_api-cplusplus/
make -j `nproc`
编译成功后,会生成三个可执行文件。
在Windows中使用MinGW编译
API
ThreadedClient
ThreadedClient 产生一个线程。每次新数据从流数据表发布时,该线程去获取和处理数据。
定义线程客户端
ThreadedClient::ThreadClient(int listeningPort);
- listeningPort 是单线程客户端的订阅端口号。
调用订阅函数
ThreadSP ThreadedClient::subscribe(string host, int port, MessageHandler handler, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1, bool resub = true, VectorSP filter = nullptr, bool msgAsTable = false, bool allowExists = false, int batchSize = 1, double throttle = 1, string userName="", string password="", const StreamDeserializerSP blobDeserializer = nullptr);
- host是发布端节点的主机名。
- port是发布端节点的端口号。
- handler是用户自定义的回调函数,用于处理每次流入的消息。函数的参数是流入的消息,每条消息就是流数据表的一行。函数的结果必须是void。
- tableName是字符串,表示发布端上共享流数据表的名称。
- actionName是字符串,表示订阅任务的名称。它可以包含字母、数字和下划线。
- offset是整数,表示订阅任务开始后的第一条消息所在的位置。消息是流数据表中的行。如果没有指定offset,或它为负数或超过了流数据表的记录行数,订阅将会从流数据表的当前行开始。offset与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。
- resub是布尔值,表示订阅中断后,是否会自动重订阅。
- filter是一个向量,表示过滤条件。流数据表过滤列在filter中的数据才会发布到订阅端,不在filter中的数据不会发布。
- msgAsTable是布尔值。只有设置了 batchSize 参数,才会生效。设置为 True,订阅的数据会转换为 table。设置为 False,订阅的数据会转换成 vector。
- allowExists是布尔值。若设置为 True,则支持对同一个订阅流数据表使用多个 handler 处理数据。设置为 False,则不支持同一个订阅流数据表使用多个 handler 处理数据。
- batchSize 是一个整数,表示批处理的消息的数量。如果它是正数,直到消息的数量达到 batchSize 时,handler 才会处理进来的消息。如果它没有指定或者是非正数,消息到达之后,handler 就会马上处理消息。
- throttle是一个整数,表示 handler 处理到达的消息之前等待的时间,以秒为单位。默认值为 1。如果没有指定 batchSize,throttle 将不会起作用。
- userName是一个字符串,表示 API 所连接服务器的登录用户名。
- password是一个字符串,表示 API 所连接服务器的登录密码。
- blobDeserializer是订阅的异构流数据表对应的反序列化器。
ThreadSP 指向循环调用handler的线程的指针。该线程在此topic被取消订阅后会退出。
示例:
auto t = client.subscribe(host, port, [](Message msg) { // user-defined routine }, tableName); t->join();
取消订阅
void ThreadClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
host 是发布端节点的主机名。
port 是发布端节点的端口号。
tableName 是字符串,表示发布端上共享流数据表的名称。
actionName 是字符串,表示订阅任务的名称。它可以包含字母、数字和下划线。
该函数用于停止向发布者订阅数据。
ThreadPooledClient
ThreadPooledClient 产生用户指定数量的多个线程。每次新数据从流数据表发布时,这些线程同时去获取和处理数据。当数据到达速度超过单个线程所能处理的限度时,ThreadPooledClient 比 ThreadedClient 有优势。
定义多线程客户端
ThreadPooledClient::ThreadPooledClient(int listeningPort, int threadCount);
- listeningPort 是多线程客户端节点的订阅端口号。
- threadCount 是线程池的大小。
调用订阅函数
vector<ThreadSP> ThreadPooledClient::subscribe(string host, int port, MessageHandler handler, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1, bool resub = true, VectorSP filter = nullptr);
返回一个指针向量,每个指针指向循环调用handler的线程。这些线程在此topic被取消订阅后会退出。
示例:
auto vec = client.subscribe(host, port, [](Message msg) { // user-defined routine }, tableName); for(auto& t : vec) { t->join(); }
取消订阅
void ThreadPooledClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
参数参见9.2.1.3节。
PollingClient
订阅数据时,会返回一个消息队列。用户可以从其中获取和处理数据。
定义客户端
PollingClient::PollingClient(int listeningPort);
- listeningPort 是客户端节点的订阅端口号。
订阅
MessageQueueSP PollingClient::subscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1);
该函数返回指向消息队列的指针。
示例:
auto queue = client.subscribe(host, port, handler, tableName);
Message msg;
while(true) {
if(queue->poll(msg, 1000)) {
if(msg.isNull()) break;
// handle msg
}
}
取消订阅
void PollingClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
注意,对于这种订阅模式,若返回一个空指针,说明已取消订阅。
异构流数据表反序列化器
DolphinDB server 自 1.30.17 及 2.00.5 版本开始,支持通过 replay 函数将多个结构不同的流数据表,回放(序列化)到一个流数据表里,这个流数据表被称为异构流数据表。C++ API 自 1.30.19 版本开始,新增 StreamDeserializer
类,用于构造异构流数据表反序列化器,以实现对异构流数据表的订阅和反序列化操作。
C++ API 支持通过两种方式构造异构流数据表反序列化器:
通过 key->schema 的映射表创建
StreamDeserializerSP StreamDeserializer(sym2schema)
通过 key->table 的映射表创建
StreamDeserializerSP StreamDeserializer(sym2schema, [conn])
- sym2table 是一个字典对象,其结构与 replay 回放到异构流数据表的输入表结构保持一致。
StreamDeserializer
将根据 sym2table 指定的结构对注入的数据进行反序列化。 - conn 是已经连接 DolphinDB server 的 DBConnection 对象。若不指定该参数,则 sym2table 中的指定的表必须是 dfs 表或者共享内存表。
订阅异构流数据表
订阅示例:
//假设异构流数据表回放时inputTables如下: //d = dict(['msg1', 'msg2'], [table1, table2]); \ //replay(inputTables = d, outputTables = `outTables, dateColumn = `timestampv, timeColumn = `timestampv)"; //异构流数据表解析器的创建方法如下: StreamDeserializerSP sdsp; {//使用key->schema的映射表创建 unordered_map<string, DictionarySP> sym2schema; DictionarySP t1s = conn.run("schema(table1)"); DictionarySP t2s = conn.run("schema(table2)"); sym2schema["msg1"] = t1s; sym2schema["msg2"] = t2s; sdsp = new StreamDeserializer(sym2schema); } {//使用key->table的映射表创建 unordered_map<string, pair<string, string>> sym2table; //因为是内存表,第一个参数dbpath指定为空,第二个参数指定为表名 sym2table["msg1"] = std::make_pair("", "table1"); sym2table["msg2"] = std::make_pair("", "table2"); //传入map和conn,&conn是可选参数,如果不传入,sym2table中的表必须是dfs表或者共享内存表。 sdsp = new StreamDeserializer(sym2table, &conn); } //sdsp可以作为订阅时的一个参数传入,譬如: //ThreadedClient threadedClient(listenport); //auto thread1 = threadedClient.subscribe(hostName, port, onehandler, "outTables", "mutiSchemaOne", 0, true, nullptr, false, false, "admin", "123456", sdsp);
订阅跨进程共享内存表
DolphinDB server 自2.00.7/1.30.19版本开始支持创建跨进程共享内存表。参考:createIPCInMemoryTable。
C++ API 提供订阅函数 subscribe
以实现订阅跨进程共享内存表的功能,来满足对订阅数据时延性要求较高的场景需求。通过订阅跨进程共享内存表,API 端可以直接通过共享内存获取由 server 端发布的流数据,极大地减少了网络传输的延时。因为进程间会访问同一个共享内存,所以要求发布端和订阅端必须位于同一台服务器。本节主要介绍如何通过 C++ API 提供的 IPCInMemoryStreamClient
类实现订阅共享内存表的功能。
示例:
server 端通过 GUI 创建一个跨进程内存表,并向该表实时写入数据
//创建流数据表 share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable; //创建跨进程共享内存表 share createIPCInMemoryTable(1000000, "pubTable", `timestamp`temperature, [TIMESTAMP, DOUBLE]) as shm_test; //自定义订阅处理函数 def shm_append(msg) { shm_test.append!(msg) } //订阅流数据表pubTable的数据写入跨进程内存表 topic2 = subscribeTable(tableName="pubTable", actionName="act3", offset=0, handler=shm_append, msgAsTable=true)
C++ API 端
string tableName = "pubTable"; //构造对象 IPCInMemoryStreamClient memTable; //创建一个存储数据的 table,要求和 createIPCInMemoryTable 中列的类型和名称一一对应 vector<string> colNames = {"timestamp", "temperature"}; vector<DATA_TYPE> colTypes = {DT_TIMESTAMP, DT_DOUBLE}; int rowNum = 0, indexCapacity=10000; TableSP outputTable = Util::createTable(colNames, colTypes, rowNum, indexCapacity); // 创建一个和共享内存表结构相同的表 //overwrite 是否覆盖前面旧的数据 bool overwrite = true; ThreadSP thread = memTable.subscribe(tableName, print, outputTable, overwrite); //传入 subscribe 中处理数据的回调函数 void print(TableSP table) { //处理收到的数据 } //最后取消订阅,结束回调 memTable.unsubscribe(tableName);
openssl 1.0.2版本源码安装
这部分主要是介绍下没有1.0.2版本openssl的,从源码编译安装的过程。已有的话忽略本节。
首先创建一个自定义目录,给自己编译的openssl使用。
这个示例里,我们使用/newssl目录,实际可以自己修改。
demo@ddb:~# mkdir /newssl
下载openssl源码
demo@ddb:~# wget https://www.openssl.org/source/old/1.0.2/openssl-1.0.2u.tar.gz
解压缩后,进入源码目录,配置编译结果存放目录为刚才创建的newssl目录
demo@ddb:~/openssl-1.0.2u# ./config shared --prefix=/newssl
编译安装
demo@ddb:~/openssl-1.0.2u# make install
因为这里编译结果存放的目录是/newssl,在实际编译时,链接的头文件和库文件目录也需要添加上我们存放的目录/newssl
比如前文编译小节的例子中,原先的g++编译命令是:
g++ main.cpp -std=c++11 -DLINUX -D_GLIBCXX_USE_CXX11_ABI=1 -DLOGGING_LEVEL_2 -O2 -I../include -lDolphinDBAPI -lpthread -lssl -L../bin/linux_x64/ABI1 -Wl,-rpath,.:../bin/linux_x64/ABI1 -o main
要使用我们自己编译的openssl库,需要改成
g++ main.cpp -std=c++11 -DLINUX -D_GLIBCXX_USE_CXX11_ABI=1 -DLOGGING_LEVEL_2 -O2 -I../include -lDolphinDBAPI -lpthread -L../bin/linux_x64/ABI1 -Wl,-rpath,.:../bin/linux_x64/ABI1 -L /newssl/lib/ -I /newssl/include -lssl -lcrypto -luuid -o main
本例中,编译文件后,直接运行main会报错,是由于我们的/newssl不在系统路径里,所以在运行main前,可以设置变量
export LD_LIBRARY_PATH=/newssl/lib
然后再运行./main就可以运行了
常见问题及解决办法
编译时出现报错 "undefined reference to 'uuid_generate'"。
原因: 未安装libuuid-devel库或者未添加
-luuid
参数。解决办法: 通过命令
yum install libuuid-devel
安装libuuid-devel库,或者在引用它的模块后添加-luuid
参数。