Plugin Development
DolphinDB supports dynamic loading of external plugins to extend system functionality. Plugins are written in C++ and compiled into ".so" or ".dll" shared library files. This document focuses on how to develop a plugin, introducing the development process in various scenarios.
1. How to Develop a Plugin
1.1 Basic Concepts
In DolphinDB, plugins are designed to provide functions that can be called in scripts. Plugin functions can be divided into two types — operator functions and system functions. Operator functions accept no more than two parameters, whereas system functions accept any number of parameters and support accessing relevant information in the current session.
1.1.1 Operator Function
- Signature (C++ function):
ConstantSP (const ConstantSP& a, const ConstantSP& b)
- Parameters: If the function receives two parameters, a and b respectively represent the first and the second. If it receives one parameter, b serves as a placeholder with no actual use. If it receives none, both a and b serve as placeholders.
1.1.2 System Function
- Signature (C++ function):
ConstantSP (Heap* heap, vector<ConstantSP>& args)
- Parameters: The parameters passed by the user when calling the plugin function are sequentially stored in the C++ vector args. The heap parameter is internally managed and requires no user input.
In the function signature, ConstantSP represents a wide range of DolphinDB objects, including scalars, vectors, matrices, and tables. Other commonly used types derived from ConstantSP include VectorSP (for vectors) and TableSP (for tables).
1.2 Directory
The header files required for plugin development are commonly located in ./DolphinDBPlugin/tree/releaseXX/include/. This directory contains class declarations for DolphinDB’s core data structures and utility functions, serving as crucial tools for plugin implementation.
For example, the header files for DolphinDB server version 2.00.10 are available in DolphinDBPlugin/include.
The header files mentioned below (such as CoreConcept.h, ScalarImp.h, Logger.h, and Exceptions.h) can all be found in this directory.
1.3 Create Variables
To create a scalar, you can directly use the new statement to create an object of a type declared in the header file ScalarImp.h and assign it to a ConstantSP. ConstantSP is an encapsulated smart pointer that automatically releases memory when a variable is no longer referenced, saving you the effort of manually deleting created variables.
ConstantSP i = new Int(1); // Equivalent to 1i
ConstantSP d = new Date(2019, 3, 14); // Equivalent to 2019.03.14
ConstantSP s = new String("DolphinDB"); // Equivalent to "DolphinDB"
ConstantSP voidConstant = new Void(); // Create a VOID type variable commonly used to represent an empty parameter
The header file Util.h declares a series of functions used to quickly create variables of various types and formats.
VectorSP v = Util::createVector(DT_INT, 10); // Create an INT type vector with an initial length of 10
v->setInt(0, 60); // Equivalent to v[0] = 60
VectorSP t = Util::createVector(DT_ANY, 0); // Create an ANY type vector (tuple) with an initial length of 0
t->append(new Int(3)); // Equivalent to t.append!(3)
t->get(0)->setInt(4); // Equivalent to t[0] = 4
// t->setInt(0, 4) cannot be used here because t is a tuple but setInt(0, 4) only works for INT type vectors
ConstantSP seq = Util::createIndexVector(5, 10); // Equivalent to 5..14
int seq0 = seq->getInt(0); // Equivalent to seq[0]
ConstantSP mat = Util::createDoubleMatrix(5, 10); // Create a DOUBLE type matrix with 5 columns and 10 rows
mat->setColumn(3, seq); // Equivalent to mat[3] = seq
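Tables can be created in a similar way with Util::createTable. A minimal sketch (the column names and values here are illustrative):
VectorSP id = Util::createVector(DT_INT, 0); // INT column
VectorSP price = Util::createVector(DT_DOUBLE, 0); // DOUBLE column
id->append(new Int(1));
price->append(new Double(99.5));
vector<ConstantSP> cols = {id, price};
vector<string> colNames = {"id", "price"};
TableSP table = Util::createTable(colNames, cols); // Equivalent to table(id, price)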
1.4 Exception Handling and Parameter Validation
1.4.1 Exception Handling
During plugin development, exceptions are thrown and handled in the same way as in standard C++ development: throw exceptions with the throw keyword and handle them with try-catch blocks. Exception types are declared in the header file Exceptions.h. If a plugin function encounters a runtime error, a RuntimeException is thrown.
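For example, a plugin function can catch a lower-level RuntimeException and rethrow it with added context. A minimal sketch (the division check is purely illustrative):
ConstantSP safeDivide(const ConstantSP &a, const ConstantSP &b) {
    try {
        if (b->getDouble() == 0)
            throw RuntimeException("Division by zero.");
        return new Double(a->getDouble() / b->getDouble());
    } catch (RuntimeException &e) {
        // Rethrow with context so the script user can locate the failure
        throw RuntimeException(string("safeDivide: ") + e.what());
    }
}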
During parameter validation, if the parameters do not meet the requirements, an IllegalArgumentException is thrown. The following functions are commonly used for parameter validation:
- ConstantSP->getType(): Return the type of the variable (e.g., INT, CHAR, DATE). The available types are defined in the header file Types.h.
- ConstantSP->getCategory(): Return the category of the variable. The available categories are defined in the header file Types.h. Common categories include:
  - INTEGRAL (e.g., INT, CHAR, SHORT, LONG)
  - FLOATING (e.g., FLOAT, DOUBLE)
  - TEMPORAL (e.g., TIME, DATE, DATETIME)
  - LITERAL (e.g., STRING, SYMBOL)
- ConstantSP->getForm(): Return the form of the variable (e.g., scalar, vector, table). The available forms are defined in the header file Types.h.
- ConstantSP->isVector(): Check if the variable is a vector.
- ConstantSP->isScalar(): Check if the variable is a scalar.
- ConstantSP->isTable(): Check if the variable is a table.
- ConstantSP->isNumber(): Check if the variable is numeric.
- ConstantSP->isNull(): Check if the variable is NULL.
- ConstantSP->getInt(): Retrieve the INT value stored in the variable.
- ConstantSP->getString(): Retrieve the STRING value stored in the variable.
- ConstantSP->size(): Retrieve the length of the variable.
For more parameter validation functions, please refer to the class Constant in the header file CoreConcept.h.
1.4.2 Parameter Validation Example
The following example develops a plugin function to calculate the factorial of non-negative integers, which returns a LONG type variable. In DolphinDB, the maximum value for LONG type data is 2^63-1, so the maximum factorial value it can represent is 25!. Therefore, only parameters ranging from 0 to 25 are valid for this function.
#include "CoreConcept.h"
#include "Exceptions.h"
#include "ScalarImp.h"
ConstantSP factorial(const ConstantSP &n, const ConstantSP &placeholder) {
    string syntax = "Usage: factorial(n). ";
    if (!n->isScalar() || n->getCategory() != INTEGRAL)
        throw IllegalArgumentException("factorial", syntax + "n must be an integral scalar.");
    int nValue = n->getInt();
    if (nValue < 0 || nValue > 25)
        throw IllegalArgumentException("factorial", syntax + "n must be a non-negative integer less than 26.");
    long long fact = 1;
    for (int i = nValue; i > 0; i--)
        fact *= i;
    return new Long(fact);
}
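Once the plugin is compiled and loaded, the function can be called from a DolphinDB script (the module name demo and the text-file path are hypothetical):
loadPlugin("/path/to/PluginDemo.txt")
demo::factorial(5) // returns 120
demo::factorial(30) // throws: n must be a non-negative integer less than 26.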
1.5 Call DolphinDB’s Built-in Functions
In some cases, it is necessary to call DolphinDB's built-in functions for data processing. Some commonly used built-in functions have been predefined as methods in classes:
VectorSP v = Util::createIndexVector(1, 100);
ConstantSP avg = v->avg(); // Equivalent to avg(v)
ConstantSP sum2 = v->sum2(); // Equivalent to sum2(v)
v->sort(false); // Equivalent to sort(v, false)
Built-in functions that are not predefined as methods can only be called from system functions. You can retrieve a built-in function using heap->currentSession()->getFunctionDef, and then invoke it with the call method:
- To call an operator function, use call(Heap*, const ConstantSP&, const ConstantSP&).
- To call a system function, use call(Heap*, vector<ConstantSP>&).
Below is an example of calling the built-in function cumsum:
ConstantSP v = Util::createIndexVector(1, 100);
v->setTemporary(false); // Call setTemporary(false) first to prevent v from being modified during the call
FunctionDefSP cumsum = heap->currentSession()->getFunctionDef("cumsum");
ConstantSP result = cumsum->call(heap, v, new Void());
// Equivalent to cumsum(v). new Void() is a placeholder with no practical use.
2. Develop Plugin Functions for Time Series Processing
A distinctive strength of DolphinDB lies in its robust support for time series data. This chapter takes the msum plugin function as an example to introduce how to develop a plugin function for handling time series data.
Time-series functions generally accept vectors as inputs and perform calculations on each element in the vector. In this example, the msum function accepts two parameters: a vector and a window. Its signature is as follows:
ConstantSP msum(const ConstantSP &X, const ConstantSP &window);
msum returns a vector of the same length as the input vector. For simplicity, this example assumes that it returns a DOUBLE type vector. The Util::createVector function can be used to pre-allocate space for the return value:
int size = X->size();
int windowSize = window->getInt();
ConstantSP result = Util::createVector(DT_DOUBLE, size);
When handling vectors, it is advisable to read data in batches with functions like getDoubleConst and getIntConst, which copy a batch of read-only data of a given length into a buffer of the corresponding type for further calculation. This approach is more efficient than element-wise access with getDouble and getInt. For simplicity, this example uses getDoubleConst to obtain data of length Util::BUF_SIZE at a time; the function returns a const double* pointer to the head of the buffer.
double buf[Util::BUF_SIZE];
INDEX start = 0;
while (start < size) {
    int len = std::min(Util::BUF_SIZE, size - start);
    const double *p = X->getDoubleConst(start, len, buf);
    for (int i = 0; i < len; i++) {
        double val = p[i];
        // ...
    }
    start += len;
}
In this example, msum calculates the moving sum of X over a sliding window of length windowSize. A temporary variable tmpSum tracks the sum of the current window: each step subtracts the element leaving the window and adds the new observation. To write the summed values into result, use result->getDoubleBuffer to obtain a read/write buffer, and then use result->setDouble to write the buffer back into the array. The setDouble function checks whether the address of the given buffer matches that of the underlying storage, avoiding unnecessary copying when they are the same. In most cases, the buffer obtained with getDoubleBuffer is the variable's actual storage, which reduces data copying and improves performance.
Note that DolphinDB uses the minimum value of the DOUBLE type (defined as the macro DBL_NMIN) to represent a DOUBLE NULL, so additional validation is required to identify NULL values.
The first “windowSize - 1” elements of the returned value are NULL. The processing of X can be divided into two distinct loops. The first loop sums the first “windowSize” elements, while the second loop handles the calculation of the sliding window. The final implementation is as follows:
ConstantSP msum(const ConstantSP &X, const ConstantSP &window) {
    INDEX size = X->size();
    int windowSize = window->getInt();
    ConstantSP result = Util::createVector(DT_DOUBLE, size);
    double buf[Util::BUF_SIZE];
    double windowHeadBuf[Util::BUF_SIZE];
    double resultBuf[Util::BUF_SIZE];
    double tmpSum = 0.0;
    INDEX start = 0;
    while (start < windowSize) {
        int len = std::min(Util::BUF_SIZE, windowSize - start);
        const double *p = X->getDoubleConst(start, len, buf);
        double *r = result->getDoubleBuffer(start, len, resultBuf);
        for (int i = 0; i < len; i++) {
            if (p[i] != DBL_NMIN) // p[i] is not NULL
                tmpSum += p[i];
            r[i] = DBL_NMIN;
        }
        result->setDouble(start, len, r);
        start += len;
    }
    result->setDouble(windowSize - 1, tmpSum); // The previous loop set one NULL too many; overwrite it with tmpSum
    while (start < size) {
        int len = std::min(Util::BUF_SIZE, size - start);
        const double *p = X->getDoubleConst(start, len, buf);
        const double *q = X->getDoubleConst(start - windowSize, len, windowHeadBuf);
        double *r = result->getDoubleBuffer(start, len, resultBuf);
        for (int i = 0; i < len; i++) {
            if (p[i] != DBL_NMIN)
                tmpSum += p[i];
            if (q[i] != DBL_NMIN)
                tmpSum -= q[i];
            r[i] = tmpSum;
        }
        result->setDouble(start, len, r);
        start += len;
    }
    return result;
}
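Assuming the function is exported in a module named msum (hypothetical), its output can be compared against DolphinDB's built-in msum; the two should agree up to floating-point rounding:
loadPlugin("/path/to/PluginMsum.txt")
x = rand(10.0, 1000)
msum::msum(x, 20) // compare with the built-in msum(x, 20)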
3. Develop Aggregate Functions for Distributed SQL Processing
In DolphinDB, SQL aggregate functions typically accept one or more vectors as parameters and return a scalar. When developing an aggregate function, it is crucial to understand how to access the elements within the vectors.
DolphinDB vectors can be stored in two ways: as a regular array using continuous memory or as a big array stored in many memory segments.
This chapter takes geometric mean calculation as an example to introduce how to develop an aggregate function with a focus on accessing the elements within the arrays.
3.1 Aggregate Function Example
The geometricMean function accepts a vector as its parameter. To avoid overflow, the calculation is typically performed on the logarithms of the input values:
geometricMean([x1, x2, ..., xn]) = exp((log(x1) + log(x2) + log(x3) + ... + log(xn)) / n)
To develop a distributed version of this function, first develop a plugin implementing the aggregate function logSum, which calculates the logarithmic sum of the data in a given partition. Then, define a reduce function using the defg keyword and a MapReduce function using the mapr keyword.
When developing plugins in DolphinDB, the operations on regular arrays and big arrays differ. The isFastMode function can be used to identify the array type:
ConstantSP logSum(const ConstantSP &x, const ConstantSP &placeholder) {
    if (((VectorSP) x)->isFastMode()) {
        // ...
    }
    else {
        // ...
    }
}
Regular arrays use continuous memory. The getDataArray function can be used to get a pointer to the data. Assuming the data is stored as DOUBLE type:
if (((VectorSP) x)->isFastMode()) {
    int size = x->size();
    double *data = (double *) x->getDataArray();
    double logSum = 0;
    for (int i = 0; i < size; i++) {
        if (data[i] != DBL_NMIN) // is not NULL
            logSum += std::log(data[i]);
    }
    return new Double(logSum);
}
Big arrays are stored in many memory segments. The getSegmentSize function can be used to get the size of each segment, and the getDataSegment function to get the address of the first segment. The latter returns a double pointer to an array of pointers, each pointing to the data array of one segment:
else {
    int size = x->size();
    int segmentSize = x->getSegmentSize();
    double **segments = (double **) x->getDataSegment();
    INDEX start = 0;
    int segmentId = 0;
    double logSum = 0;
    while (start < size) {
        double *block = segments[segmentId];
        int blockSize = std::min(segmentSize, size - start);
        for (int i = 0; i < blockSize; i++) {
            if (block[i] != DBL_NMIN) // is not NULL
                logSum += std::log(block[i]);
        }
        start += blockSize;
        segmentId++;
    }
    return new Double(logSum);
}
The above code is tailored to DOUBLE type data. In practical development, however, arrays may store various data types, in which case generic programming can be used, as in the example in the Appendix and the sketch below.
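As an illustration, a minimal type-generic sketch of the regular-array branch might look as follows. This is an assumption-laden sketch, not DolphinDB's official pattern; it relies on DolphinDB's convention of using each type's minimum value as the NULL sentinel (e.g., INT_MIN for INT and LLONG_MIN for LONG):
#include <climits>
#include <cmath>

// Sum the logarithms of all non-NULL elements in a contiguous (regular) array.
template <typename T>
double typedLogSum(const T *data, INDEX size, T nullValue) {
    double logSum = 0;
    for (INDEX i = 0; i < size; i++) {
        if (data[i] != nullValue) // skip NULLs
            logSum += std::log((double) data[i]);
    }
    return logSum;
}

// Dispatch on the runtime type of x (regular-array storage assumed).
double logSumRegularArray(const VectorSP &x) {
    INDEX size = x->size();
    switch (x->getType()) {
        case DT_DOUBLE: return typedLogSum((double *) x->getDataArray(), size, DBL_NMIN);
        case DT_INT:    return typedLogSum((int *) x->getDataArray(), size, INT_MIN);
        case DT_LONG:   return typedLogSum((long long *) x->getDataArray(), size, LLONG_MIN);
        default: throw IllegalArgumentException("logSum", "x must be a DOUBLE, INT, or LONG vector.");
    }
}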
3.2 Call Functions in DolphinDB
When developing an aggregate function in DolphinDB, it is typically necessary to implement both a non-distributed and a distributed version; the system then calls whichever is more efficient. The process is as follows.
Define the non-distributed version of the geometricMean function in DolphinDB:
def geometricMean(x) {
    return exp(logSum::logSum(x) \ count(x))
}
Define the geometricMeanMap and geometricMeanReduce functions, and then use the mapr keyword to define the distributed version of geometricMean:
def geometricMeanMap(x) {
    return logSum::logSum(x)
}
defg geometricMeanReduce(myLogSum, myCount) {
    return exp(sum(myLogSum) \ sum(myCount))
}
mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce }
If the function is executed on a standalone system, you can simply load the plugin on the node where the computation takes place. However, if the data is located on remote nodes, the plugin must be loaded on each of those nodes. This can be done manually by executing the loadPlugin function on each node, or with the following script:
each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())
To validate the function, create a partitioned table with the following script:
db = database("", VALUE, 1 2 3 4)
t = table(take(1..4, 100) as id, rand(1.0, 100) as val)
t0 = db.createPartitionedTable(t, `tb, `id)
t0.append!(t)
select geometricMean(val) from t0 group by id;
3.3 Random Access to Big Arrays
In DolphinDB, big arrays can be accessed randomly through index calculations. Use the getSegmentSizeInBit function to get the segment size in binary bits, and then use bitwise operations to compute the offset across and within segments:
int segmentSizeInBit = x->getSegmentSizeInBit();
int segmentMask = (1 << segmentSizeInBit) - 1;
double **segments = (double **) x->getDataSegment();
int index = 3000000; // The index of the element to be accessed
double result = segments[index >> segmentSizeInBit][index & segmentMask];
//                       ^ offset across segments   ^ offset within segments
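For example, if getSegmentSizeInBit() returns 20, each segment holds 2^20 = 1,048,576 elements; index 3,000,000 then falls in segment 3000000 >> 20 = 2, at offset 3000000 & 0xFFFFF = 902,848 within that segment.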
The previous chapter introduced how to obtain read-only buffers using getDoubleConst and getIntConst, and how to obtain read/write buffers using getDoubleBuffer and getIntBuffer. These two methods of vector access are widely used in practical development.
This chapter introduced how to directly access the underlying storage of vectors using getDataArray and getDataSegment. These methods are better suited to special cases, for example when the data type is known in advance and the data is stored in a big array.
4. Develop Plugin Functions with mr/imr Distributed Algorithms
In DolphinDB, the MapReduce model serves as a universal computing framework for executing distributed algorithms. The mr and imr functions enable users to implement distributed algorithms through scripting and are commonly used when developing plugins for distributed algorithms. This chapter mainly introduces how to develop user-defined functions (e.g., map and reduce) in C++ and call the mr and imr functions to implement distributed computation.
4.1 Distributed Computing Example
This section uses mr to develop a function that calculates the average of data in specified columns of a distributed table, covering the process of developing a DolphinDB distributed algorithm plugin and the related technical details.
In plugin development, user-defined functions such as map, reduce, final, and term can be either operator functions or system functions.
The map function in this example performs calculations on all specified columns within each partition of the table. Each partition returns a tuple containing the sum of the data and the number of non-null elements.
ConstantSP columnAvgMap(Heap *heap, vector<ConstantSP> &args) {
    TableSP table = args[0];
    ConstantSP colNames = args[1];
    double sum = 0.0;
    int count = 0;
    for (int i = 0; i < colNames->size(); i++) {
        string colName = colNames->getString(i);
        VectorSP col = table->getColumn(colName);
        sum += col->sum()->getDouble();
        count += col->count();
    }
    ConstantSP result = Util::createVector(DT_ANY, 2);
    result->set(0, new Double(sum));
    result->set(1, new Int(count));
    return result;
}
The reduce function in this example sums up the results from map by calling DolphinDB's built-in function add, which can be accessed using heap->currentSession()->getFunctionDef("add"):
FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");
The final function in this example divides the result from reduce by the number of non-null elements, yielding the average value of the specified columns across all partitions.
ConstantSP columnAvgFinal(const ConstantSP &result, const ConstantSP &placeholder) {
    double sum = result->get(0)->getDouble();
    int count = result->get(1)->getInt();
    return new Double(sum / count);
}
After defining the map, reduce, and final functions, export them as plugin functions (declare them with extern "C" in the header file and list them in the plugin loading text file). After exporting, you can use heap->currentSession()->getFunctionDef to access these functions and pass them as parameters when calling the mr function.
FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");
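For reference, the exported declarations in the plugin header might look as follows (a sketch; the module is assumed to be named columnAvg, and each exported function must also appear in the plugin's loading text file):
// Functions exported to DolphinDB must use C linkage
extern "C" ConstantSP columnAvgMap(Heap *heap, vector<ConstantSP> &args);
extern "C" ConstantSP columnAvgFinal(const ConstantSP &result, const ConstantSP &placeholder);
extern "C" ConstantSP columnAvg(Heap *heap, vector<ConstantSP> &args);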
In this example, the map function takes two parameters: table and colNames. However, the mr function requires map to accept a single parameter. To address this, wrap the map function into a partial function using Util::createPartialFunction before calling it:
vector<ConstantSP> mapWithColNamesArgs {new Void(), colNames};
FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs);
Then, access DolphinDB's built-in function mr using heap->currentSession()->getFunctionDef("mr") and invoke it with the mr->call method, which is equivalent to calling the mr function in a DolphinDB script.
The final implementation of the columnAvg function is as follows:
ConstantSP columnAvg(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP ds = args[0];
    ConstantSP colNames = args[1];
    FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");
    vector<ConstantSP> mapWithColNamesArgs = {new Void(), colNames};
    FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs); // columnAvgMap{, colNames}
    FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");
    FunctionDefSP finalFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgFinal");
    FunctionDefSP mr = heap->currentSession()->getFunctionDef("mr"); // mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal)
    vector<ConstantSP> mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc};
    return mr->call(heap, mrArgs);
}
4.2 Call Functions in DolphinDB
If the function is executed on a standalone system, you can simply load the plugin on the node where the computation takes place. However, if the data is located on remote nodes, the plugin must be loaded on each of those nodes. This can be done manually by executing the loadPlugin function on each node, or with the following script:
each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())
Once the plugin is loaded, use the sqlDS function to create a list of data sources, and then call the columnAvg function:
n = 100
db = database("dfs://testColumnAvg", VALUE, 1..4)
t = db.createPartitionedTable(table(10:0, `id`v1`v2, [INT,DOUBLE,DOUBLE]), `t, `id)
t.append!(table(take(1..4, n) as id, rand(10.0, n) as v1, rand(100.0, n) as v2))
ds = sqlDS(<select * from t>)
columnAvg::columnAvg(ds, `v1`v2)
5. Develop Plugin Functions for Streaming Data
In DolphinDB, a subscriber node processes received streaming data with the function specified by the handler parameter. The subscribed data arrives either as a table or as a tuple, depending on the msgAsTable parameter of the subscribeTable function. Typically, a handler filters the streaming data, inserts it into another table, and so on.
This chapter develops a handler that processes messages in the form of tuples and accepts two parameters:
- indices is an INT type scalar or vector indicating the indices of the elements in the tuple.
- table is a table into which the columns specified by indices of the tuple are inserted.
The interface for appending data to a table is bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg). If the insertion succeeds, it returns true and writes the number of inserted rows to insertedRows; otherwise, it returns false and writes the error message to errMsg. The implementation of the plugin is as follows:
ConstantSP handler(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP indices = args[0];
    TableSP table = args[1];
    ConstantSP msg = args[2];
    vector<ConstantSP> msgToAppend;
    for (int i = 0; i < indices->size(); i++) {
        int index = indices->getIndex(i);
        msgToAppend.push_back(msg->get(index));
    }
    INDEX insertedRows;
    string errMsg;
    table->append(msgToAppend, insertedRows, errMsg);
    return new Void();
}
To troubleshoot insertion errors, include the header file Logger.h and write the error messages to the log. Remember to add the macro definition -DLOGGING_LEVEL_2 when compiling the plugin:
bool success = table->append(msgToAppend, insertedRows, errMsg);
if (!success)
    LOG_ERR("Failed to append to table:", errMsg);
The following script simulates the insertion of streaming data to validate the handler:
loadPlugin("/path/to/PluginHandler.txt")
share streamTable(10:0, `id`sym`timestamp, [INT,SYMBOL,TIMESTAMP]) as t0
t1 = table(10:0, `sym`timestamp, [SYMBOL,TIMESTAMP])
subscribeTable(, `t0, , , handler::handler{[1,2], t1})
t0.append!(table(1..100 as id, take(`a`b`c`d, 100) as sym, now() + 1..100 as timestamp))
select * from t1
6. Develop Plugin Functions for External Data Sources
When developing extensible interface plugins for third-party data sources, consider the following key aspects:
- Data source: A data source is a special type of data object that contains the meta description of a data entity. By executing a data source, a materialized data entity, such as a table, matrix, or vector, can be obtained. Data sources can be passed to distributed computing functions like olsEx and randomForestClassifier, or to mr, imr, and the underlying computing framework defined in ComputingModel.h for parallel computing. For example, the function sqlDS creates a list of data sources according to the input SQL metacode. The development of a third-party data interface generally involves a function that creates data sources: it divides a large file into several partitions, each representing a subset of the data, and returns a tuple of data sources. A data source is generally represented by a code object; it serves as a function call that takes metaprogramming as its parameter and returns a table.
- Schema: The schema of a table defines the number of columns, and the name and data type of each column. The development of a third-party data interface generally involves a function that can quickly retrieve the data schema, based on which users can modify the name and data type of each column.
- Input and output (IO): In multi-core and multi-CPU environments, IO can become a bottleneck. DolphinDB provides IO interfaces like DataInputStream and DataOutputStream, which encapsulate information related to data compression, endianness, and IO types (such as network, disk, and buffer) to facilitate development. Additionally, DolphinDB implements multi-threaded IO with BlockFileInputStream and BlockFileOutputStream. This implementation offers two benefits:
  - Parallel computation and IO operations: while one thread is processing data, background threads asynchronously prefetch the subsequent data it will need.
  - Avoidance of disk contention: as the number of threads increases, concurrent read/write operations on the same disk can drastically slow down performance. This implementation serializes the read/write operations to the same disk, thereby improving throughput.
This chapter introduces the functions involved in the development of third-party data interfaces.
6.1 Data Format
Assume that the data in this example is stored in a binary-encoded flat file, row by row, starting at the head of the file. Each row contains four columns totaling 32 bytes:
- id: A signed 64-bit long integer (8 bytes).
- symbol: An ASCII C-string (8 bytes).
- date: A DATE type scalar in BCD format (8 bytes).
- value: A double-precision floating-point number according to the IEEE 754 standard (8 bytes).
For example:
| id | symbol | date | value |
| --- | --- | --- | --- |
| 5 | IBM | 20190313 | 10.1 |
The hexadecimal representation of this row is as follows:
0x 00 00 00 00 00 00 00 05
0x 49 42 4D 00 00 00 00 00
0x 02 00 01 09 00 03 01 03
0x 40 24 33 33 33 33 33 33
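For illustration, the row layout corresponds to the following packed C++ struct (a sketch only; reading still requires handling the file's byte order, which the hex dump above shows as big-endian):
#pragma pack(push, 1)
struct MyDataRow {
    long long id;          // signed 64-bit integer
    char symbol[8];        // NULL-padded ASCII C-string
    unsigned char date[8]; // BCD date, one decimal digit per byte: 2 0 1 9 0 3 1 3
    double value;          // IEEE 754 double-precision float
};
#pragma pack(pop)
static_assert(sizeof(MyDataRow) == 32, "each row must occupy exactly 32 bytes");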
6.2 Function extractMyDataSchema
The extractMyDataSchema function extracts the table schema of a data file. In this example, the schema is fixed, so there is no need to read the file. A schema table is created using the Util::createTable function:
ConstantSP extractMyDataSchema(const ConstantSP &placeholderA, const ConstantSP &placeholderB) {
    ConstantSP colNames = Util::createVector(DT_STRING, 4);
    ConstantSP colTypes = Util::createVector(DT_STRING, 4);
    string names[] = {"id", "symbol", "date", "value"};
    string types[] = {"LONG", "SYMBOL", "DATE", "DOUBLE"};
    colNames->setString(0, 4, names);
    colTypes->setString(0, 4, types);
    vector<ConstantSP> schema = {colNames, colTypes};
    vector<string> header = {"name", "type"};
    return Util::createTable(header, schema);
}
In practical development, the schema can be obtained by reading the file header. Details on how to read files will be introduced later.
6.3 Function loadMyData
The loadMyData function reads a file and outputs a DolphinDB table. Given a file path, an input stream can be created through Util::createBlockFileInputStream. On this stream, you can call the readBytes function to read a specified number of bytes, the readBool function to read the next Boolean value, and the readInt function to read the next integer. In this example, the syntax of loadMyData is loadMyData(path, [start], [length]).
- path is the file path.
- start is an integer indicating the row number where the reading starts.
- length is an integer indicating the total number of rows to be read.
The createBlockFileInputStream function takes parameters that set the byte offset at which reading starts and the total number of bytes to be read:
ConstantSP loadMyData(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP path = args[0];
    long long fileLength = Util::getFileLength(path->getString());
    size_t bytesPerRow = 32;
    int start = args.size() >= 2 ? args[1]->getInt() : 0;
    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;
    DataInputStreamSP inputStream = Util::createBlockFileInputStream(path->getString(), 0, fileLength, Util::BUF_SIZE, start * bytesPerRow, length * bytesPerRow);
    char buf[Util::BUF_SIZE];
    size_t actualLength;
    while (true) {
        inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
        if (actualLength <= 0)
            break;
        // ...
    }
}
When reading data, it is common to cache it in an array and wait until the buffer is full before performing a batch insertion. The following example reads a binary file containing only CHAR type bytes, writes them into a DolphinDB CHAR vector vec, and returns a table consisting of the single column vec:
char buf[Util::BUF_SIZE];
VectorSP vec = Util::createVector(DT_CHAR, 0);
size_t actualLength;
while (true) {
    inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
    if (actualLength <= 0)
        break;
    vec->appendChar(buf, actualLength);
}
vector<ConstantSP> cols = {vec};
vector<string> colNames = {"col0"};
return Util::createTable(colNames, cols);
For the complete script of this section, please refer to the Appendix. In practical development, data loading functions often also accept a schema parameter so that the types of the data being read can be adjusted as needed, as sketched below.
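For instance, a hypothetical helper that honors such a schema parameter might map the "type" column of the schema table (as produced by extractMyDataSchema) to DolphinDB data types:
// Hypothetical helper: translate the "type" column of a schema table into
// DATA_TYPE values; only the four types used in this example are handled.
vector<DATA_TYPE> resolveColumnTypes(const TableSP &schema) {
    VectorSP typeCol = schema->getColumn(1); // column 1 is "type"
    vector<DATA_TYPE> types;
    for (int i = 0; i < typeCol->size(); i++) {
        string t = typeCol->getString(i);
        if (t == "LONG") types.push_back(DT_LONG);
        else if (t == "SYMBOL") types.push_back(DT_SYMBOL);
        else if (t == "DATE") types.push_back(DT_DATE);
        else if (t == "DOUBLE") types.push_back(DT_DOUBLE);
        else throw IllegalArgumentException("loadMyData", "Unsupported column type: " + t);
    }
    return types;
}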
6.4 Function loadMyDataEx
The loadMyDataEx function is an optimized version of loadMyData for handling large data files without overwhelming the system's memory. It imports and saves data at the same time, converting a static binary file into a distributed DolphinDB table through a continuous data stream, which effectively reduces memory usage compared with loading the entire file into memory before saving.
The syntax of loadMyDataEx is loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length]). Its parameters mirror those of DolphinDB's built-in function loadTextEx. If the specified table exists in the database, the imported data is appended to it; if not, the data is written into a newly created table.
string dbPath = ((SystemHandleSP) dbHandle)->getDatabaseDir();
long long fileLength = Util::getFileLength(path->getString());
vector<ConstantSP> existsTableArgs = {new String(dbPath), tableName};
bool existsTable = heap->currentSession()->getFunctionDef("existsTable")->call(heap, existsTableArgs)->getBool(); // Equivalent to existsTable(dbPath, tableName)
ConstantSP result;
if (existsTable) { // If the specified table exists, load the table
    vector<ConstantSP> loadTableArgs = {dbHandle, tableName};
    result = heap->currentSession()->getFunctionDef("loadTable")->call(heap, loadTableArgs); // Equivalent to loadTable(dbHandle, tableName)
}
else { // If the specified table does not exist, create a table
    TableSP schema = extractMyDataSchema(new Void(), new Void());
    ConstantSP dummyTable = DBFileIO::createEmptyTableFromSchema(schema);
    vector<ConstantSP> createTableArgs = {dbHandle, dummyTable, tableName, partitionColumns};
    result = heap->currentSession()->getFunctionDef("createPartitionedTable")->call(heap, createTableArgs); // Equivalent to createPartitionedTable(db, dummyTable, tableName, partitionColumns)
}
The script for loading and appending data to the table adopts a pipeline framework. Its initial tasks are a series of loadMyData function calls with different start parameters, which divide the data loading task into several partitions, and the follower of the pipeline is a partial application of append!. The loadMyData function loads the data partition by partition, and each loaded partition is inserted into the specified table with append!. The core part of the script is as follows:
int sizePerPartition = 16 * 1024 * 1024;
int partitionNum = fileLength / sizePerPartition;
vector<DistributedCallSP> tasks;
FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
int partitionStart = start;
int partitionLength = length / partitionNum;
for (int i = 0; i < partitionNum; i++) {
    if (i == partitionNum - 1)
        partitionLength = length - partitionLength * i;
    vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
    ObjectSP call = Util::createRegularFunctionCall(func, partitionArgs); // Call loadMyData(path, partitionStart, partitionLength)
    tasks.push_back(new DistributedCall(call, true));
    partitionStart += partitionLength;
}
vector<ConstantSP> appendToResultArgs = {result};
FunctionDefSP appendToResult = Util::createPartialFunction(heap->currentSession()->getFunctionDef("append!"), appendToResultArgs); // Equivalent to append!{result}
vector<FunctionDefSP> functors = {appendToResult};
PipelineStageExecutor executor(functors, false);
executor.execute(heap, tasks);
For the complete script of this section, please refer to the Appendix. The pipeline framework is only one possible implementation of partitioned data import. Depending on the scenario, it is also feasible to use the StaticStageExecutor declared in ComputingModel.h or the thread model Thread declared in Concurrent.h.
6.5 Function myDataDS
The myDataDS function returns a tuple of data sources. Each data source is a code object representing a function call, which can be generated using Util::createRegularFunctionCall. Executing the code object accesses the corresponding data. The following example demonstrates how to generate data sources based on the loadMyData function:
ConstantSP myDataDS(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP path = args[0];
    long long fileLength = Util::getFileLength(path->getString());
    size_t bytesPerRow = 32;
    int start = args.size() >= 2 ? args[1]->getInt() : 0;
    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;
    int sizePerPartition = 16 * 1024 * 1024;
    int partitionNum = fileLength / sizePerPartition;
    int partitionStart = start;
    int partitionLength = length / partitionNum;
    FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
    ConstantSP dataSources = Util::createVector(DT_ANY, partitionNum);
    for (int i = 0; i < partitionNum; i++) {
        if (i == partitionNum - 1)
            partitionLength = length - partitionLength * i;
        vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
        ObjectSP code = Util::createRegularFunctionCall(func, partitionArgs); // Call loadMyData(path, partitionStart, partitionLength)
        dataSources->set(i, new DataSource(code));
        partitionStart += partitionLength; // Advance to the next partition
    }
    return dataSources;
}
7. Create and Use SQL Statements in Plugin Scripts
In DolphinDB's scripting language, two functions are designed primarily for creating SQL statements: sql and parseExpr. Both can be used to create SQL statements in plugin scripts, offering greater flexibility for SQL usage. This chapter takes the parseExpr function as an example to introduce how to use SQL statements in plugin scripts.
7.1 Procedure
7.1.1 Include the ScalarImp.h Header File
The parseExpr function converts a string into metacode. This process involves STRING type data, a built-in scalar type in DolphinDB. The STRING class is defined in the header file ScalarImp.h within the plugin library, so this header file must be included:
#include "ScalarImp.h"
7.1.2 Add the Table Object to Be Queried into a Heap
The first parameter of a function defined in a plugin is a heap object, which can be accessed when the plugin's interface is called through a script. The addItem interface of the heap object can be used to add the table object to be queried to the heap for maintenance. This interface takes two parameters: the first is the name of the object, which is the string following the FROM keyword in the SQL query statement; the second is the table object.
The following example creates a table object t named "t" and registers it using heap->addItem("t", t):
vector<string> colNames {"id", "x"};
vector<DATA_TYPE> colTypes {DT_SYMBOL, DT_DOUBLE};
TableSP t = Util::createTable(colNames, colTypes, 0, 8);
heap->addItem("t", t);
Then, the string "t" can be used to refer to the table t to be queried in a SQL statement:
select * from t;
7.1.3 Create the SQL String
Next, store the SQL statement select * from t in a string named "sql", where the string "t" refers to the table t registered in the previous section:
string sql = "select * from t";
7.1.4 Execute the SQL Statement
Executing the SQL statement involves two steps:
- Parse the SQL statement into metacode using the parseExpr function; that is, access the function object through heap->currentSession()->getFunctionDef("parseExpr") in the plugin scripts.
- Execute the metacode using the eval function; that is, organize the metacode into the vector<ConstantSP> format for the call.
The script for executing the SQL statement is as follows:
string sql = "select * from t";
ConstantSP sqlArg = new String(DolphinString(sql));
vector<ConstantSP> args{sqlArg};
ObjectSP sqlObj = heap->currentSession()->getFunctionDef("parseExpr")->call(heap, args);
vector<ConstantSP> evalArgs{sqlObj};
ConstantSP ret = heap->currentSession()->getFunctionDef("eval")->call(heap, evalArgs);
7.2 Complete Scripts
7.2.1 Complete Scripts for select * from t
vector<string> colNames {"id", "x"};
vector<DATA_TYPE> colTypes {DT_SYMBOL, DT_DOUBLE};
TableSP t = Util::createTable(colNames, colTypes, 0, 8);
ConstantSP idData = Util::createVector(DT_SYMBOL, 0, 0);
ConstantSP xData = Util::createVector(DT_DOUBLE, 0, 0);
string testId("APPL");
double testX(100.1);
((Vector*)(idData.get()))->appendString(&testId, 1);
((Vector*)(xData.get()))->appendDouble(&testX, 1);
vector<ConstantSP> data = {idData, xData};
INDEX rows=0;
string errMsg;
t->append(data, rows, errMsg);
heap->addItem("t", t);
string strData = t->getString();
string sql = "select * from t";
ConstantSP sqlArg = new String(DolphinString(sql));
vector<ConstantSP> args{sqlArg};
ObjectSP sqlObj = heap->currentSession()->getFunctionDef("parseExpr")->call(heap, args);
vector<ConstantSP> evalArgs{sqlObj};
ConstantSP ret = heap->currentSession()->getFunctionDef("eval")->call(heap, evalArgs);
7.2.2 Complete Scripts for select avg(x) from t
vector<string> colNames {"id", "x"};
vector<DATA_TYPE> colTypes {DT_SYMBOL, DT_DOUBLE};
TableSP t = Util::createTable(colNames, colTypes, 0, 8);
ConstantSP idData = Util::createVector(DT_SYMBOL, 0, 0);
ConstantSP xData = Util::createVector(DT_DOUBLE, 0, 0);
string testId("APPL");
double testX(100.1);
((Vector*)(idData.get()))->appendString(&testId, 1);
((Vector*)(xData.get()))->appendDouble(&testX, 1);
vector<ConstantSP> data = {idData, xData};
INDEX rows=0;
string errMsg;
t->append(data, rows, errMsg);
heap->addItem("t", t);
string strData = t->getString();
string sql = "select avg(x) from t";
ConstantSP sqlArg = new String(DolphinString(sql));
vector<ConstantSP> args{sqlArg};
ObjectSP sqlObj = heap->currentSession()->getFunctionDef("parseExpr")->call(heap, args);
vector<ConstantSP> evalArgs{sqlObj};
ConstantSP ret = heap->currentSession()->getFunctionDef("eval")->call(heap, evalArgs);
8. FAQ
Q1. When loading plugins compiled on Windows, how to solve the error The specified module could not be found?
MinGW includes various compilers such as gcc and g++. When downloading compilers, make sure to choose the x86_64-posix-seh version (where “posix” indicates that the C++11 multithreading feature is enabled, and “seh” indicates zero-cost exception handling) to ensure compatibility with the DolphinDB server. If you install x86_64-posix-sjlj or other versions, some plugins may compile successfully but cannot be loaded, and an error occurs, indicating that "The specified module could not be found."
Q2. Which libraries and header files are needed for plugin development?
The scripts of DolphinDB plugins are stored in the dolphindb/DolphinDBPlugin repository on GitHub. The include directory contains class declarations for DolphinDB’s core data structures and utility functions, serving as crucial tools for plugin implementation. Thus, the header files from the include directory are needed during development.
For the linking process, make sure to include the library directory containing libDolphinDB.dll or libDolphinDB.so, which is located in the DolphinDB installation directory.
Q3. What options are required for compilation?
When compiling plugins for Windows or Linux, add the "WINDOWS" or "LINUX" macro, respectively. For branches release130 and above, also add the "LOCKFREE_SYMBASE" macro. Additionally, to ensure compatibility with older compilers, libDolphinDB.so was compiled with the _GLIBCXX_USE_CXX11_ABI=0 option, so include this option when compiling plugins as well. (This option is unnecessary if libDolphinDB.so was compiled with _GLIBCXX_USE_CXX11_ABI=1.)
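For illustration, a typical Linux compile command might look like the following (all paths and file names are placeholders to adapt to your environment):
g++ -std=c++11 -fPIC -shared -DLINUX -DLOCKFREE_SYMBASE -D_GLIBCXX_USE_CXX11_ABI=0 \
    -I/path/to/DolphinDBPlugin/include demoPlugin.cpp \
    -L/path/to/dolphindb/server -lDolphinDB \
    -o libPluginDemo.so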
Q4. How to solve linker errors with “std::__cxx11“ (undefined reference) during compilation?
Make sure to use the same version of gcc for the compilation of plugins and the DolphinDB server. For example, for standard Linux64 version, gcc 4.8.5 is recommended, whereas for the JIT version, gcc 6.2.0 is recommended.
Q5. How to load plugins? Can they be unloaded and reloaded?
Plugins can be loaded in the following two ways:
Use the loadPlugin function: this function takes the path of a text file that describes the format of a DolphinDB plugin as its parameter.
loadPlugin("/<YOUR_SERVER_PATH>/plugins/odbc/PluginODBC.txt");
Note: The first line of the text file specifies the name and path of the shared library file. By default, no path is needed, meaning the plugin library and the text file should be in the same directory.
Use the preloadModules Parameter: For DolphinDB Server 1.20.0 and above versions, plugins can be automatically preloaded by specifying the configuration parameter preloadModules. Ensure that the preloaded plugins exist; otherwise, the server will encounter errors at startup. Use commas to separate multiple plugins as follows:
preloadModules=plugins::mysql,plugins::odbc
Note: Loaded plugins cannot be unloaded. To reload a plugin, you must restart the node.
Q6. When executing a plugin function, how to solve the error Connection refused: connect or a node crash?
Ensure that the header files in the include directory match the libDolphinDB.so or libDolphinDB.dll. The plugin branch should be aligned with the version of the DolphinDB Server (e.g., use the release130 plugin branch for DolphinDB Server 1.30, and release200 for DolphinDB Server 2.00).
Make sure to use the same version of gcc for the compilation of plugins and libDolphinDB.so or libDolphinDB.dll to avoid ABI incompatibility issues between different compiler versions.
Since plugins run in the same process as the DolphinDB server, a plugin crash can lead to a system-wide crash. It is crucial to optimize the error detection mechanism: only the thread executing the plugin function can throw exceptions (DolphinDB server catches exceptions when calling plugin functions), and all other threads must catch exceptions, instead of throwing them.
Make sure to include the LOCKFREE_SYMBASE macro in the compilation options.
Q7. When executing plugin functions, how to solve the error Cannot recognize the token xxx?
Before executing a plugin function, either:
- import the namespace of the plugin: use demo;
- or prefix the function with the module name: demo::f1();
9. Appendix
For complete scripts of the above-mentioned examples, please refer to complete scripts.
Note: Please select the plugin branch consistent with the DolphinDB Server version. For example, the complete scripts for DolphinDB Server 2.00.11 are located in branch 2.00.11.