C# API
The C# API implements messaging and data conversion between .NET programs and the DolphinDB server. It runs on .NET Framework 4.0, .NET Core 3.1 and above. Since version 3.00.0.0, you can get the version number of the current API with Utils.getAPIVersion().
The C# API adopts interface-oriented programming. It uses the interface class "IEntity" to represent all data types returned by DolphinDB. Based on the "IEntity" interface and DolphinDB data forms, the C# API provides the following extended interface classes, which are included in the dolphindb.data namespace:
Extended Interface Classes | Naming Rules | Examples |
---|---|---|
scalar | Basic<DataType> | BasicInt, BasicDouble, BasicDate, etc. |
vector, matrix | Basic<DataType><DataForm> | BasicIntVector, BasicDoubleMatrix, BasicAnyVector, etc. |
set, dictionary, and table | Basic<DataForm> | BasicSet, BasicDictionary, BasicTable. |
chart | Basic<DataForm> | BasicChart |
- Basic: basic data type interface
- <DataType>: DolphinDB data type
- <DataForm>: DolphinDB data form
The most important object provided by the DolphinDB C# API is DBConnection. It provides the C# applications with the following methods:
Method Name | Details |
---|---|
DBConnection([asynchronousTask=false], [useSSL=false], [compress=false], [usePython=false]) | Construct an object, indicating whether to enable asynchronous tasks, ssl, and compression |
connect(hostName, port, [userId=""], [password=""], [startup=""], [highAvailability=false], [highAvailabilitySites], [reconnect=false]) | Connect the session to DolphinDB server |
login(userId, password, enableEncryption) | Log in to the server |
run(script, [listener], [priority=4], [parallelism=64], [fetchSize=0], [clearMemory=false]) | Run scripts on DolphinDB server synchronously |
runAsync(script, [priority = 4], [parallelism=64], [fetchSize=0], [clearMemory = false]) | Run scripts on DolphinDB server asynchronously |
run(functionName, arguments, [priority=4], [parallelism=64], [fetchSize=0], [clearMemory=false]) | Call a function on DolphinDB server synchronously |
runAsync(functionName, arguments, [priority=4], [parallelism=64], [fetchSize=0], [clearMemory=false]) | Call a function on DolphinDB server asynchronously |
upload(variableObjectMap) | Upload local data to DolphinDB server |
isBusy() | Determine if the current session is busy |
close() | Close the current session |
Note: If the current session is idle, the C# API automatically closes the connection after a while. You can also close the session by calling close() to release the connection. Otherwise, other sessions may be unable to connect to the server due to too many open connections.
Establish DolphinDB Connection
DBConnection
The C# API connects to the DolphinDB server via TCP/IP protocol. To connect to a local DolphinDB server with port number 8848:
using dolphindb;
using dolphindb.data;
using dolphindb.io;
public void Test_Connect(){
DBConnection conn=new DBConnection();
Assert.AreEqual(true,conn.connect("localhost",8848));
}
Starting from API version 1.30.17, the following optional parameters can be specified for a connection: asynchronousTask, useSSL, compress, and usePython. The default value of each parameter is false. Currently, these options are only supported by DolphinDB server Linux stable versions 1.10.17 and higher, and latest versions 1.20.6 and higher.
The following example establishes a connection to the server. It enables SSL and compression but disables asynchronous communication. Note that the configuration parameter enableHTTPS=true must be specified on the server side.
DBConnection conn = new DBConnection(false, true, true);
In the following example, the connection disables SSL and enables asynchronous communication. In this case, only DolphinDB scripts and functions can be executed and no values are returned. This feature is for asynchronous writes.
DBConnection conn = new DBConnection(true, false);
Establish a connection with a username and password:
bool success = conn.connect("localhost", 8848, "admin", "123456");
To define and use user-defined functions in a C# program, you can pass the user-defined script to the parameter startup. The advantages are: (1) these functions don't need to be defined repeatedly every time run is called; (2) the API client can automatically reconnect to the server after a disconnection. If the parameter startup is specified, the C# API automatically executes the script and registers the functions. This parameter is very useful for scenarios where the network is unstable but the program needs to run continuously.
bool success = conn.connect("localhost", 8848, "admin", "123456", "");
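For example, a user-defined function can be registered through startup and then called with run. The sketch below is illustrative only: the function getUrgentJobs and the shared table jobs are hypothetical.
//register a user-defined function once via startup; it is re-registered automatically after reconnection
string startup = "def getUrgentJobs(){ return select * from jobs where priority == 1 }";
bool ok = conn.connect("localhost", 8848, "admin", "123456", startup);
BasicTable urgentJobs = (BasicTable)conn.run("getUrgentJobs()");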
ExclusiveDBConnectionPool
Multiple DBConnection objects can be reused through an ExclusiveDBConnectionPool. You can either run scripts directly with ExclusiveDBConnectionPool.run, or execute a task with execute and then obtain the results with the getResults method of BasicDBTask.
Method Name | Details |
---|---|
ExclusiveDBConnectionPool(host, port, uid, pwd, count, loadBalance, highAvailability, [haSites], [startup=""], [compress=false], [useSSL=false], [usePython=false]) | Constructor. The parameter count indicates the number of connections to be created. If loadBalance is set to true, the connections are distributed across different nodes. |
run(script, [priority=4], [parallelism=64], [clearMemory=false]) | Run scripts on DolphinDB server synchronously |
runAsync(script, [priority=4], [parallelism=64], [clearMemory=false]) | Run scripts on DolphinDB server asynchronously |
run(functionName, arguments, [priority=4], [parallelism=64], [clearMemory=false]) | Call a function on DolphinDB server synchronously |
runAsync(functionName, arguments, [priority=4], [parallelism=64], [clearMemory=false]) | Call a function on DolphinDB server asynchronously |
execute(task) | Execute the task. |
execute(tasks) | Execute tasks in batches. |
getConnectionCount() | Get the number of connections. |
shutdown() | Shut down the connection pool. |
Note: If the current ExclusiveDBConnectionPool is idle, the C# API automatically closes the connection after a while. To release the connection resources, call shutdown() upon the completion of thread tasks.
BasicDBTask encapsulates the functions and arguments to be executed.
Method Name | Details |
---|---|
BasicDBTask(functionName, arguments, [priority=4], [parallelism=64], [clearMemory=false]) | functionName: the function to be executed; arguments: the arguments passed to the functionName. |
BasicDBTask(script, [priority=4], [parallelism=64], [clearMemory=false]) | The script to be executed. |
isSuccessful() | Check whether the task is executed successfully. |
getResults() | Get the execution results. |
getErrorMsg() | Get the error messages. |
Build a connection pool with 10 connections.
ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool("192.168.1.38", 8902, "admin", "123456", 10, false, true);
//run the script
IEntity ret = pool.run("1 + 1");
Console.Out.WriteLine(ret.getString());
//run the specified function
ret = pool.run("abs", new List<IEntity> { new BasicInt(-3) });
Console.Out.WriteLine(ret.getString());
Create a task.
BasicDBTask task = new BasicDBTask("1..10");
pool.execute(task);
Check whether the task was executed successfully. If so, get the results; otherwise, throw an exception with the error message.
BasicIntVector data = null;
if (task.isSuccessful())
{
data = (BasicIntVector)task.getResults();
}
else
{
throw new Exception(task.getErrorMsg());
}
System.Console.Out.WriteLine(data.getString());
Output:
[1,2,3,4,5,6,7,8,9,10]
Create multiple tasks and execute them concurrently with ExclusiveDBConnectionPool.
List<IDBTask> tasks = new List<IDBTask>();
for (int i = 0; i < 10; ++i){
//call function log
tasks.Add(new BasicDBTask("log", new List<IEntity> { data.get(i) }));
}
pool.execute(tasks);
Check whether each task was executed successfully. If so, collect the results; otherwise, throw an exception with the error message.
BasicDoubleVector logData = new BasicDoubleVector(0); //create an empty DOUBLE vector to collect the results appended below
for (int i = 0; i < 10; ++i)
{
if (tasks[i].isSuccessful())
{
logData.append((IScalar)tasks[i].getResults());
}
else
{
throw new Exception(tasks[i].getErrorMsg());
}
}
System.Console.Out.WriteLine(logData.getString());
Output:
[0,0.693147,1.098612,1.386294,1.609438,1.791759,1.94591,2.079442,2.197225,2.302585]
Run DolphinDB Scripts
To run DolphinDB script in C#:
conn.run("script");
conn.runAsync("script");
run executes the script synchronously; runAsync executes it asynchronously.
If the script contains only one statement, such as an expression, DolphinDB returns its result. If the script contains more than one statement, the result of the last statement is returned. If the script contains an error or a network problem occurs, an exception is thrown.
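For example, when a script contains two statements, run returns only the value of the last one:
//the result of the last statement (the sum) is returned
IEntity result = conn.run("x = 1..10; sum(x)");
Console.WriteLine(result.getString());  //55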
Call DolphinDB Functions
Other than running scripts, the run method can also execute DolphinDB built-in functions or user-defined functions on a remote DolphinDB server.
The following example passes a double vector to the server and calls the function sum.
public void testFunction(){
List<IEntity> args = new List<IEntity>(1);
BasicDoubleVector vec = new BasicDoubleVector(3);
vec.setDouble(0, 1.5);
vec.setDouble(1, 2.5);
vec.setDouble(2, 7);
args.Add(vec);
BasicDouble result = (BasicDouble)conn.run("sum", args);
Console.WriteLine(result.getValue());
}
Upload Data to DolphinDB Server
You can upload a data object to the DolphinDB server and assign it to a variable for future use. Variable names may contain letters, digits and underscores, and must start with a letter.
public void testUpload(){
BasicTable tb = (BasicTable)conn.run("table(1..100 as id,take(`aaa,100) as name)");
Dictionary<string, IEntity> upObj = new Dictionary<string, IEntity>();
upObj.Add("table_uploaded", (IEntity)tb);
conn.upload(upObj);
BasicIntVector v = (BasicIntVector)conn.run("table_uploaded.id");
Console.WriteLine(v.rows());
}
Read Data
This section introduces how to read data of different data forms in DolphinDB with the DBConnection object.
Import the DolphinDB data type package:
using dolphindb.data;
- Vector
The following DolphinDB statement returns the C# object BasicStringVector.
rand(`IBM`MSFT`GOOG`BIDU,10)
The rows method returns the size of the vector. You can access vector elements by index with the get or getString method.
public void testStringVector(){
IVector v = (BasicStringVector)conn.run("take(`IBM`MSFT`GOOG`BIDU, 10)");
Console.WriteLine(v.isVector());
Console.WriteLine(v.rows());
Console.WriteLine(((BasicString)v.get(1)).getValue());
}
Similarly, you can work with vectors or tuples of DOUBLE type.
public void testDoubleVector(){
IVector v = (BasicDoubleVector)conn.run("1.123 2.2234 3.4567");
Console.WriteLine(v.isVector());
Console.WriteLine(v.rows());
Console.WriteLine(Math.Round(((BasicDouble)v.get(1)).getValue(), 4));
}
public void testAnyVector(){
BasicAnyVector v = (BasicAnyVector)conn.run("[1 2 3,3.4 3.5 3.6]");
Console.WriteLine(v.rows());
Console.WriteLine(v.columns());
Console.WriteLine(((BasicDouble)((BasicDoubleVector)v.getEntity(1)).get(0)).getValue());
}
- Set
public void testSet(){
BasicSet s = (BasicSet)conn.run("set(1 3 5)");
Console.WriteLine(s.rows());
Console.WriteLine(s.columns());
}
- Matrix
To retrieve an element from a matrix, use get. To get the number of rows and columns, use the rows and columns methods, respectively.
public void testIntMatrix(){
IMatrix m = (BasicIntMatrix)conn.run("matrix(45 47 48,56 65 67)");
Console.WriteLine(m.isMatrix());
Console.WriteLine(m.rows());
Console.WriteLine(m.columns());
Console.WriteLine(((BasicInt)m.get(0, 1)).getValue());
}
- Dictionary
The keys and values of a dictionary can be retrieved with the keys and values methods, respectively. To get the value for a key, use get.
public void testDictionary(){
BasicDictionary tb = (BasicDictionary)conn.run("dict(1 2 3 4,5 6 7 8)");
foreach (var key in tb.keys())
{
BasicInt val = (BasicInt)tb.get(key);
Console.WriteLine(val);
}
}
- Table
To get a column of a table, use getColumn; to get a column name, use getColumnName; to get the number of columns and rows of a table, use columns and rows, respectively.
public void testTable(){
BasicTable tb = (BasicTable)conn.run("table(1 as id,'a' as name)");
DataTable dt = tb.toDataTable();
Console.WriteLine(dt.Rows.Count);
}
- NULL object
To determine whether an object is NULL, use getDataType, or check whether getObject() returns null as in the example below.
public void testVoid(){
IEntity obj = conn.run("NULL");
Assert.AreEqual(obj.getObject(), null);
}
Read From and Write to DolphinDB Tables
This section introduces how to write data from other databases or third-party Web APIs to a DolphinDB table using the C# API.
There are 2 types of DolphinDB tables:
- In-memory table: it has the fastest access speed, but if the node shuts down the data will be lost.
- DFS table: data are distributed across disks of multiple nodes.
Write to an In-Memory Table
DolphinDB offers several ways to write to an in-memory table:
- Insert a single row of data with insert into
- Insert multiple rows of data in bulk with function tableInsert
- Insert a table object with function tableInsert
It is not recommended to save data with function append!, as append! returns the schema of the table and unnecessarily increases the network traffic.
The table in the following examples has 4 columns. Their data types are STRING, INT, TIMESTAMP and DOUBLE. The column names are cstring, cint, ctimestamp and cdouble, respectively.
t = table(10000:0,`cstring`cint`ctimestamp`cdouble,[STRING,INT,TIMESTAMP,DOUBLE])
share t as sharedTable
By default, an in-memory table is not shared among sessions. To access it in a different session, share it among sessions with share.
Insert a Single Record with insert into
public void test_save_Insert(String str, int i, long ts, double dbl)
{
conn.run(String.Format("insert into sharedTable values('{0}',{1},{2},{3})",str,i,ts,dbl));
}
Insert Multiple Records in Bulk with tableInsert
Function tableInsert can save records in batches. If the data in C# can be organized as a List, it can be saved with function tableInsert.
public void test_save_TableInsert(string[] strArray, int[] intArray, long[] tsArray, double[] dblArray)
{
//Constructing parameters with arrays
List<IEntity> args = new List<IEntity>() { new BasicStringVector(strArray), new BasicIntVector(intArray), new BasicTimestampVector(tsArray), new BasicDoubleVector(dblArray) };
conn.run("tableInsert{sharedTable}", args);
}
The example above uses partial application in DolphinDB to embed a table in tableInsert{sharedTable} as a function. For details about partial application, please refer to the Partial Application documentation.
Save BasicTable Objects With Function tableInsert
Function tableInsert can also accept a BasicTable object in C# as a parameter to append data to a table in batches.
public void test_save_table(BasicTable table1)
{
List<IEntity> args = new List<IEntity>(){ table1};
conn.run("tableInsert{shareTable}", args);
}
Write to a DFS Table
DFS table is recommended in production environment. It supports snapshot isolation and ensures data consistency. With data replication, DFS tables offer fault tolerance and load balancing.
Save BasicTable Objects With Function tableInsert
Use the following script in DolphinDB to create a DFS table.
dbPath = 'dfs://testDatabase'
tbName = 'tb1'
if(existsDatabase(dbPath)){dropDatabase(dbPath)}
db = database(dbPath,RANGE,2018.01.01..2018.12.31)
db.createPartitionedTable(t,tbName,'ctimestamp')
DolphinDB provides function loadTable to load DFS tables and function tableInsert to append data.
public void test_save_table(string dbPath, string tableName, BasicTable table1)
{
List<IEntity> args = new List<IEntity>() { table1 };
conn.run(String.Format("tableInsert{{loadTable('{0}','{1}')}}", dbPath,tableName), args);
}
In C#, you can easily create a BasicTable object using arrays or lists, which can then be appended to DFS tables. For example, the following 5 list objects (List<T>) boolArray, intArray, dblArray, dateArray and strArray are used to construct a BasicTable object:
List<String> colNames = new List<string>() { "cbool", "cint", "cdouble", "cdate", "cstring" };
List<IVector> cols = new List<IVector>() { new BasicBooleanVector(boolArray), new BasicIntVector(intArray), new BasicDoubleVector(dblArray), new BasicDateVector(dateArray), new BasicStringVector(strArray) };
BasicTable table1 = new BasicTable(colNames, cols);
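For a fully self-contained illustration restricted to column types whose vector constructors appear in the Data Type Conversion section below (INT, DOUBLE and STRING), a BasicTable can be built from plain arrays and appended via partial application of tableInsert. The target table name targetTable is hypothetical and must have a matching schema.
int[] intArray = new int[] { 1, 2, 3 };
double[] dblArray = new double[] { 1.1, 2.2, 3.3 };
string[] strArray = new string[] { "IBM", "MSFT", "GOOG" };
List<string> demoColNames = new List<string>() { "cint", "cdouble", "cstring" };
List<IVector> demoCols = new List<IVector>() { new BasicIntVector(intArray), new BasicDoubleVector(dblArray), new BasicStringVector(strArray) };
BasicTable demoTable = new BasicTable(demoColNames, demoCols);
//append demoTable to the (hypothetical) table targetTable with a matching schema
List<IEntity> args = new List<IEntity>() { demoTable };
conn.run("tableInsert{targetTable}", args);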
Append to DFS Tables
DolphinDB DFS tables support concurrent reads and writes. This section introduces how to write data concurrently to DolphinDB DFS tables in C#.
Note that multiple writers are not allowed to write to one partition at the same time in DolphinDB. Therefore, when the client uses multiple writer threads, make sure that each thread writes to a different partition. The user first specifies a connection pool; the system then obtains the partition information and assigns the partitions to the connections for concurrent writes. A partition can only be written to by one thread at a time.
DolphinDB C# API offers a convenient way to separate data by partition and write concurrently:
public PartitionedTableAppender(string dbUrl, string tableName, string partitionColName, string appendFunction, IDBConnectionPool pool)
Parameters:
- dbUrl: DFS database path
- tableName: DFS table name
- partitionColName: partitioning column
- appendFunction: (optional) a user-defined function. tableInsert is called by default.
- pool: connection pool for concurrent writes
The following script first creates a DFS database "dfs://demohash" and a partitioned table "pt". The database uses a COMPO domain of HASH-HASH.
t = table(timestamp(1..10) as date,string(1..10) as sym)
db1=database("",HASH,[DATETIME,10])
db2=database("",HASH,[STRING,5])
if(existsDatabase("dfs://demohash")){
dropDatabase("dfs://demohash")
}
db = database("dfs://demohash",COMPO,[db2,db1])
pt = db.createPartitionedTable(t,`pt,`sym`date)
With DolphinDB server version 1.30 or higher, you can write to DFS tables with the PartitionedTableAppender object in the C# API. For example:
IDBConnectionPool conn = new ExclusiveDBConnectionPool(host, port, "admin", "123456",threadCount, false, false);
PartitionedTableAppender appender = new PartitionedTableAppender(dbPath, tableName, "gid", "saveGridData{'" + dbPath + "','" + tableName + "'}", conn);
BasicTable table1 = createTable();
appender.append(table1);
Load and Query Tables
In the C# API, a table is saved as a BasicTable object. Since BasicTable is column-based, to retrieve rows, you need to get the necessary columns first and then get the rows.
In the example below, the BasicTable has 4 columns with data types STRING, INT, TIMESTAMP and DOUBLE. The column names are cstring, cint, ctimestamp and cdouble.
public void test_loop_basicTable(BasicTable table1)
{
BasicStringVector stringv = (BasicStringVector) table1.getColumn("cstring");
BasicIntVector intv = (BasicIntVector)table1.getColumn("cint");
BasicTimestampVector timestampv = (BasicTimestampVector)table1.getColumn("ctimestamp");
BasicDoubleVector doublev = (BasicDoubleVector)table1.getColumn("cdouble");
for(int ri=0; ri<table1.rows(); ri++){
Console.WriteLine(stringv.getString(ri));
Console.WriteLine(intv.getInt(ri));
DateTime timestamp = timestampv.getTimestamp(ri);
Console.WriteLine(timestamp);
Console.WriteLine(doublev.getDouble(ri));
}
}
Append Data Asynchronously
DolphinDB C# API provides the MultithreadedTableWriter class that supports concurrent writes with multiple threads. You can use its methods to asynchronously append data to a DolphinDB in-memory table, dimension table, or DFS table.
The methods of the MultithreadedTableWriter object are introduced as follows:
MultithreadedTableWriter(string hostName, int port, string userId, string password,string dbName, string tableName, bool useSSL, bool enableHighAvailability = false, string[] pHighAvailabilitySites = null,int batchSize = 1, float throttle = 0.01f, int threadCount = 5, string partitionCol = "", int[] pCompressMethods = null, Mode mode = Mode.M_Append, string[] pModeOption = null, Callback callbackHandler = null);
Parameters:
- hostName: a string indicating host name
- port: an integer indicating port number
- userId / password: strings indicating the username and password
- dbName: a string indicating the DFS database path. Leave it unspecified for an in-memory table.
- tableName: a string indicating the table name. Note: For API 1.30.17 or lower versions, when writing to an in-memory table, specify the in-memory table name for dbName and leave tableName empty.
- useSSL: a Boolean value indicating whether to enable SSL. The default value is false.
- enableHighAvailability: a Boolean value indicating whether to enable high availability. The default value is false.
- pHighAvailabilitySites: a list of ip:port of all available nodes
- batchSize: an integer indicating the number of records processed in a batch. The default value is 1, meaning the data is sent to the server as soon as it is written. If it is greater than 1, the client sends the data to the server only when the number of buffered records reaches batchSize.
- throttle: a positive floating-point number indicating the waiting time (in seconds) before the buffered data is sent to the server when the number of written records does not reach batchSize.
- threadCount: an integer indicating the number of working threads to be created. The default value is 5. It must be 1 for a dimension table.
- partitionCol: a string indicating the partitioning column. It is an empty string by default and only takes effect when threadCount is greater than 1. For a partitioned table, it must be the partitioning column; for a stream table, it must be a column name; for a dimension table, the parameter does not take effect.
- pCompressMethods: an array of the compression methods used for each column. If unspecified, the columns are not compressed. The available compression methods include:
- Vector_Fields.COMPRESS_LZ4: LZ4 algorithm
- Vector_Fields.COMPRESS_DELTA: delta-of-delta encoding
- mode: indicates how data is written. It can be specified as Mode.M_Append or Mode.M_Upsert.
- Mode.M_Append: the data is appended by [tableInsert](https://www.dolphindb.com/help/FunctionsandCommands/FunctionReferences/t/tableInsert.html).
- Mode.M_Upsert: the data is updated by [upsert!](https://www.dolphindb.com/help/FunctionsandCommands/FunctionReferences/u/upsert%21.html).
- pModeOption: a string array consisting of the optional parameters of upsert!, indicating the expansion options of mode. It currently takes effect only when mode is specified as M_Upsert. For example, it can be specified as new String[] { "ignoreNull=false", "keyColNames=`volume" }.
- callbackHandler: the Callback handler. The default value is null, indicating that no callback is used. With callback enabled, the Callback class is inherited and the method writeCompletion is overridden. If the parameter callbackHandler is specified, note that:
- The first parameter of insert must be of type STRING, indicating the id of the row.
- The getUnwrittenData method will not be available.
The following part introduces the methods of the MultithreadedTableWriter object.
(1) insert
ErrorCodeInfo insert(params Object[] args)
Details:
Insert a single record. Return an ErrorCodeInfo object containing errorCode and errorInfo. If errorCode is not "", MultithreadedTableWriter has failed to insert the data, and errorInfo displays the error message.
The ErrorCodeInfo class provides the methods hasError() and succeed() to check whether the data is written properly. hasError() returns true if an error occurred, otherwise false. succeed() returns true if the data is written successfully, otherwise false. A brief usage sketch follows the parameter list below.
Parameters:
- args: a variable-length argument (varargs) indicating the record to be inserted.
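A minimal sketch of checking the return value, assuming writer is a constructed MultithreadedTableWriter for a table with DATE, SYMBOL and LONG columns (as in the full example later in this section):
ErrorCodeInfo code = writer.insert(new DateTime(2022, 3, 23), "AAPL", 1);
if (code.hasError())
    Console.WriteLine("insert failed: " + code.ToString());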
(2) getUnwrittenData
List<List<IEntity>> getUnwrittenData();
Details:
Return a nested list of data that has not been written to the server.
Note: Data obtained by this method will be released by MultithreadedTableWriter.
(3) insertUnwrittenData
ErrorCodeInfo insertUnwrittenData(List<List<IEntity>> data);
Details:
Insert unwritten data. The result is in the same format as insert. The difference is that insertUnwrittenData can insert multiple records at a time.
Parameters:
- data: the data that has not been written to the server. You can obtain the object with method getUnwrittenData.
(4) getStatus
Status getStatus()
Details:
Get the current status of the MultithreadedTableWriter object.
Return value:
- status: a MultithreadedTableWriter.Status object with the attributes listed below (a brief usage sketch follows the list):
Attributes:
- isExiting: whether the threads are exiting
- errorCode: error code
- errorInfo: error message
- sentRows: number of sent rows
- unsentRows: number of rows to be sent
- sendFailedRows: number of rows failed to be sent
- threadStatus: a list of the thread status
- threadId: thread ID
- sentRows: number of rows sent by the thread
- unsentRows: number of rows to be sent by the thread
- sendFailedRows: number of rows failed to be sent by the thread
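A brief sketch of reading the status (writer is an existing MultithreadedTableWriter, as in the full example below):
MultithreadedTableWriter.Status status = writer.getStatus();
Console.WriteLine(status.sentRows);      //number of rows already sent
Console.WriteLine(status.unsentRows);    //number of rows still waiting to be sent
if (status.errorCode != "")
    Console.WriteLine(status.errorInfo); //error message, if any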
(5) waitForThreadCompletion
waitForThreadCompletion()
Details:
After calling the method, MultithreadedTableWriter will wait until all working threads complete their tasks.
The methods of MultithreadedTableWriter are usually used in the following way:
//build a connection and initialize the environment
string HOST = "192.168.1.38";
int PORT = 18848;
string USER = "admin";
string PASSWD = "123456";
DBConnection dBConnection = new DBConnection();
dBConnection.connect(HOST, PORT, USER, PASSWD);
Random random = new Random();
string script =
"dbName = 'dfs://valuedb3'\n" +
"if (exists(dbName))\n" +
"{\n" +
"dropDatabase(dbName);\n" +
"}\n" +
"datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);\n" +
"db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);\n" +
"pt=db.createPartitionedTable(datetest,'pdatetest','id');";
dBConnection.run(script);
ErrorCodeInfo ret;
MultithreadedTableWriter.Status writeStatus;
MultithreadedTableWriter writer = new MultithreadedTableWriter(HOST, PORT, USER, PASSWD, "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[] { Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_DELTA });
try
{
//insert 100 rows of records with correct data types and column count
for (int i = 0; i < 100; ++i)
{
ret = writer.insert(new DateTime(2022, 3, 23), "AAAAAAAB", i);
//this line will not be executed
if (ret.errorCode != "")
Console.WriteLine(string.Format("insert wrong format data: {0}\n", ret.ToString()));
}
Thread.Sleep(2000);
//An error message is returned immediately if a row with a wrong data type is inserted.
ret = writer.insert(new DateTime(2022, 3, 23), random.Next() % 10000, random.Next() % 10000);
if (ret.errorCode != "")
Console.WriteLine("insert wrong format data: {0}\n", ret.ToString());
/*
insert wrong format data: code = A1 info = Failed to insert data. Cannot convert int to DT_SYMBOL
*/
//If a disconnection occurs, MTW will fail the next time it writes data to the server.
//Write one row of data first to trigger an error.
ret = writer.insert(new DateTime(2022, 3, 23), "AAAAAAAB", 1);
Thread.Sleep(1000);
//Insert 9 more rows with correct data types. MTW throws an exception because the worker thread has terminated, and these rows will not be written to MTW.
//an exception will be thrown directly here
for (int i = 0; i < 9; ++i)
{
ret = writer.insert(new DateTime(2022, 3, 23), "AAAAAAAB", random.Next() % 10000);
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
//Thread is exiting.
}
writer.waitForThreadCompletion();
writeStatus = writer.getStatus();
if (writeStatus.errorCode != "")
//an error occurred when writing
Console.WriteLine(string.Format("error in writing:\n {0}", writeStatus.ToString()));
Console.WriteLine(((BasicLong)dBConnection.run("exec count(*) from pt")).getLong());
/*
error in writing: Cause of write failure
sentRows: 100
unsentRows: 3
sendFailedRows: 7
threadId : 3 sentRows : 20 unsentRows : 0 sendFailedRows : 5
threadId : 4 sentRows : 20 unsentRows : 2 sendFailedRows : 1
threadId : 5 sentRows : 20 unsentRows : 1 sendFailedRows : 0
threadId : 6 sentRows : 20 unsentRows : 0 sendFailedRows : 0
threadId : 7 sentRows : 20 unsentRows : 0 sendFailedRows : 1
100
*/
for (int i = 0; i < 30; ++i)
Console.Write('-');
Console.WriteLine();
List<List<IEntity>> unwriterdata = new List<List<IEntity>>();
if (writeStatus.sentRows != 110)
{
Console.WriteLine("error after write complete:" + writeStatus.errorInfo);
unwriterdata = writer.getUnwrittenData();
Console.WriteLine("unwriterdata {0}", unwriterdata.Count);
//To rewrite this data, a new MTW needs to be created because the original MTW is no longer available due to an exception.
MultithreadedTableWriter newWriter = new MultithreadedTableWriter(HOST, PORT, USER, PASSWD, "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[] { Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_DELTA });
try
{
//insert unwritten data
if (newWriter.insertUnwrittenData(unwriterdata).errorCode != "")
{
//wait until MTW finishes writing
newWriter.waitForThreadCompletion();
writeStatus = newWriter.getStatus();
if (writeStatus.errorCode != "")
{
System.Console.Out.WriteLine("error in write again:" + writeStatus.errorInfo);
}
}
}
finally
{
newWriter.waitForThreadCompletion();
writeStatus = newWriter.getStatus();
Console.WriteLine(string.Format("write again result:\n{0}", writeStatus));
}
}
else
Console.WriteLine("write complete : \n {0}", writeStatus.ToString());
//check the status
Console.WriteLine(((BasicLong)dBConnection.run("exec count(*) from pt")).getLong());
/*
unwriterdata 10
write again result:
isExiting: True
sentRows: 10
unsentRows: 0
sendFailedRows: 0
threadId : 8 sentRows : 5 unsentRows : 0 sendFailedRows : 0
threadId : 9 sentRows : 3 unsentRows : 0 sendFailedRows : 0
threadId : 10 sentRows : 1 unsentRows : 0 sendFailedRows : 0
threadId : 11 sentRows : 0 unsentRows : 0 sendFailedRows : 0
threadId : 12 sentRows : 1 unsentRows : 0 sendFailedRows : 0
110
*/
Enable the Callback class for MultithreadedTableWriter
With callback enabled, the Callback class is inherited and the method writeCompletion is overridden to obtain the callback data.
A BasicTable object is passed to the callback method, which consists of two columns:
- The first column (of STRING type) holds the id of each row added with MultithreadedTableWriter.insert.
- The second column (of BOOL type) indicates whether each row was written successfully: true means the write succeeded, false means it failed.
Example:
public class CallbackHandler : Callback
{
public void writeCompletion(ITable callbackTable)
{
List<String> failedIdList = new List<string>();
BasicStringVector idVec = (BasicStringVector)callbackTable.getColumn(0);
BasicBooleanVector successVec = (BasicBooleanVector)callbackTable.getColumn(1);
for (int i = 0; i < successVec.rows(); i++)
{
if (!successVec.getBoolean(i))
{
failedIdList.Add(idVec.getString(i));
}
}
}
}
Example:
MultithreadedTableWriter mtw = new MultithreadedTableWriter(host, port, userName, password, dbName, tbName, useSSL,
enableHighAvailability, null, 10000, 1, 1, "price", null,MultithreadedTableWriter.Mode.M_Append,null, new CallbackHandler());
Use the insert method of MultithreadedTableWriter and write the id of each row to the first column.
String theme = "theme1";
for (int id = 0; id < 1000000; id++){
mtw.insert(theme + id, code, price); //theme+id is the id of each row, which will be returned in the callback
}
Update and Write to DolphinDB Tables
DolphinDB C# API provides the AutoFitTableUpsert class to update and write to DolphinDB tables. AutoFitTableUpsert provides the same functionality as MultithreadedTableWriter with its parameter mode specified as Mode.M_Upsert; the difference is that AutoFitTableUpsert is single-threaded while MultithreadedTableWriter writes with multiple threads.
The methods of the AutoFitTableUpsert object are introduced as follows:
AutoFitTableUpsert(string dbUrl, string tableName, DBConnection connection, bool ignoreNull, string[] pkeyColNames, string[] psortColumns)
Parameters:
- dbUrl: a string indicating the path of the DFS database. Leave it empty for an in-memory table.
- tableName: a STRING indicating the in-memory or DFS table name.
- connection: a DBConnection object used to connect to the DolphinDB server and upsert data. Note that asynchronousTask must be false when creating the DBConnection object for AutoFitTableUpsert.
- ignoreNull: a Boolean value. It is used to specify the parameter ignoreNull of upsert!, indicating whether NULL values in the written data update the target table.
- pkeyColNames: a STRING array. It is used to specify the parameter keyColNames of upsert!, i.e., the key columns.
- psortColumns: a STRING array. It is used to specify the parameter sortColumns of upsert!. The updated partitions will be sorted on sortColumns (only within each partition, not across partitions).
The following introduces the upsert method of the AutoFitTableUpsert object.
int upsert(BasicTable table)
Details:
Update a BasicTable object to the target table, and return an integer indicating the number of rows that have been updated.
Example:
DBConnection conn = new DBConnection(false, false, false);
conn.connect("192.168.1.116", 18999, "admin", "123456");
String dbName = "dfs://upsertTable";
String tableName = "pt";
String script = "dbName = \"dfs://upsertTable\"\n" +
"if(exists(dbName)){\n" +
"\tdropDatabase(dbName)\t\n" +
"}\n" +
"db = database(dbName, RANGE,1 10000,,'TSDB')\n" +
"t = table(1000:0, `id`value,[ INT, INT[]])\n" +
"pt = db.createPartitionedTable(t,`pt,`id,,`id)";
conn.run(script);
BasicIntVector v1 = new BasicIntVector(3);
v1.setInt(0, 1);
v1.setInt(1, 100);
v1.setInt(2, 9999);
BasicArrayVector ba = new BasicArrayVector(DATA_TYPE.DT_INT_ARRAY);
ba.append(v1);
ba.append(v1);
ba.append(v1);
List<String> colNames = new List<string>();
colNames.Add("id");
colNames.Add("value");
List<IVector> cols = new List<IVector>();
cols.Add(v1);
cols.Add(ba);
BasicTable bt = new BasicTable(colNames, cols);
String[] keyColName = new String[] { "id" };
AutoFitTableUpsert aftu = new AutoFitTableUpsert(dbName, tableName, conn, false, keyColName, null);
aftu.upsert(bt);
BasicTable res = (BasicTable)conn.run("select * from pt;");
System.Console.Out.WriteLine(res.getString());
Data Type Conversion
The C# API provides objects that correspond to DolphinDB data types. They are usually named Basic<DataType>, such as BasicInt (which corresponds to DolphinDB INT) and BasicDate (which corresponds to DolphinDB DATE).
The following section describes the supported C# native types and their corresponding C# API and DolphinDB types for the Scalar and Vector data forms.
Scalar
C# | C# API | DolphinDB | Examples of C# Types | C# API to DolphinDB | DolphinDB to C# |
---|---|---|---|---|---|
bool | BasicBoolean | BOOL | bool boolVar = true; | BasicBoolean basicBoolean = new BasicBoolean(boolVar); | use getValue method, e.g., basicBoolean.getValue() |
byte | BasicByte | CHAR | byte byteVar = 10; | BasicByte basicByte = new BasicByte(byteVar); | same as above |
short | BasicShort | SHORT | short shortVar = 10; | BasicShort basicShort = new BasicShort(shortVar); | |
int | BasicInt | INT | int intVar = 10; | BasicInt basicInt = new BasicInt(intVar); | |
long | BasicLong | LONG | long longVar = 10; | BasicLong basicLong = new BasicLong(longVar); | |
float | BasicFloat | FLOAT | float floatVar = 1.0f; | BasicFloat basicFloat = new BasicFloat(floatVar); | |
double | BasicDouble | DOUBLE | double doubleVar = 1.0d; | BasicDouble basicDouble = new BasicDouble(doubleVar); | |
DateTime | BasicNanoTimestamp | NANOTIMESTAMP | DateTime dateTimeVar = DateTime.Now; | BasicNanoTimestamp basicNanoTimestamp = new BasicNanoTimestamp(dateTimeVar); | |
same as above | BasicTimestamp | TIMESTAMP | same as above | BasicTimestamp basicTimestamp = new BasicTimestamp(dateTimeVar); | |
BasicDate | DATE | BasicDate basicDate = new BasicDate(dateTimeVar); | |||
BasicMonth | MONTH | BasicMonth basicMonth = new BasicMonth(dateTimeVar); | |||
BasicDateTime | DATETIME | BasicDateTime basicDateTime = new BasicDateTime(dateTimeVar); | |||
BasicDateHour | DATEHOUR | BasicDateHour basicDateHour = new BasicDateHour(dateTimeVar); | |||
TimeSpan | BasicNanoTime | NANOTIME | TimeSpan timeSpan = TimeSpan.FromTicks(1000); | BasicNanoTime basicNanoTime = new BasicNanoTime(timeSpan); | |
same as above | BasicTime | TIME | same as above | BasicTime basicTime = new BasicTime(timeSpan); | |
BasicSecond | SECOND | BasicSecond basicSecond = new BasicSecond(timeSpan); | |||
BasicMinute | MINUTE | BasicMinute basicMinute = new BasicMinute(timeSpan); | |||
string | BasicString | STRING | string stringVar = "tag"; | BasicString basicString = new BasicString(stringVar); | |
same as above | BasicString(BLOB) | BLOB | same as above | BasicString basicBlob = new BasicString(stringVar, true); | |
BasicInt128 | INT128 | string int128String = "e1671797c52e15f763380b45e841ec32"; | BasicInt128 basicInt128 = BasicInt128.fromString(int128String); | string int128Data = basicInt128.getString(); | |
BasicDecimal32 | Decimal32 | string stringVal = "1.5555"; | BasicDecimal32 basicDecimal32 = new BasicDecimal32(stringVal, 4); | string decimalString = basicDecimal32.getString(); |
BasicDecimal64 | Decimal64 | same as above | BasicDecimal64 basicDecimal64 = new BasicDecimal64(stringVal, 4); | string decimalString = basicDecimal64.getString(); |
BasicDecimal128 | Decimal128 | | BasicDecimal128 basicDecimal128 = new BasicDecimal128(stringVal, 4); | string decimalString = basicDecimal128.getString(); |
decimal | BasicDecimal32 | Decimal32 | decimal decimalVal = 1.5555m; | BasicDecimal32 basicDecimal32 = new BasicDecimal32(decimalVal, 4); Note: For the method public BasicDecimal32(string data, int scale = -1) , if the scale parameter is not provided, the scale is set to match the number of decimal places in the input string. | decimal decimalValue = basicDecimal32.getDecimalValue(); |
same as above | BasicDecimal64 | Decimal64 | same as above | BasicDecimal64 basicDecimal64 = new BasicDecimal64(decimalVal, 4); Note: For the method public BasicDecimal64(string data, int scale = -1) , if the scale parameter is not provided, the scale is set to match the number of decimal places in the input string. | decimal decimalValue = basicDecimal64.getDecimalValue(); |
BasicDecimal128 | Decimal128 | BasicDecimal128 basicDecimal128 = new BasicDecimal128(decimalVal, 4); Note: For the method public BasicDecimal128(string data, int scale = -1) , if the scale parameter is not provided, the scale is set to match the number of decimal places in the input string. | decimal decimalValue = basicDecimal128.getDecimalValue(); | ||
Guid | BasicUuid | UUID | Guid uuid = Guid.NewGuid(); | BasicUuid basicUuid = BasicUuid.fromString(uuid.ToString()); | Guid uuidData = Guid.Parse(basicUuid.getString()); |
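A brief sketch combining a few of the scalar conversions listed above (getValue and getString are the accessors shown in the table; the sample values are arbitrary):
//C# value -> C# API object
BasicInt basicInt = new BasicInt(10);
BasicDouble basicDouble = new BasicDouble(1.5);
BasicString basicString = new BasicString("tag");
BasicTimestamp basicTimestamp = new BasicTimestamp(DateTime.Now);
//C# API object -> C# value
int intValue = basicInt.getValue();
double doubleValue = basicDouble.getValue();
string stringValue = basicString.getString();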
Vector
C# | C# API | DolphinDB | Constructors for C# Types | C# API to DolphinDB | DolphinDB to C# |
---|---|---|---|---|---|
bool | BasicBooleanVector | BOOL | void add(object value); void setBoolean(int index, bool value) | bool getBoolean(int index) | |
byte | BasicBooleanVector | BOOL | BasicBooleanVector(IList<byte?> list); BasicBooleanVector(byte[] array) | void add(object value) | |
byte | BasicByteVector | CHAR | BasicByteVector(IList<byte?> list); BasicByteVector(byte[] array) | void add(object value); void setByte(int index, byte value) | byte getByte(int index) |
short | BasicShortVector | SHORT | BasicShortVector(IList<short?> list); BasicShortVector(short[] array) | void add(object value); void setShort(int index, short value) | short getShort(int index) |
int | BasicIntVector | INT | BasicIntVector(IList<int?> list); BasicIntVector(int[] array) | void add(object value); void setInt(int index, int value) | int getInt(int index) |
long | BasicLongVector | LONG | BasicLongVector(IList<long?> list); BasicLongVector(long[] array) | void add(object value); void setLong(int index, long value) | long getLong(int index) |
float | BasicFloatVector | FLOAT | BasicFloatVector(IList<float?> list); BasicFloatVector(float[] array) | void add(object value); void setFloat(int index, float value) | float getFloat(int index) |
double | BasicDoubleVector | DOUBLE | BasicDoubleVector(IList<double?> list); BasicDoubleVector(double[] array) | void add(object value); void setDouble(int index, double value) | double getDouble(int index) |
DateTime | BasicNanoTimestampVector | NANOTIMESTAMP | void add(object value); void setNanoTimestamp(int index, DateTime dt) | DateTime getNanoTimestamp(int index) |
same as above | BasicTimestampVector | TIMESTAMP | void add(object value); void setTimestamp(int index, DateTime dt) | DateTime getTimestamp(int index) |
BasicDateVector | DATE | void add(object value); void setDate(int index, DateTime date) | DateTime getDate(int index) |
BasicMonthVector | MONTH | void setMonth(int index, DateTime month) | DateTime getMonth(int index) | ||
BasicDateTimeVector | DATETIME | void add(object value); void setDateTime(int index, DateTime dt) | DateTime getDateTime(int index) | ||
BasicDateHourVector | DATEHOUR | void add(object value); void setDateTime(int index, DateTime dt) | DateTime getDateTime(int index) | ||
TimeSpan | BasicNanoTimeVector | NANOTIME | void add(object value); void setNanoTime(int index, TimeSpan time) | TimeSpan getNanoTime(int index) | |
same as above | BasicTimeVector | TIME | void add(object value); void setTime(int index, TimeSpan time) | TimeSpan getTime(int index) | |
BasicSecondVector | SECOND | void add(object value); void setSecond(int index, TimeSpan time) | TimeSpan getSecond(int index) | ||
BasicMinuteVector | MINUTE | void add(object value); void setMinute(int index, TimeSpan time) | TimeSpan getMinute(int index) | ||
string | BasicStringVector | STRING | void add(object value); void setString(int index, string value) | string getString(int index) | |
same as above | BasicStringVector(BLOB) | BLOB | void add(object value); void setString(int index, string value) | string getString(int index) | |
BasicDecimal32Vector | Decimal32 | BasicDecimal32Vector(string[] data, int scale); BasicDecimal32Vector(List<string> list, int scale) | void add(object value) | Get the IScalar object by the get method and call the getString method, e.g., basicDecimal32Vector.get(index).getString() | |
BasicDecimal64Vector | Decimal64 | BasicDecimal64Vector(string[] data, int scale); BasicDecimal64Vector(List<string> list, int scale) | void add(object value) | Get the IScalar object by the get method and call the getString method, e.g., basicDecimal64Vector.get(index).getString() | |
BasicDecimal128Vector | Decimal128 | BasicDecimal128Vector(string[] data, int scale); BasicDecimal128Vector(List<string> list, int scale) | void add(object value) | Get the IScalar object by the get method and call the getString method, e.g., basicDecimal128Vector.get(index).getString() | |
decimal | BasicDecimal32Vector | Decimal32 | BasicDecimal32Vector(decimal[] data, int scale); BasicDecimal32Vector(List<decimal> list, int scale) | void add(object value) | decimal getDecimal(int index) |
BasicDecimal64Vector | Decimal64 | BasicDecimal64Vector(decimal[] data, int scale); BasicDecimal64Vector(List<decimal> list, int scale) | void add(object value) | decimal getDecimal(int index) | |
BasicDecimal128Vector | Decimal128 | BasicDecimal128Vector(decimal[] data, int scale); BasicDecimal128Vector(List<decimal> list, int scale) | void add(object value) | decimal getDecimal(int index); Note: Before using getDecimal , it is important to check for NULLs in BasicDecimal128Vector. Use the isNull method to identify NULLs, as the NULL value in Decimal128 will exceed the range that a C# decimal can represent. |
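A brief sketch using the vector constructors and accessors from the table above:
//build an INT vector from a C# array, append a value, then read elements back
BasicIntVector intVec = new BasicIntVector(new int[] { 1, 2, 3 });
intVec.add(4);                  //void add(object value)
int first = intVec.getInt(0);   //1
intVec.setInt(0, 10);           //overwrite the first element
//build a DOUBLE vector and read an element
BasicDoubleVector dblVec = new BasicDoubleVector(new double[] { 1.5, 2.5 });
double d = dblVec.getDouble(1); //2.5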
C# Streaming API
A C# program can subscribe to streaming data via API. C# API can acquire streaming data in the following 3 ways: ThreadedClient, ThreadPooledClient, and PollingClient.
Interfaces
The corresponding subscribe interfaces are:
(1) Subscribe using ThreadedClient:
subscribe(string host, int port, string tableName, string actionName, MessageHandler handler, long offset, bool reconnect, IVector filter, int batchSize, float throttle = 0.01f, StreamDeserializer deserializer = null, string user = "", string password = "")
Parameters:
- host: the IP address of the publisher node.
- port: the port number of the publisher node.
- tableName: a string indicating the name of the publishing stream table.
- actionName: a string indicating the name of the subscription task.
- handler: a user-defined function to process the subscribed data.
- offset: an integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative or exceeding the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it is created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
- reconnect: a Boolean value indicating whether to resubscribe after network disconnection.
- filter: a vector indicating the filtering conditions. Only the rows with values of the filtering column in the vector specified by the parameter filter are published to the subscriber.
- batchSize: an integer indicating the number of unprocessed messages to trigger the handler. If it is positive, the handler does not process messages until the number of unprocessed messages reaches batchSize. If it is unspecified or non-positive, the handler processes incoming messages as soon as they come in.
- throttle: a floating-point number indicating the maximum waiting time (in seconds) before the handler processes the incoming messages. The default value is 0.01. This optional parameter has no effect if batchSize is not specified.
- deserializer: the deserializer for the subscribed heterogeneous stream table.
- user: a string indicating the username used to connect to the server.
- password: a string indicating the password used to connect to the server.
(2) Subscribe using ThreadPooledClient:
subscribe(string host, int port, string tableName, string actionName, MessageHandler handler, long offset, bool reconnect, IVector filter, StreamDeserializer deserializer = null, string user = "", string password = "")
(3) Subscribe using PollingClient:
subscribe(string host, int port, string tableName, string actionName, long offset, bool reconnect, IVector filter, StreamDeserializer deserializer = null, string user = "", string password = "")
Code Examples
The following examples introduce how to subscribe to a stream table:
- The application on the client periodically checks if new data has been added to the streaming table. If yes, the application will acquire and consume the new data.
PollingClient client = new PollingClient(subscribeHost, subscribePort);
TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, offset);
while (true)
{
List<IMessage> msgs = poller1.poll(1000);
if (msgs.Count > 0)
{
foreach(IMessage msg in msgs)
System.Console.Out.WriteLine(string.Format("receive: {0}, {1}, {2}", msg.getEntity(0).getString(), msg.getEntity(1).getString(), msg.getEntity(2).getString()));
}
/*
Successfully subscribed table 192.168.1.38:18848:local8848/Trades/csharpStreamingApi
receive: 1, 2022.05.26T10:39:22.105, 1.5
*/
}
- The API uses MessageHandler to get new data
First you need to define the message handler, which must implement the dolphindb.streaming.MessageHandler interface.
public class MyHandler : MessageHandler
{
public void doEvent(IMessage msg)
{
System.Console.Out.WriteLine(string.Format("receive: {0}, {1}, {2}", msg.getEntity(0).getString(), msg.getEntity(1).getString(), msg.getEntity(2).getString()));
}
public void batchHandler(List<IMessage> msgs)
{
throw new NotImplementedException();
}
}
You can pass the handler instance into the function subscribe as a parameter, with single-threaded or multi-threaded callbacks.
(1) ThreadedClient
ThreadedClient client = new ThreadedClient(subscribeHost, subscribePort);
client.subscribe(serverIP, serverPort, tableName, new MyHandler());
Thread.Sleep(10000);
//To cancel the subscription, you can use function close.
client.close();
(2) ThreadPooledClient: Handler mode client (multithreading)
ThreadPooledClient client = new ThreadPooledClient(subscribeHost, subscribePort);
client.subscribe(serverIP, serverPort, tableName, new MyHandler());
//To cancel the subscription, you can use function close.
Thread.Sleep(10000);
client.close();
Reconnect
Parameter reconnect is a Boolean value indicating whether to automatically resubscribe after the subscription experiences an unexpected interruption. The default value is false.
When reconnect=true:
- If the publisher and the subscriber both stay on but the network connection is interrupted, then after network is restored, the subscriber resumes subscription from where the network interruption occurs.
- If the publisher crashes, the subscriber will keep attempting to resume subscription after the publisher restarts.
- If persistence was enabled on the publisher, the publisher starts reading the persisted data on disk after restarting. Automatic resubscription will not succeed until the publisher has loaded the data up to the point where it crashed.
- If persistence was not enabled on the publisher, automatic resubscription will fail.
- If the subscriber crashes, the subscriber won't automatically resume the subscription after it restarts. In this case, we need to execute the function subscribe again.
Parameter reconnect is set to be true for the following example:
PollingClient client = new PollingClient(subscribePort);
TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, offset, true);
Filter
Parameter filter is a vector. It is used together with the function setStreamTableFilterColumn at the publisher node. Function setStreamTableFilterColumn specifies the filtering column of the stream table. Only the rows with filtering column values in filter are published.
In the following example, parameter filter is assigned an INT vector [1,2]:
BasicIntVector filter = new BasicIntVector(2);
filter.setInt(0, 1);
filter.setInt(1, 2);
PollingClient client = new PollingClient(subscribePort);
TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, actionName, offset, filter);
Subscribe to a Heterogeneous Table
Since DolphinDB server version 1.30.17/2.00.5, the replay function supports replaying (serializing) multiple stream tables with different schemata into a single stream table (known as a "heterogeneous stream table"). Starting from DolphinDB C# API version 1.30.19, a new class StreamDeserializer has been introduced for the subscription and deserialization of heterogeneous stream tables.
Construct Deserializer for Heterogeneous Stream Table
You can construct a deserializer for a heterogeneous table with StreamDeserializer.
(1) With specified table schema:
- specified schema
StreamDeserializer(Dictionary<string, BasicDictionary> filters)
- specified column types
StreamDeserializer(Dictionary<string, List<DATA_TYPE>> filters)
(2) With specified table:
StreamDeserializer(Dictionary<string, Tuple<string, string>> tableNames, DBConnection conn = null)
Code example:
//Supposing the inputTables to be replayed is:
//d = dict(['msg1', 'msg2'], [table1, table2]);
//replay(inputTables = d, outputTables = `outTables, dateColumn = `timestampv, timeColumn = `timestampv)";
//create a deserializer for heterogeneous table
{//specify schema
BasicDictionary outSharedTables1Schema = (BasicDictionary)conn.run("table1.schema()");
BasicDictionary outSharedTables2Schema = (BasicDictionary)conn.run("table2.schema()");
Dictionary<string, BasicDictionary> schemas = new Dictionary<string, BasicDictionary>();
schemas["msg1"] = outSharedTables1Schema;
schemas["msg2"] = outSharedTables2Schema;
StreamDeserializer streamFilter = new StreamDeserializer(schemas);
}
{//or specify column types
Dictionary<string, List<DATA_TYPE>> colTypes = new Dictionary<string, List<DATA_TYPE>>();
List<DATA_TYPE> table1ColTypes = new List<DATA_TYPE> { DATA_TYPE.DT_DATETIME, DATA_TYPE.DT_TIMESTAMP, DATA_TYPE.DT_SYMBOL, DATA_TYPE.DT_DOUBLE, DATA_TYPE.DT_DOUBLE };
colTypes["msg1"] = table1ColTypes;
List<DATA_TYPE> table2ColTypes = new List<DATA_TYPE> { DATA_TYPE.DT_DATETIME, DATA_TYPE.DT_TIMESTAMP, DATA_TYPE.DT_SYMBOL, DATA_TYPE.DT_DOUBLE };
colTypes["msg2"] = table2ColTypes;
StreamDeserializer streamFilter = new StreamDeserializer(colTypes);
}
{//specify tables
Dictionary<string, Tuple<string, string>> tables = new Dictionary<string, Tuple<string, string>>();
tables["msg1"] = new Tuple<string, string>("", "table1");
tables["msg2"] = new Tuple<string, string>("", "table2");
//conn is an optional parameter
StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);
}
Subscribe to a Heterogeneous Table
(1) subscribe to a heterogeneous table using ThreadedClient:
- specify the parameter deserializer of the function subscribe to deserialize the table when data is ingested:
ThreadedClient client = new ThreadedClient(listenport);
client.subscribe(hostName, port, tableName, actionName, handler, 0, true, null, -1, (float)0.01, streamFilter);
- add the streamFilter to user-defined Handler:
public class Handler6 : MessageHandler
{
private StreamDeserializer deserializer_;
private List<BasicMessage> msg1 = new List<BasicMessage>();
private List<BasicMessage> msg2 = new List<BasicMessage>();
public Handler6(StreamDeserializer deserializer)
{
deserializer_ = deserializer;
}
public void batchHandler(List<IMessage> msgs)
{
throw new NotImplementedException();
}
public void doEvent(IMessage msg)
{
try
{
BasicMessage message = deserializer_.parse(msg);
if (message.getSym() == "msg1")
{
msg1.Add(message);
}
else if (message.getSym() == "msg2")
{
msg2.Add(message);
}
}
catch (Exception e)
{
System.Console.Out.WriteLine(e.StackTrace);
}
}
public List<BasicMessage> getMsg1()
{
return msg1;
}
public List<BasicMessage> getMsg2()
{
return msg2;
}
};
Handler6 handler = new Handler6(streamFilter);
ThreadedClient client = new ThreadedClient(listenport);
client.subscribe(SERVER, PORT, tableName, actionName, handler, 0, true);
(2) subscribe to a heterogeneous table using ThreadPooledClient, similar to the above:
- specify the parameter deserializer of the function subscribe:
ThreadPooledClient client = new ThreadPooledClient(listenport);
client.subscribe(hostName, port, tableName, actionName, handler, 0, true, null, streamFilter);
- add the streamFilter to user-defined Handler:
Handler6 handler = new Handler6(streamFilter);
ThreadPooledClient client = new ThreadPooledClient(listenport);
client.subscribe(hostName, port, tableName, actionName, handler, 0, true);
(3) As PollingClient does not support callbacks, you can only pass the deserializer parameter to the function subscribe:
PollingClient client = new PollingClient(listenport);
TopicPoller poller = client.subscribe(hostName, port, tableName, actionName, 0, true, null, streamFilter);
Unsubscribe
Each subscription is identified with a subscription topic. Subscription fails if a topic with the same name already exists. You can cancel the subscription with unsubscribe.
client.unsubscribe(serverIP, serverPort, tableName,actionName);
Complex Event Processing
DolphinDB 3.00.0 introduces a new feature, Complex Event Processing (CEP), to find patterns in event data that enable the detection of opportunities and threats. To better interact with the server, C# API 3.00.1.0 provides the EventSchema, EventSender and EventClient classes for constructing, writing, and subscribing to events.
This section describes event-related operations, including:
- Defining events.
- Writing events to the heterogeneous stream table as a data source for the CEP engine on the DolphinDB server.
- Subscribing to events in the heterogeneous stream table output by the CEP engine on the DolphinDB server.
For more details, see the "Complex Event Processing" chapter of the DolphinDB server user manual.
Defining Events (EventSchema)
EventSchema can be used to define events on the API client.
Constructor
public EventSchema(string eventType, List<string> fieldNames,
List<DATA_TYPE> fieldTypes, List<DATA_FORM> fieldForms,
List<int> fieldExtraParams = null)
Arguments
- eventType: a string indicating the event type.
- fieldNames: a list of strings indicating the field names.
- fieldTypes: the data types of the fields.
- fieldForms: the data forms of the fields.
- fieldExtraParams (optional): non-negative integers specifying the scale when a field type is DECIMAL. A construction sketch follows this list.
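For illustration, the following sketch defines a hypothetical "Tick" event with three scalar fields; the DATA_FORM values (DATA_FORM.DF_SCALAR) are assumed to be the scalar form constants of the API:
//a hypothetical event type with a string field, a timestamp and a price
List<string> fieldNames = new List<string> { "symbol", "timestamp", "price" };
List<DATA_TYPE> fieldTypes = new List<DATA_TYPE> { DATA_TYPE.DT_STRING, DATA_TYPE.DT_TIMESTAMP, DATA_TYPE.DT_DOUBLE };
List<DATA_FORM> fieldForms = new List<DATA_FORM> { DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR };
EventSchema schema = new EventSchema("Tick", fieldNames, fieldTypes, fieldForms);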
Sending Events (EventSender)
EventSender can be used to write serialized event data into heterogeneous stream tables, which are subscribed to by CEP engines in DolphinDB to capture and process the events.
Constructor
EventSender(DBConnection conn, string tableName, List<EventSchema> eventSchema, List<string> eventTimeFields = null, List<string> commonFields = null)
Arguments
- conn: A successfully connected DBConnection object, which should not be in asynchronous mode.
- tableName: A String indicating the name of the heterogeneous stream table to write into.
- eventSchema: All event schemas the CEP engine can process, i.e., all events to be sent. Note that the event schemas must match those specified when creating the CEP engine.
- eventTimeFields (optional): The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.
- It is a String if the field name is the same for all events.
- It is a String list if field names vary across events. The order of strings must match that of the events specified in eventSchema.
- commonFields (optional): A String list indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.
sendEvent
Use the sendEvent method to send events to the DolphinDB server.
public void sendEvent(String eventType, List<IEntity> attributes)
Arguments
- eventType: The event type.
- attributes: The values of each member in the event. Note that the order of the values must match the field order specified when defining the EventSchema, and their data types and forms must also match.
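For example, assuming the "MarketData" schema defined above and a heterogeneous stream table named "input" already created on the server (as in the usage example below), a sender can be constructed and an event sent as follows. This is a minimal sketch; HOST and PORT are placeholders for the server address.
// Establish a synchronous connection (EventSender must not use asynchronous mode)
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
// Bind the sender to the heterogeneous stream table "input"
EventSender sender = new EventSender(conn, "input",
    new List<EventSchema> { schema }, new List<string> { "eventTime" });
// Attribute values must follow the field order, types, and forms defined in the schema
List<IEntity> attributes = new List<IEntity> {
    new BasicString("sz"), new BasicString("s001"),
    new BasicDouble(9.8), new BasicInt(100), new BasicTimestamp(DateTime.Now) };
sender.sendEvent("MarketData", attributes);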
Subscribing to Events (EventClient)
The API provides the EventClient tool for subscribing to events in heterogeneous stream tables. These tables typically receive event data processed and serialized by the DolphinDB CEP engine.
Constructor
public EventClient(List<EventSchema> eventSchema, List<string> eventTimeFields = null, List<string> commonFields = null)
Arguments
- eventSchema: All event schemas the CEP engine can process, i.e., all events to be sent. Note that the event schemas must match those specified when creating the CEP engine.
- eventTimeFields (optional): The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.
- It is a String if the field name is the same for all events.
- It is a String list if field names vary across events. The order of strings must match that of the events specified in eventSchema.
- commonFields (optional): A String list indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.
subscribe
public void subscribe(String host, int port, String tableName, String actionName, EventMessageHandler handler, long offset, bool reconnect, String userName, String password)
Arguments
- host: IP address of the publisher node.
- port: Port number of the publisher node.
- tableName: Name of the publisher stream table.
- actionName (optional): Name of the subscription task.
- handler: A user-defined handler to process the subscribed data.
In the function called by the handler, the first parameter indicates the event type, and the second parameter holds the event field values. For example:
class MyEventMessageHandler : EventMessageHandler
{
public void doEvent(string eventType, List<IEntity> attribute)
{
// Handling logic
}
}
- offset (optional, default -1): An integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative, or exceeds the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it was created; rows that have been cleared from memory due to the cache size limit are still counted when determining where the subscription starts.
- reconnect (optional, default false): Whether to enable reconnection.
- userName (optional, default ""): Username used to connect to the server.
- password (optional, default ""): Password.
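For example, a client built from the "MarketData" schema can subscribe to an output table named "output" using the handler shown above. This is a minimal sketch; HOST and PORT are placeholders for the publisher address.
// Create the client from the event schemas and their time fields
EventClient client = new EventClient(new List<EventSchema> { schema }, new List<string> { "eventTime" });
// Subscribe from the first message with reconnection enabled
EventMessageHandler handler = new MyEventMessageHandler();
client.subscribe(HOST, PORT, "output", "CSharpClient", handler, 0, true, "admin", "123456");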
Canceling Subscription
public void unsubscribe(String host, int port, String tableName, String actionName)
Arguments
- host: IP address of the publisher node.
- port: Port number of the publisher node.
- tableName: Name of the publisher stream table.
- actionName: The subscription task.
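For example, to cancel the subscription created above:
// The subscription is identified by host, port, table name, and action name
client.unsubscribe(HOST, PORT, "output", "CSharpClient");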
Usage Example
This example shows how to create an event class, the event serialization interfaces, the heterogeneous stream tables for writing and outputting events, and the CEP engine for processing events on the server. EventSender is then used on the API side to send events to the input stream table. After the CEP engine processes the events and publishes the results to the output stream table through its subscription, EventClient is used on the API side to subscribe to the output events.
The following script creates two heterogeneous stream tables, "input" and "output", two streamEventSerializer objects (serialization interfaces), a CEP engine, and a "MarketData" event class on the DolphinDB server.
class MarketData{
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
eventTime :: TIMESTAMP
def MarketData(m,c,p,q){
market = m
code = c
price = p
qty = q
eventTime = now()
}
}
class MainMonitor{
def MainMonitor(){
}
def updateMarketData(event)
def onload(){
addEventListener(updateMarketData,'MarketData',,'all')
}
def updateMarketData(event){
emitEvent(event)
}
}
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as input
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as output
schema = table(1:0, `eventType`eventKeys`eventValuesTypeString`eventValueTypeID`eventValuesFormID, [STRING, STRING, STRING, INT[], INT[]])
insert into schema values("MarketData", "market,code,price,qty,eventTime", "STRING,STRING,DOUBLE,INT,TIMESTAMP", [18 18 16 4 12], [0 0 0 0 0])
inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schema, outputTable=input, eventTimeField = "eventTime")
outputSerializer = streamEventSerializer(name=`serOutput, eventSchema=schema, outputTable=output, eventTimeField = "eventTime")
engine = createCEPEngine('cep1', <MainMonitor()>, dummy, [MarketData], 1, 'eventTime', 10000, outputSerializer)
subscribeTable(, `input, `subopt, 0, getStreamEngine('cep1'), true)
Then write data to the input table via EventSender on the API side.
// Define schema
List<string> fieldNames = new List<string> { "market", "code", "price", "qty", "eventTime" };
List<DATA_TYPE> fieldTypes = new List<DATA_TYPE> {
DATA_TYPE.DT_STRING, DATA_TYPE.DT_STRING, DATA_TYPE.DT_DOUBLE, DATA_TYPE.DT_INT, DATA_TYPE.DT_TIMESTAMP };
List<DATA_FORM> fieldForms = new List<DATA_FORM> {
DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR };
EventSchema schema = new EventSchema("MarketData", fieldNames, fieldTypes, fieldForms);
List<EventSchema> eventSchemas = new List<EventSchema> { schema };
List<string> eventTimeKeys = new List<string> { "eventTime" };
// Create EventSender
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
EventSender sender = new EventSender(conn, "input", eventSchemas, eventTimeKeys);
// Prepare data
List<IEntity> attributes = new List<IEntity> { new BasicString("sz"), new BasicString("s001"),
    new BasicDouble(9.8), new BasicInt(100), new BasicTimestamp(DateTime.Now) };
sender.sendEvent("MarketData", attributes);
The input stream table is subscribed to by the CEP engine, which writes the processed events to the output table. Then use EventClient to subscribe to the output table.
// user-defined function called by handler
class MyEventMessageHandler : EventMessageHandler
{
public void doEvent(string eventType, List<IEntity> attribute)
{// user-defined processing
System.Console.WriteLine("eventType: " + eventType);
foreach (IEntity entity in attribute)
{
System.Console.WriteLine(entity.getString());
}
System.Console.WriteLine();
}
}
// Define schema
List<string> fieldNames = new List<string> { "market", "code", "price", "qty", "eventTime" };
List<DATA_TYPE> fieldTypes = new List<DATA_TYPE> {
DATA_TYPE.DT_STRING, DATA_TYPE.DT_STRING, DATA_TYPE.DT_DOUBLE, DATA_TYPE.DT_INT, DATA_TYPE.DT_TIMESTAMP };
List<DATA_FORM> fieldForms = new List<DATA_FORM> {
DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR };
EventSchema schema = new EventSchema("MarketData", fieldNames, fieldTypes, fieldForms);
List<EventSchema> eventSchemas = new List<EventSchema> { schema };
List<string> eventTimeKeys = new List<string> { "eventTime" };
// create EventClient
EventClient client = new EventClient(eventSchemas, eventTimeKeys);
// create subscription
EventMessageHandler handler = new MyEventMessageHandler();
client.subscribe(HOST, PORT, "output", "CSharpClient", handler, 0, true, "admin", "123456");
Thread.Sleep(10000);
// cancel subscription
client.unsubscribe(HOST, PORT, "output", "CSharpClient");