C# API

The C# API implements messaging and data conversion between .Net program and DolphinDB server, which runs on .Net Framework 4.0, .Net Core 3.1 and above. Since version 3.00.0.0, you can get the version number of the current API with Utils.getAPIVersion().

The C# API adopts interface-oriented programming. It uses the interface class "IEntity" to represent all data types returned by DolphinDB. Based on the "IEntity" interface class and DolphinDB data forms, the C# API provides the following extension interfaces which are included in the com.xxdb.data package:

Extended Interface ClassesNaming RulesExamples
scalarBasic<DataType>BasicInt, BasicDouble, BasicDate, etc.
vector, matrixBasic<DataType><DataForm>BasicIntVector, BasicDoubleMatrix, BasicAnyVector, etc.
set, dictionary, and tableBasic<DataForm>BasicSet, BasicDictionary, BasicTable.
chart BasicChart
  • Basic: basic data type interface
  • <DataType>: DolphinDB data type
  • <DataForm>: DolphinDB data form

The most important object provided by the DolphinDB C# API is DBConnection. It provides the C# applications with the following methods:

Method NameDetails
DBConnection([asynchronousTask=false], [useSSL=false], [compress=false], [usePython=false])Construct an object, indicating whether to enable asynchronous tasks, ssl, and compression
connect(hostName, port, [userId=""], [password=""], [startup=""], [highAvailability=false], [highAvailabilitySites], [reconnect=false])Connect the session to DolphinDB server
login(userId, password, enableEncryption)Log in to the server
run(script, [listener], [priority=4], [parallelism=64], [fetchSize=0], [clearMemory=false])Run scripts on DolphinDB server synchronously
runAsync(script, [priority = 4], [parallelism=64], [fetchSize=0], [clearMemory = false])Run scripts on DolphinDB server asynchronously
run(functionName, arguments, [priority=4], [parallelism=64], [fetchSize=0], [clearMemory=false])Call a function on DolphinDB server synchronously
runAsync(functionName, arguments, [priority=4], [parallelism=64], [fetchSize=0], [clearMemory=false])Call a function on DolphinDB server asynchronously
upload(variableObjectMap)Upload local data to DolphinDB server
isBusy()Determine if the current session is busy
close()Close the current session

Note: If the current session is idle, C# API will automatically close the connection after a while. You can close the session by calling close() to release the connection. Otherwise, other sessions may be unable to connect to the server due to too many connections.

Establish DolphinDB Connection

DBConnection

The C# API connects to the DolphinDB server via TCP/IP protocol. To connect to a local DolphinDB server with port number 8848:

using dolphindb;
using dolphindb.data;
using dolphindb.io;

public void Test_Connect(){
      DBConnection conn=new DBConnection();
      Assert.AreEqual(true,conn.connect("localhost",8848));
}

Starting from API 1.30.17, the following optional parameters can be specified for connection: asynchronousTask, useSSL, compress, and usePython. The default values of these parameters are false. Currently, only linux stable version >= 1.10.17, and latest version >= 1.20.6 are supported.

The following example establishes a connection to the server. It enables SSL and compression but disables asynchronous communication. Note that the configuration parameter enableHTTPS=true must be specified on the server side.

DBConnection conn = new DBConnection(false,true,true)

In the following example, the connection disables SSL and enables asynchronous communication. In this case, only DolphinDB scripts and functions can be executed and no values are returned. This feature is for asynchronous writes.

DBConnection conn = new DBConnection(true,false)

Establish a connection with a username and password:

boolean success = conn.connect("localhost", 8848, "admin", "123456");

To define and use user-defined functions in a C# program, you can pass in the user-defined scripts to the parameter startup. The advantages are: (1) These functions don't need to be defined repeatedly every time run is called; (2) The API client can automatically connect to the server after disconnection. If the parameter startup is specified, the C# API will automatically execute the script and register the functions. The parameter can be very useful for scenarios where the network is not stable but the program needs to run continuously.

boolean success = conn.connect("localhost", 8848, "admin", "123456", "");

ExclusiveDBConnectionPool

Multiple DBconnection objects can be reused by ExclusiveDBConnectionPool. You can either execute command ExclusiveDBConnectionPool.run, or execute a task with execute and then obtain the results with getResults method of BasicDBTask.

Method NameDetails
ExclusiveDBConnectionPoolExclusiveDBConnectionPool(host, port, uid, pwd, count, loadBalance, highAvaliability, [haSites], [startup=""], [compress= false], [useSSL=false], [usePython=false])Constructor. The parameter count indicates the number of connections to be used. If loadBalance is set to true, different nodes are connected.
run(script, [priority=4], [parallelism=64], [clearMemory=false])Run scripts on DolphinDB server synchronously
runAsync(script, [priority=4], [parallelism=64], [clearMemory=false])Run scripts on DolphinDB server asynchronously
run(functionName, arguments, [priority=4], [parallelism=64], [clearMemory=false])Call a function on DolphinDB server synchronously
runAsync(functionName, arguments, [priority=4], [parallelism=64], [clearMemory=false])Call a function on DolphinDB server asynchronously
execute(task)Execute the task.
execute(tasks)Execute tasks in batches.
getConnectionCount()Get the number of connections.
shutdownShut down the connection pool.

Note: If the current ExclusiveDBConnectionPool is idle, C# API will automatically close the connection after a while. To release the connection resources, call shutdown() upon the completion of thread tasks.

BasicDBTask encapsulates the functions and arguments to be executed.

Method NameDetails
BasicDBTask(functionName, arguments, [priority=4], [parallelism=64], [clearMemory=false])functionName: the function to be executed; arguments: the arguments passed to the functionName.
BasicDBTask(script, [priority=4], [parallelism=64], [clearMemory=false])The script to be executed.
isSuccessful()Check whether the task is executed successfully.
getResults()Get the execution results.
getErrorMsg()Get the error messages.

Build a connection pool with 10 connections.

ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool("192.168.1.38", 8902, "admin", "123456", 10, false, true);

//run the script
IEntity ret = pool.run("1 + 1");
Console.Out.WriteLine(ret.getString());

//run the specified function
ret = pool.run("abs", new List<IEntity> { new BasicInt(-3) });
Console.Out.WriteLine(ret.getString());

Create a task.

BasicDBTask task = new BasicDBTask("1..10");
pool.execute(task);

Check whether the task is executed successfully. If successful, returns the results; otherwise returns the error messages.

BasicIntVector data = null;
if (task.isSuccessful())
{
      data = (BasicIntVector)task.getResults();
}
else
{
      throw new Exception(task.getErrorMsg());
}
System.Console.Out.WriteLine(data.getString());

Output:

[1,2,3,4,5,6,7,8,9,10]

Create multiple tasks and call these tasks concurrently in ExclusiveDBConnectionPool.

List<IDBTask> tasks = new List<IDBTask>();
for (int i = 0; i < 10; ++i){
      //call function log
      tasks.Add(new BasicDBTask("log", new List<IEntity> { data.get(i) }));
}
pool.execute(tasks);

Check whether the task is executed successfully. If successful, returns the results; otherwise returns the error messages.

for (int i = 0; i < 10; ++i)
{
      if (tasks[i].isSuccessful())
      {
            logData.append((IScalar)tasks[i].getResults());
      }
      else
      {
            throw new Exception(tasks[i].getErrorMsg());
      }
}
System.Console.Out.WriteLine(logData.getString());

Output:

[0,0.693147,1.098612,1.386294,1.609438,1.791759,1.94591,2.079442,2.197225,2.302585]

Run DolphinDB Scripts

To run DolphinDB script in C#:

conn.run("script");
conn.runAsync("script")

run indicates synchronous execution of the script. runAsync indicates asynchronous execution.

If the script contains only one statement, such as an expression, DolphinDB returns the result of the statement. If the script contains more than one statements, the result of the last statement is returned. If the script contains an error or there is a network problem, an exception is thrown.

Call DolphinDB Functions

Other than running script, method run can also execute DolphinDB built-in functions or user-defined functions on a remote DolphinDB server.

The following example passes a double vector to the server and calls function sum.

public void testFunction(){
      List<IEntity> args = new List<IEntity>(1);
      BasicDoubleVector vec = new BasicDoubleVector(3);
      vec.setDouble(0, 1.5);
      vec.setDouble(1, 2.5);
      vec.setDouble(2, 7);            
      args.Add(vec);
      BasicDouble result = (BasicDouble)conn.run("sum", args);
      Console.WriteLine(result.getValue());
}

Upload Data to DolphinDB Server

You can upload a data object to DolphinDB server and assign it to a variable for future use. You can specify the variable names with 3 types of characters: letters, numbers and underscores. The first character must be a letter.

public void testUpload(){

      BasicTable tb = (BasicTable)conn.run("table(1..100 as id,take(`aaa,100) as name)");
      Dictionary<string, IEntity> upObj = new Dictionary<string, IEntity>();
      upObj.Add("table_uploaded", (IEntity)tb);
      db.upload(upObj);
      BasicIntVector v = (BasicIntVector)conn.run("table_uploaded.id");
      Console.WriteLine(v.rows());
}

Read Data

This section introduces how to read data of different data forms in DolphinDB with the DBConnection object.

Import the DolphinDB data type package:

using dolphindb.data;
  • Vector

The following DolphinDB statement returns the C# object BasicStringVector.

rand(`IBM`MSFT`GOOG`BIDU,10)

The rows method returns the size of the vector. You can access vector elements by index with the getString method.

public void testStringVector(){
      IVector v = (BasicStringVector)conn.run("take(`IBM`MSFT`GOOG`BIDU, 10)");
      Console.WriteLine(v.isVector());
      Console.WriteLine(v.rows());
      Console.WriteLine(((BasicString)v.get(1)).getValue());
}

Similarly, you can work with vectors or tuples of DOUBLE type.

public void testDoubleVector(){
      IVector v = (BasicDoubleVector)conn.run("1.123 2.2234 3.4567");
      Console.WriteLine(v.isVector());
      Console.WriteLine(v.rows());
      Console.WriteLine(Math.Round(((BasicDouble)v.get(1)).getValue(), 4));
}
public void testAnyVector(){
      BasicAnyVector v = (BasicAnyVector)conn.run("[1 2 3,3.4 3.5 3.6]");
      Console.WriteLine(v.rows());
      Console.WriteLine(v.columns());
      Console.WriteLine(((BasicDouble)((BasicDoubleVector)v.getEntity(1)).get(0)).getValue());
}
  • Set
public void testSet(){
      BasicSet s = (BasicSet)conn.run("set(1 3 5)");
      Console.WriteLine(s.rows());
      Console.WriteLine(s.columns());
}
  • Matrix

To retrieve an element from a matrix, use get. To get the number of rows and columns, use the functions rows and columns, respectively.

public void testIntMatrix(){
      IMatrix m = (BasicIntMatrix)conn.run("matrix(45 47 48,56 65 67)");
      Console.WriteLine(m.isMatrix());
      Console.WriteLine(m.rows());
      Console.WriteLine(m.columns());
      Console.WriteLine(((BasicInt)m.get(0, 1)).getValue());
}
  • Dictionary

The keys and values of a dictionary can be retrieved with functions keys and values, respectively. To get the value for a key, use get.

public void testDictionary(){
      BasicDictionary tb = (BasicDictionary)conn.run("dict(1 2 3 4,5 6 7 8)");
      foreach (var key in tb.keys())
      {
            BasicInt val = (BasicInt)tb.get(key);
            Console.WriteLine(val);
      }
}
  • Table

To get a column of a table, use getColumn; To get a column name, use getColumnName; To get the number of columns and rows of a table, use columns and rows, respectively.

public void testTable(){
	BasicTable tb = (BasicTable)conn.run("table(1 as id,'a' as name)");
	DataTable dt = tb.toDataTable();
	Console.WriteLine(dt.Rows.Count);
}
  • NULL object

To determine if an object is NULL, use getDataType.

public void testVoid(){
      IEntity obj = conn.run("NULL");
      Assert.AreEqual(obj.getObject(), null);
}

Read From and Write to DolphinDB Tables

Read From and Write to DolphinDB Tables

This section introduces how to write data from other databases or third-party Web APIs to DolphinDB table using the C# API.

There are 2 types of DolphinDB tables:

  • In-memory table: it has the fastest access speed, but if the node shuts down the data will be lost.
  • DFS table: data are distributed across disks of multiple nodes.

Write to an In-Memory Table

DolphinDB offers several ways to write to an in-memory table:

  • Insert a single row of data with insert into
  • Insert multiple rows of data in bulk with function tableInsert
  • Insert a table object with function tableInsert

It is not recommended to save data with function append!, as append! returns the schema of a table and unnecessarily increases the network traffic.

The table in the following examples has 4 columns. Their data types are STRING, INT, TIMESTAMP and DOUBLE. The column names are cstring, cint, ctimestamp and cdouble, respectively.

t = table(10000:0,`cstring`cint`ctimestamp`cdouble,[STRING,INT,TIMESTAMP,DOUBLE])
share t as sharedTable

By default, an in-memory table is not shared among sessions. To access it in a different session, share it among sessions with share.

Insert a Single Record with insert into

public void test_save_Insert(String str, int i, long ts, double dbl)
{
      conn.run(String.Format("insert into sharedTable values('{0}',{1},{2},{3})",str,i,ts,dbl));
}

Insert Multiple Records in Bulk with tableInsert

Function tableInsert can save records in batches. If data in C# can be organized as a List, it can be saved with function tableInsert.

public void test_save_TableInsert(string[] strArray, int[] intArray, long[] tsArray, double[] dblArray)
{
      //Constructing parameters with arrays
      List<IEntity> args = new List<IEntity>() { new BasicStringVector(strArray), new BasicIntVector(intArray), new BasicTimestampVector(tsArray), new BasicDoubleVector(dblArray) };
      conn.run("tableInsert{sharedTable}", args);
}

The example above uses partial application in DolphinDB to embed a table in tableInsert{sharedTable} as a function. For details about partial application, please refer to Partial Application Documentation.

Save BasicTable Objects With Function tableInsert

Function tableInsert can also accept a BasicTable object in C# as a parameter to append data to a table in batches.

public void test_save_table(BasicTable table1)
{
      List<IEntity> args = new  List<IEntity>(){ table1};
      conn.run("tableInsert{shareTable}", args);
}

Write to a DFS Table

DFS table is recommended in production environment. It supports snapshot isolation and ensures data consistency. With data replication, DFS tables offer fault tolerance and load balancing.

Save BasicTable Objects With Function tableInsert

Use the following script in DolphinDB to create a DFS table.

dbPath = 'dfs://testDatabase'
tbName = 'tb1'

if(existsDatabase(dbPath)){dropDatabase(dbPath)}
db = database(dbPath,RANGE,2018.01.01..2018.12.31)
db.createPartitionedTable(t,tbName,'ctimestamp')

DolphinDB provides function loadTable to load DFS tables, and function tableInsert to append data.

public void test_save_table(string dbPath, string tableName, BasicTable table1)
{
    List<IEntity> args = new List<IEntity>() { table1 };
    conn.run(String.Format("tableInsert{{loadTable('{0}','{1}')}}", dbPath,tableName), args);
}

In C#, you can easily create a BasicTable object using arrays or lists, which can then be appended to DFS tables. For example, the following 5 list objects(List<T>) boolArray, intArray, dblArray, dateArray and strArray are used to construct a BasicTable object:

List<String> colNames = new List<string>() { "cbool", "cint", "cdouble", "cdate", "cstring" };
List<IVector> cols = new List<IVector>() { new BasicBooleanVector(boolArray), new BasicIntVector(intArray), new BasicDoubleVector(dblArray), new BasicDateVector(dateArray), new BasicStringVector(strArray) };
BasicTable table1 = new BasicTable(colNames, cols);

Append to DFS Tables

DolphinDB DFS tables support concurrent reads and writes. This section introduces how to write data concurrently to DolphinDB DFS tables in C#.

Note that multiple writers are not allowed to write to one partition at the same time in DolphinDB. Therefore, make sure that each thread writes to a different partition separately when the client uses multiple writer threads. The user needs to first specify a connection pool, and the system obtains information about partitions before assigning the partitions to the connection pool for concurrent writes. A partition can only be written by one thread at a time.

DolphinDB C# API offers a convenient way to separate data by partition and write concurrently:

public PartitionedTableAppender(string dbUrl, string tableName, string partitionColName, string appendFunction, IDBConnectionPool pool)

Parameters:

  • dbUrl: DFS database path
  • tableName: DFS table name
  • partitionColName: partitioning column
  • appendFunction: (optional) a user-defined function. tableInsert is called by default.
  • pool: connection pool for concurrent writes

The following script first creates a DFS database "dfs://DolphinDBUUID" and a partitioned table "device_status". The database uses a COMPO domain of VALUE-HASH-HASH.

t = table(timestamp(1..10)  as date,string(1..10) as sym)
db1=database(\"\",HASH,[DATETIME,10])
db2=database(\"\",HASH,[STRING,5])
if(existsDatabase(\"dfs://demohash\")){
    dropDatabase(\"dfs://demohash\")
}
db =database(\"dfs://demohash\",COMPO,[db2,db1])
pt = db.createPartitionedTable(t,`pt,`sym`date)

With DolphinDB server version 1.30 or higher, you can write to DFS tables with the PartitionedTableAppender object in C# API. For example:

IDBConnectionPool conn = new ExclusiveDBConnectionPool(host, port, "admin", "123456",threadCount, false, false);

PartitionedTableAppender appender = new PartitionedTableAppender(dbPath, tableName, "gid", "saveGridData{'" + dbPath + "','" + tableName + "'}", conn);
BasicTable table1 = createTable();
appender.append(table1);            

Load and Query Tables

In the C# API, a table is saved as a BasicTable object. Since BasicTable is column-based, to retrieve rows, you need to get the necessary columns first and then get the rows.

In the example below, the BasicTable has 4 columns with data types STRING, INT, TIMESTAMP and DOUBLE. The column names are cstring, cint, ctimestamp and cdouble.

public void test_loop_basicTable(BasicTable table1)
{
      BasicStringVector stringv = (BasicStringVector) table1.getColumn("cstring");
      BasicIntVector intv = (BasicIntVector)table1.getColumn("cint");
      BasicTimestampVector timestampv = (BasicTimestampVector)table1.getColumn("ctimestamp");
      BasicDoubleVector doublev = (BasicDoubleVector)table1.getColumn("cdouble");
      for(int ri=0; ri<table1.rows(); ri++){
            Console.WriteLine(stringv.getString(ri));
            Console.WriteLine(intv.getInt(ri));
            DateTime timestamp = timestampv.getTimestamp(ri);
            Console.WriteLine(timestamp);
            Console.WriteLine(doublev.getDouble(ri));
      }
}

Append Data Asynchronously

DolphinDB C# API provides MultithreadedTableWriter that supports concurrent writes in multiple threads. You can use methods of MultithreadedTableWriter class to asynchronously append data to a DolphinDB in-memory table, dimension table, or a DFS table.

The methods of MultithreadedTableWriter object are introduced as follows:

MultithreadedTableWriter(string hostName, int port, string userId, string password,string dbName, string tableName, bool useSSL, bool enableHighAvailability = false, string[] pHighAvailabilitySites = null,int batchSize = 1, float throttle = 0.01f, int threadCount = 1, string partitionCol = "", int[] pCompressMethods = null, Mode mode = Mode.M_Append, string[] pModeOption = null, Callback callbackHandler = null);

Parameters:

  • hostName: a string indicating host name
  • port: an integer indicating port number
  • userId / password: a string indicating username and password
  • dbPath: a string indicating the DFS database path. Leave it unspecified for an in-memory table.
  • tableName: a string indicating the DFS database path. Leave it unspecified for an in-memory table. Note: For API 1.30.17 or lower versions, when writing to an in-memory table, please specify the in-memory table name for dbPath and leave tableName empty.
  • useSSL: a Boolean value indicating whether to enable SSL. The default value is false.
  • enableHighAvailability: a Boolean value indicating whether to enable high availability. The default value is false.
  • pHighAvailabilitySites: a list of ip:port of all available nodes
  • batchSize: an integer indicating the number of messages in batch processing. The default value is 1, indicating the server processes the data as soon as they are written. If it is greater than 1, only when the number of data reaches batchSize, the client will send the data to the server.
  • throttle: a positive floating-point number indicating the waiting time (in seconds) before the server processes the incoming data if the number of data written from the client does not reach batchSize.
  • threadCount: an integer indicating the number of working threads to be created. The default value is 1, indicating single-threaded process. It must be 1 for a dimension table.
  • partitionCol: a string indicating the partitioning column. It is None by default, and only takes effect when threadCount is greater than 1. For a partitioned table, it must be the partitioning column; for a stream table, it must be a column name; for a dimension table, the parameter does not take effect.
  • pCompressMethods: an array of the compression methods used for each column. If unspecified, the columns are not compressed. The compression methods (case-insensitive) include: * "Vector_Fields.COMPRESS_LZ4": LZ4 algorithm * "Vector_Fields.COMPRESS_DELTA": Delta-of-delta encoding
  • mode: indicates how data is written. It can be specified as Mode.M_Append or Mode.M_Upsert.
    • Mode.M_Append: the data is appended by [tableInsert](https://www.dolphindb.com/help/FunctionsandCommands/FunctionReferences/t/tableInsert.html).
    • Mode.M_Upsert: the data is updated by [upsert!](https://www.dolphindb.com/help/FunctionsandCommands/FunctionReferences/u/upsert%21.html).
  • pModeOption: Currently, it takes effect only when mode is specified as M_Upsert. It is a STRING array consisting of the optional parameters of upsert!, indicating the expansion option of mode. For example, it can be specified as new String[] { "ignoreNull=false", "keyColNames=`volume" }.
  • callbackHandler: the Callback class. The default value is None, indicating that no callback is used. With callback enabled, the Callback class is inherited and method writeCompletion is reloaded. If parameter callbackHandler is specified, note that:
  • The first parameter of insert must be of type STRING, indicating the id of the row.
  • The getUnwrittenData method will not be available.

The following part introduces methods of MultithreadedTableWriter object.

(1) insert

ErrorCodeInfo insert(params Object[] args)

Details:

Insert a single record. Return a class ErrorCodeInfo containing errorCode and errorInfo. If errorCode is not "", MultithreadedTableWriter has failed to insert the data, and errorInfo displays the error message.

The class ErrorCodeInfo provides methods hasError() and succeed() to check whether the data is written properly. hasError() returns true if an error occurred, otherwise false. succeed() returns true if the data is written successfully, otherwise false.

Parameters:

  • args: a variable-length argument (varargs) indicating the record to be inserted.

(2) getUnwrittenData

List<List<IEntity>> getUnwrittenData();

Details:

Return a nested list of data that has not been written to the server.

Note: Data obtained by this method will be released by MultithreadedTableWriter.

(3) insertUnwrittenData

ErrorCodeInfo insertUnwrittenData(List<List<IEntity>> data);

Details:

Insert unwritten data. The result is in the same format as insert. The difference is that insertUnwrittenData can insert multiple records at a time.

Parameters:

  • data: the data that has not been written to the server. You can obtain the object with method getUnwrittenData.

(4) getStatus

Status getStatus()

Details:

Get the current status of the MultithreadedTableWriter object.

Parameters:

  • status: the MultithreadedTableWriter.Status class with the following attributes and methods:

Attributes:

  • isExiting: whether the threads are exiting
  • errorCode: error code
  • errorInfo: error message
  • sentRows: number of sent rows
  • unsentRows: number of rows to be sent
  • sendFailedRows: number of rows failed to be sent
  • threadStatus: a list of the thread status
    • threadId: thread ID
    • sentRows: number of rows sent by the thread
    • unsentRows: number of rows to be sent by the thread
    • sendFailedRows: number of rows failed to be sent by the thread

(5) waitForThreadCompletion

waitForThreadCompletion()

Details:

After calling the method, MultithreadedTableWriter will wait until all working threads complete their tasks.

The methods of MultithreadedTableWriter are usually used in the following way:

//build a connection and initialize the environment
string HOST = "192.168.1.38";
int PORT = 18848;
string USER = "admin";
string PASSWD = "123456";
DBConnection dBConnection = new DBConnection();
dBConnection.connect(HOST, PORT, USER, PASSWD);
Random random = new Random();
string script =
"dbName = 'dfs://valuedb3'" +
"if (exists(dbName))" +
"{" +
      "dropDatabase(dbName);" +
"}" +
"datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
"db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
"pt=db.createPartitionedTable(datetest,'pdatetest','id');";
dBConnection.run(script);

ErrorCodeInfo ret;
MultithreadedTableWriter.Status writeStatus;
MultithreadedTableWriter writer = new MultithreadedTableWriter(HOST, PORT, USER, PASSWD, "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[] { Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_DELTA });
try
{
      //insert 100 rows of records with correct data types and column count
      for (int i = 0; i < 100; ++i)
      {
            ret = writer.insert(new DateTime(2022, 3, 23), "AAAAAAAB", i);
            //this line will not be executed
            if (ret.errorCode != "")
            Console.WriteLine(string.Format("insert wrong format data: {0}\n", ret.ToString()));
      }
      Thread.Sleep(2000);

      //An error message is returned at once if insert one row with wrong data type.
      ret = writer.insert(new DateTime(2022, 3, 23), random.Next() % 10000, random.Next() % 10000);
      if (ret.errorCode != "")
            Console.WriteLine("insert wrong format data: {0}\n", ret.ToString());
      /*
      insert wrong format data: code = A1 info = Failed to insert data. Cannot convert int to DT_SYMBOL
      */

      //If a disconnection occurs, MTW will fail the next time it writes data to the server.
      //Write one row of data first to trigger an error.
      ret = writer.insert(new DateTime(2022, 3, 23), "AAAAAAAB", 1);

      Thread.Sleep(1000);

      //Insert 9 more rows of correct data types, MTW raises an exception because the work thread terminates and the row will not be written to MTW.
      //an exception will be thrown directly here
      for (int i = 0; i < 9; ++i)
      {
            ret = writer.insert(new DateTime(2022, 3, 23), "AAAAAAAB", random.Next() % 10000);
      }

}
catch (Exception e)
{
      Console.WriteLine(e.Message);
      //Thread is exiting.
}
writer.waitForThreadCompletion();
writeStatus = writer.getStatus();
if (writeStatus.errorCode != "")
      //an error occurred when writing
      Console.WriteLine(string.Format("error in writing:\n {0}", writeStatus.ToString()));
Console.WriteLine(((BasicLong)dBConnection.run("exec count(*) from pt")).getLong());

/*
      error in writing: Cause of write failure
sentRows: 100
unsentRows: 3
sendFailedRows: 7
threadId : 3 sentRows : 20 unsentRows : 0 sendFailedRows : 5
threadId : 4 sentRows : 20 unsentRows : 2 sendFailedRows : 1
threadId : 5 sentRows : 20 unsentRows : 1 sendFailedRows : 0
threadId : 6 sentRows : 20 unsentRows : 0 sendFailedRows : 0
threadId : 7 sentRows : 20 unsentRows : 0 sendFailedRows : 1

100
      */

for (int i = 0; i < 30; ++i)
      Console.Write('-');
Console.WriteLine();

List<List<IEntity>> unwriterdata = new List<List<IEntity>>();
if (writeStatus.sentRows != 110)
{
      Console.WriteLine("error after write complete:" + writeStatus.errorInfo);
      unwriterdata = writer.getUnwrittenData();
      Console.WriteLine("unwriterdata {0}", unwriterdata.Count);

      //To rewrite this data, a new MTW needs to be created because the original MTW is no longer available due to an exception.
      MultithreadedTableWriter newWriter = new MultithreadedTableWriter(HOST, PORT, USER, PASSWD, "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[] { Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_LZ4, Vector_Fields.COMPRESS_DELTA });
      try
      {
            //insert unwritten data
            if (newWriter.insertUnwrittenData(unwriterdata).errorCode != "")
            {
            //wait until MTW finishes writing
            newWriter.waitForThreadCompletion();
            writeStatus = newWriter.getStatus();
            if (writeStatus.errorCode != "")
            {
                  System.Console.Out.WriteLine("error in write again:" + writeStatus.errorInfo);
            }
            }

      }
      finally
      {
            newWriter.waitForThreadCompletion();
            writeStatus = newWriter.getStatus();
            Console.WriteLine(string.Format("write again result:\n{0}", writeStatus));
      }
}
else
      Console.WriteLine("write complete : \n {0}", writeStatus.ToString());
//check the status
Console.WriteLine(((BasicLong)dBConnection.run("exec count(*) from pt")).getLong());
/*
      unwriterdata 10
write again result:

isExiting: True
sentRows: 10
unsentRows: 0
sendFailedRows: 0
threadId : 8 sentRows : 5 unsentRows : 0 sendFailedRows : 0
threadId : 9 sentRows : 3 unsentRows : 0 sendFailedRows : 0
threadId : 10 sentRows : 1 unsentRows : 0 sendFailedRows : 0
threadId : 11 sentRows : 0 unsentRows : 0 sendFailedRows : 0
threadId : 12 sentRows : 1 unsentRows : 0 sendFailedRows : 0

110
      */

Enable the Callback class for MultithreadedTableWriter

With callback enabled, the Callback class is inherited and method writeCompletion is reloaded to obtain the callback data.

A table of type BasicTable will be returned using the callback method, which consists of two columns:

  • The first column (of String type) holds the id of each row added with MultithreadedTableWriter.insert.
  • The second column (of Boolean type) indicates whether each row was written successfully or not. True means it was written successfully; False means it failed.

Example:

public class CallbackHandler : Callback
{
    public void writeCompletion(ITable callbackTable)
    {
        List<String> failedIdList = new List<string>();
        BasicStringVector idVec = (BasicStringVector)callbackTable.getColumn(0);
        BasicBooleanVector successVec = (BasicBooleanVector)callbackTable.getColumn(1);
        for (int i = 0; i < successVec.rows(); i++)
        {
            if (!successVec.getBoolean(i))
            {
                failedIdList.Add(idVec.getString(i));
            }
        }
    }
}

Example:

MultithreadedTableWriter mtw = new MultithreadedTableWriter(host, port, userName, password, dbName, tbName, useSSL,
        enableHighAvailability, null, 10000, 1, 1, "price", null,MultithreadedTableWriter.Mode.M_Append,null, new CallbackHandler());

Use the insert method of MultithreadedTableWriter and write the id for each row in the first column.

String theme = "theme1";
for (int id = 0; id < 1000000; id++){
    mtw.insert(theme + id, code, price); //theme+id is the id of each row, which will be returned in the callback
}

Update and Write to DolphinDB Tables

DolphinDB C# API provides AutoFitTableUpsert class to update and write to DolphinDB tables. AutoFitTableUpsert has the same functions as MultithreadedTableWriter when its parameter mode is specified as Mode.M_Upsert. The difference is that AutoFitTableUpsert is single-threaded and MultithreadedTableWriter writes with multiple threads.

The methods of AutoFitTableUpsert object are introduced as follows:

AutoFitTableUpsert(string dbUrl, string tableName, DBConnection connection, bool ignoreNull, string[] pkeyColNames, string[] psortColumns)

Parameters:

  • dbUrl: a string indicating the path of the DFS database. It is specified as None for in-memory table.
  • tableName: a STRING indicating the in-memory or DFS table name.
  • connection: a DBConnection object to connect to the DolphinDB server and upsert data. Note that asynchronousTask must be false when creating the DBConnection object for AutoFitTableUpsert.
  • ignoreNull: a Boolean value. It is used to specify the parameter ignoreNull of upsert!, indicating whether to update the NULL values of the target table.
  • pkeyColNames: a STRING array. It is used to specify the parameter keyColNames of upsert!, i.e., specifing the key columns.
  • psortColumnsa STRING array. It is used to specify the parameter sortColumns of upsert!. The updated partitions will be sorted on sortColumns (only within each partition, not across partitions).

The following introduces the upsert method of AutoFitTableUpsert object.

int upsert(BasicTable table)

Details:

Update a BasicTable object to the target table, and return an integer indicating the number of rows that have been updated.

Example:

DBConnection conn = new DBConnection(false, false, false);
conn.connect("192.168.1.116", 18999, "admin", "123456");
String dbName = "dfs://upsertTable";
String tableName = "pt";
String script = "dbName = \"dfs://upsertTable\"\n" +
"if(exists(dbName)){\n" +
"\tdropDatabase(dbName)\t\n" +
"}\n" +
"db  = database(dbName, RANGE,1 10000,,'TSDB')\n" +
"t = table(1000:0, `id`value,[ INT, INT[]])\n" +
"pt = db.createPartitionedTable(t,`pt,`id,,`id)";
conn.run(script);

BasicIntVector v1 = new BasicIntVector(3);
v1.setInt(0, 1);
v1.setInt(1, 100);
v1.setInt(2, 9999);

BasicArrayVector ba = new BasicArrayVector(DATA_TYPE.DT_INT_ARRAY);
ba.append(v1);
ba.append(v1);
ba.append(v1);

List<String> colNames = new List<string>();
colNames.Add("id");
colNames.Add("value");
List<IVector> cols = new List<IVector>();
cols.Add(v1);
cols.Add(ba);
BasicTable bt = new BasicTable(colNames, cols);
String[] keyColName = new String[] { "id" };
AutoFitTableUpsert aftu = new AutoFitTableUpsert(dbName, tableName, conn, false, keyColName, null);
aftu.upsert(bt);
BasicTable res = (BasicTable)conn.run("select * from pt;");
System.Console.Out.WriteLine(res.getString());

Data Type Conversion

The C# API provides objects that correspond to DolphinDB data types. They are usually named as Basic+ <DataType>, such as BasicInt (which corresponds to DolphinDB INT), BasicDate (which corresponds to DolphinDB DATE), etc.

The following section describes the supported C# native types and their corresponding C# API and DolphinDB types regarding to the Scalar and Vector data forms.

Scalar

C#C# APIDolphinDBExamples of C# TypesC# API to DolphinDBDolphinDB to C#
boolBasicBoolBOOLbool boolVar = true;BasicBoolean basicBoolean = new BasicBoolean(boolVar);use getValue method, e.g., basicBoolean.getValue()
byteBasicByteCHARbyte byteVar = 10;BasicByte basicByte = new BasicByte(byteVar);same as above
shortBasicShortSHORTshort shortVar = 10;BasicShort basicShort = new BasicShort(shortVar);
intBasicIntINTint intVar = 10;BasicInt basicInt = new BasicInt(intVar);
longBasicLongLONGlong longVar = 10;BasicLong basicLong = new BasicLong(longVar);
floatBasicFloatFLOATfloat floatVar = 1.0f;BasicFloat basicFloat = new BasicFloat(floatVar);
doubleBasicDoubleDOUBLEdouble doubleVar = 1.0d;BasicDouble basicDouble = new BasicDouble(doubleVar);
DatetimeBasicNanoTimestampNANOTIMESTAMPDateTime dateTimeVar = DateTime.Now;BasicNanoTimestamp basicNanoTimestamp = new BasicNanoTimestamp(dateTimeVar);
same as aboveBasicTimestampTIMESTAMPsame as aboveBasicTimestamp basicTimestamp = new BasicTimestamp(dateTimeVar);
BasicDateDATE BasicDate basicDate = new BasicDate(dateTimeVar);
BasicMonthMONTH BasicMonth basicMonth = new BasicMonth(dateTimeVar);
BasicDateTimeDATETIME BasicDateTime basicDateTime = new BasicDateTime(dateTimeVar);
BasicDateHourDATEHOUR BasicDateHour basicDateHour = new BasicDateHour(dateTimeVar);
TimeSpanBasicNanoTimeNANOTIMETimeSpan time = TimeSpan.FromTicks(1000);BasicNanoTime basicNanoTime = new BasicNanoTime(timeSpan);
same as aboveBasicTimeTIMEsame as aboveBasicTime basicTime = new BasicTime(timeSpan);
BasicSecondSECOND BasicSecond basicSecond = new BasicSecond(timeSpan);
BasicMinuteMINUTE BasicMinute basicMinute = new BasicMinute(timeSpan);
stringBasicStringSTRINGstring var = "tag";BasicString basicString = new BasicString(stringVar);
same as aboveBasicString(BLOB)BLOBsame as aboveBasicString basicBlob = new BasicString(stringVar, true);
BasicInt128INT128string int128String = "e1671797c52e15f763380b45e841ec32";BasicInt128 basicInt128 = BasicInt128.fromString(int128String);string int128Data = basicInt128.getString();
BasicDecimal32Decimal32string stringVal = "1.5555";BasicDecimal32 basicDecimal32 = new BasicDecimal32(stringVal , 4);decimal decimalValue = basicDecimal32.getString();
BasicDecimal64Decimal64same as aboveBasicDecimal64 basicDecimal64 = new BasicDecimal64(stringVal , 4);decimal decimalValue = basicDecimal64.getString();
BasicDecimal128Decimal128 BasicDecimal128 basicDecimal128 = new BasicDecimal128(stringVal , 4);decimal decimalValue = basicDecimal128.getString();
decimalBasicDecimal32Decimal32decimal decimalVal = 1.5555m;BasicDecimal32 basicDecimal32 = new BasicDecimal32(decimalVal, 4); Note: For the method public BasicDecimal32(string data, int scale = -1), if the scale parameter is not provided, the scale is set to match the number of decimal places in the input string.decimal decimalValue = basicDecimal32.getDecimalValue();
same as aboveBasicDecimal64Decimal64same as aboveBasicDecimal64 basicDecimal64 = new BasicDecimal64(decimalVal, 4); Note: For the method public BasicDecimal364(string data, int scale = -1), if the scale parameter is not provided, the scale is set to match the number of decimal places in the input string.decimal decimalValue = basicDecimal64.getDecimalValue();
BasicDecimal128Decimal128 BasicDecimal128 basicDecimal128 = new BasicDecimal128(decimalVal, 4); Note: For the method public BasicDecimal128(string data, int scale = -1), if the scale parameter is not provided, the scale is set to match the number of decimal places in the input string.decimal decimalValue = basicDecimal128.getDecimalValue();
GuidBasicUuidUUIDGuid uuid = Guid.NewGuid();BasicUuid basicUuid = BasicUuid.fromString(uuid.ToString());Guid uuidData = Guid.Parse(basicUuid.getString());

Vector

C#C# APIDolphinDBConstructors for C# TypesC# API to DolphinDBDolphinDB to C#
boolBasicBooleanVectorBOOL void add(object value); void setBoolean(int index, bool value)bool getBoolean(int index)
byteBasicBooleanVectorBOOLBasicBooleanVector(IList<byte?> list); BasicBooleanVector(byte[] array)void add(object value)
byteBasicByteVectorCHARBasicByteVector(IList<byte?> list); BasicByteVector(byte[] array)void add(object value); void setByte(int index, byte value)byte getByte(int index)
shortBasicShortVectorSHORTBasicShortVector(IList<short?> list); BasicShortVector(short[] array)void add(object value); void setShort(int index, short value)short getShort(int index)
intBasicIntVectorINTBasicIntVector(IList<int?> list); BasicIntVector(int[] array)void add(object value); void setInt(int index, int value)int getInt(int index)
longBasicLongVectorLONGBasicLongVector(IList<long?> list); BasicLongVector(long[] array)void add(object value); void setLong(int index, long value)long getLong(int index)
floatBasicFloatVectorFLOATBasicFloatVector(IList<float?> list); BasicFloatVector(float[] array)void add(object value); void setFloat(int index, float value)float getFloat(int index)
doubleBasicDoubleVectorDOUBLEBasicDoubleVector(IList<double?> list); BasicDoubleVector(double[] array)void add(object value); void setDouble(int index, double value)double getDouble(int index)
DatetimeBasicNanoTimestampNANOTIMESTAMP void add(object value); void setNanoTimestamp(int index, DateTime dt)DateTime getNanoTimestamp(int index)
same as aboveBasicTimestampVectorTIMESTAMP void add(object value); void setNanoTimestamp(int index, DateTime dt)DateTime getTimestamp(int index)
BasicDateVectorVectorDATE void add(object value); void setDate(int index, DateTime date)DateTime getDate(int index)
BasicMonthVectorMONTH void setMonth(int index, DateTime month)DateTime getMonth(int index)
BasicDateTimeVectorDATETIME void add(object value); void setDateTime(int index, DateTime dt)DateTime getDateTime(int index)
BasicDateHourVectorDATEHOUR void add(object value); void setDateTime(int index, DateTime dt)DateTime getDateTime(int index)
TimeSpanBasicNanoTimeVectorNANOTIME void add(object value); void setNanoTime(int index, TimeSpan time)TimeSpan getNanoTime(int index)
same as aboveBasicTimeVectorTIME void add(object value); void setTime(int index, TimeSpan time)TimeSpan getTime(int index)
BasicSecondVectorSECOND void add(object value); void setSecond(int index, TimeSpan time)TimeSpan getSecond(int index)
BasicMinuteVectorMINUTE void add(object value); void setMinute(int index, TimeSpan time)TimeSpan getMinute(int index)
stringBasicStringVectorSTRING void add(object value); void setString(int index, string value)string getString(int index)
same as aboveBasicStringVector(BLOB)BLOB void add(object value); void setString(int index, string value)string getString(int index)
BasicDecimal32VectorDecimal32BasicDecimal32Vector(string[] data, int scale); BasicDecimal32Vector(List<string> list, int scale)void add(object value)Get the IScalar object by the get method and call the getString method, e.g., basicDecimal32Vector.get(index).getString()
BasicDecimal64VectorDecimal64BasicDecimal64Vector(string[] data, int scale); BasicDecimal64Vector(List<string> list, int scale)void add(object value)Get the IScalar object by the get method and call the getString method, e.g., basicDecimal64Vector.get(index).getString()
BasicDecimal128VectorDecimal128BasicDecimal128Vector(string[] data, int scale); BasicDecimal128Vector(List<string> list, int scale)void add(object value)Get the IScalar object by the get method and call the getString method, e.g., basicDecimal128Vector.get(index).getString()
decimalBasicDecimal32VectorDecimal32BasicDecimal32Vector(decimal[] data, int scale); BasicDecimal32Vector(List<decimal> list, int scale)void add(object value)decimal getDecimal(int index)
BasicDecimal64VectorDecimal64BasicDecimal64Vector(decimal[] data, int scale); BasicDecimal64Vector(List<decimal> list, int scale)void add(object value)decimal getDecimal(int index)
BasicDecimal128VectorDecimal128BasicDecimal128Vector(decimal[] data, int scale); BasicDecimal128Vector(List<decimal> list, int scale)void add(object value)decimal getDecimal(int index); Note: Before using getDecimal, it is important to check for NULLs in BasicDecimal128Vector. Use the isNull method to identify NULLs, as the NULL value in Decimal128 will exceed the range that a C# decimal can represent.

C# Streaming API

A C# program can subscribe to streaming data via API. C# API can acquire streaming data in the following 3 ways: ThreadedClient, ThreadPooledClient, and PollingClient.

Interfaces

The corresponding interfaces of subscribe are:

(1) Subscribe using ThreadedClient:

subscribe(string host, int port, string tableName, string actionName, MessageHandler handler, long offset, bool reconnect, IVector filter, int batchSize, float throttle = 0.01f, StreamDeserializer deserializer = null, string user = "", string password = "", bool msgAsTable = false)

Parameters:

  • host: the IP address of the publisher node.
  • port: the port number of the publisher node.
  • tableName: a string indicating the name of the publishing stream table.
  • actionName: a string indicating the name of the subscription task.
  • handler: a user-defined function to process the subscribed data.
  • offset: an integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative or exceeding the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it is created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
  • reconnect: a Boolean value indicating whether to resubscribe after network disconnection.
  • filter: a vector indicating the filtering conditions. Only the rows with values of the filtering column in the vector specified by the parameter filter are published to the subscriber.
  • batchSize: an integer indicating the number of unprocessed messages to trigger the handler. If it is positive, the handler does not process messages until the number of unprocessed messages reaches batchSize. If it is unspecified or non-positive, the handler processes incoming messages as soon as they come in.
  • throttle: a floating-point number indicating the maximum waiting time (in seconds) before the handler processes the incoming messages. The default value is 1. This optional parameter has no effect if batchSize is not specified.
  • deserializer: the deserializer for the subscribed heterogeneous stream table.
  • user: a string indicating the username used to connect to the server.
  • password: a string indicating the password used to connect to the server.
  • msgAsTable: a Boolean value indicating whether the subscribed data is ingested into handler as a Table. true means the subscribed data is ingested into handler as a Table.false (default) means the subscribed data is ingested into handler as an AnyVector. Note: Do not set batchSize and deserializer when msgAsTable=true. Setting msgAsTable=true is not recommended when subscribing to heterogeneous stream tables.

(2) Subscribe using ThreadPooledClient:

subscribe(string host, int port, string tableName, string actionName, MessageHandler handler, long offset, bool reconnect, IVector filter, StreamDeserializer deserializer = null, string user = "", string password = "", bool msgAsTable = false)

(3) Subscribe using PollingClient:

subscribe(string host, int port, string tableName, string actionName, long offset, bool reconnect, IVector filter, StreamDeserializer deserializer = null, string user = "", string password = "", bool msgAsTable = false)

Code Examples

The following examples introduce how to subscribe to stream table:

  • The application on the client periodically checks if new data has been added to the streaming table. If yes, the application will acquire and consume the new data.
PollingClient client = new PollingClient(subscribeHost, subscribePort);
TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, offset);

while (true)
{
      List<IMessage> msgs = poller1.poll(1000);

      if (msgs.Count > 0)
      {
            foreach(IMessage msg in msgs)
            System.Console.Out.WriteLine(string.Format("receive: {0}, {1}, {2}", msg.getEntity(0).getString(), msg.getEntity(1).getString(), msg.getEntity(2).getString()));
      }
      /*
      Successfully subscribed table 192.168.1.38:18848:local8848/Trades/csharpStreamingApi
      receive: 1, 2022.05.26T10:39:22.105, 1.5
      */
}
  • The API uses MessageHandler to get new data

First you need to define the message handler, which needs to implement dolphindb.streaming.MessageHandler interface.

public class MyHandler : MessageHandler
{
      public void doEvent(IMessage msg)
      {
            System.Console.Out.WriteLine(string.Format("receive: {0}, {1}, {2}", msg.getEntity(0).getString(), msg.getEntity(1).getString(), msg.getEntity(2).getString()));
      }

      public void batchHandler(List<IMessage> msgs)
      {
            throw new NotImplementedException();
      }
}

You can pass the handler instance into function subscribe as a parameter with single-thread or multi-thread callbacks.

(1) ThreadedClient

ThreadedClient client = new ThreadedClient(subscribeHost, subscribePort);
client.subscribe(serverIP, serverPort, tableName, new MyHandler());
Thread.Sleep(10000);
//To cancel the subscription, you can use function close.
client.close();

(2) ThreadPooledClient: Handler mode client (multithreading)

ThreadPooledClient client = new ThreadPooledClient(subscribeHost, subscribePort);
client.subscribe(serverIP, serverPort, tableName, new MyHandler());
//To cancel the subscription, you can use function close.
Thread.Sleep(10000);
client.close();

Reconnect

Parameter reconnect is a Boolean value indicating whether to automatically resubscribe after the subscription experiences an unexpected interruption. The default value is false.

When reconnect=true:

  • If the publisher and the subscriber both stay on but the network connection is interrupted, then after network is restored, the subscriber resumes subscription from where the network interruption occurs.
  • If the publisher crashes, the subscriber will keep attempting to resume subscription after the publisher restarts.
    • If persistence was enabled on the publisher, the publisher starts to read the persisted data on disk after restarting. Automatic resubscription would fail until the publisher has read the data for the time when the publisher crashed.
    • If persistence was not enabled on the publisher, the automatic subscription will fail.
  • If the subscriber crashes, the subscriber won't automatically resume the subscription after it restarts. In this case, we need to execute function subscribe again.

Parameter reconnect is set to be true for the following example:

PollingClient client = new PollingClient(subscribePort);
TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, offset, true);

Filter

Parameter filter is a vector. It is used together with function setStreamTableFilterColumn at the publisher node. Function setStreamTableFilterColumn specifies the filtering column in the streaming table. Only the rows with filtering column values in filter are published.

In the following example, parameter filter is assigned an INT vector [1,2]:

BasicIntVector filter = new BasicIntVector(2);
filter.setInt(0, 1);
filter.setInt(1, 2);

PollingClient client = new PollingClient(subscribePort);
TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, actionName, offset, filter);

Subscribe to a Heterogeneous Table

Since DolphinDB server version 1.30.17/2.00.5, the replay function supports replaying (serializing) multiple stream tables with different schemata into a single stream table (known as "heterogeneous stream table"). Starting from DolphinDB C# API version 1.30.19, a new class streamDeserializer has been introduced for the subscription and deserialization of heterogeneous stream table.

Construct Deserializer for Heterogeneous Stream Table

You can construct a deserializer for heterogeneous table with streamDeserializer.

(1) With specified table schema:

  • specified schema
StreamDeserializer(Dictionary<string, BasicDictionary> filters)
  • specified column types
StreamDeserializer(Dictionary<string, List<DATA_TYPE>> filters)

(2) With specified table:

StreamDeserializer(Dictionary<string, Tuple<string, string>> tableNames, DBConnection conn = null)

Code example:

//Supposing the inputTables to be replayed is:
//d = dict(['msg1', 'msg2'], [table1, table2]); 
//replay(inputTables = d, outputTables = `outTables, dateColumn = `timestampv, timeColumn = `timestampv)";
//create a deserializer for heterogeneous table

{//specify schema
      BasicDictionary outSharedTables1Schema = (BasicDictionary)conn.run("table1.schema()");
      BasicDictionary outSharedTables2Schema = (BasicDictionary)conn.run("table2.schema()");
      Dictionary<string, BasicDictionary> schemas = new Dictionary<string, BasicDictionary>();
      schemas["msg1"] = outSharedTables1Schema;
      schemas["msg2"] = outSharedTables2Schema;
      StreamDeserializer streamFilter = new StreamDeserializer(schemas);
}
{//or specify column types
	Dictionary<string, List<DATA_TYPE>> colTypes = new Dictionary<string, List<DATA_TYPE>>();
	List<DATA_TYPE> table1ColTypes = new List<DATA_TYPE> { DATA_TYPE.DT_DATETIME, DATA_TYPE.DT_TIMESTAMP, DATA_TYPE.DT_SYMBOL, DATA_TYPE.DT_DOUBLE, DATA_TYPE.DT_DOUBLE };
	colTypes["msg1"] = table1ColTypes;
	List<DATA_TYPE> table2ColTypes = new List<DATA_TYPE> { DATA_TYPE.DT_DATETIME, DATA_TYPE.DT_TIMESTAMP, DATA_TYPE.DT_SYMBOL, DATA_TYPE.DT_DOUBLE };
	colTypes["msg2"] = table2ColTypes;
	StreamDeserializer streamFilter = new StreamDeserializer(colTypes);
}
{//specify tables
      Dictionary<string, Tuple<string, string>> tables = new Dictionary<string, Tuple<string, string>>();
      tables["msg1"] = new Tuple<string, string>("", "table1");
      tables["msg2"] = new Tuple<string, string>("", "table2");
      //conn is an optional parameter
      StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);
}

Subscribe to a Heterogeneous Table

(1) subscribe to a heterogeneous table using ThreadedClient:

  • specify the parameter deserializer of function subscribe to deserialize the table when data is ingested:
ThreadedClient client = new ThreadedClient(listenport);
client.subscribe(hostName, port, tableName, actionName, handler, 0, true, null, -1, (float)0.01, streamFilter);
  • add the streamFilter to user-defined Handler:
public class Handler6 : MessageHandler
      {
      private StreamDeserializer deserializer_;
      private List<BasicMessage> msg1 = new List<BasicMessage>();
      private List<BasicMessage> msg2 = new List<BasicMessage>();

      public Handler6(StreamDeserializer deserializer)
      {
            deserializer_ = deserializer;
      }

      public void batchHandler(List<IMessage> msgs)
      {
            throw new NotImplementedException();
      }

      public void doEvent(IMessage msg)
      {
            try
            {
                  BasicMessage message = deserializer_.parse(msg);
                  if (message.getSym() == "msg1")
                  {
                  msg1.Add(message);
                  }
                  else if (message.getSym() == "msg2")
                  {
                  msg2.Add(message);
                  }
            }
            catch (Exception e)
            {
                  System.Console.Out.WriteLine(e.StackTrace);
            }
      }

      public List<BasicMessage> getMsg1()
      {
            return msg1;
      }

      public List<BasicMessage> getMsg2()
      {
            return msg2;
      }
      };

Handler6 handler = new Handler6(streamFilter);
ThreadedClient client = new ThreadedClient(listenport);
client.subscribe(SERVER, PORT, tableName, actionName, handler, 0, true);

(2) subscribe to a heterogeneous table using ThreadPooledClient is similar as above:

  • specify the parameter deserializer of function subscribe
ThreadPooledClient client = new ThreadPooledClient(listenport);
client.subscribe(hostName, port, tableName, actionName, handler, 0, true, null, streamFilter);
  • add the streamFilter to user-defined Handler:
Handler6 handler = new Handler6(streamFilter);
ThreadPooledClient client = new ThreadPooledClient(listenport);
client.subscribe(hostName, port, tableName, actionName, handler, 0, true);

(3) As PollingClient does not support callbacks, you can only pass the deserializer parameter to the function subscribe:

PollingClient client = new PollingClient(listenport);
TopicPoller poller = client.subscribe(hostName, port, tableName, actionName, 0, true, null, streamFilter);

Unsubscribe

Each subscription is identified with a subscription topic. Subscription fails if a topic with the same name already exists. You can cancel the subscription with unsubscribe.

client.unsubscribe(serverIP, serverPort, tableName,actionName);

Complex Event Processing

The DolphinDB 3.00.0 introduces a new feature - Complex Event Processing (CEP) - to find patterns in event data that enable detection of opportunities and threats. To better interact with server, C# API 3.00.1.0 provides EventSchema, EventSender and EventClient classes for constructing, writing, and subscribing to events.

This section describes event-related operations, including:

  • Defining events.
  • Writing events to the heterogeneous stream table as a data source for the CEP engine on the DolphinDB server.
  • Subscribing to events in the heterogeneous stream table output by the CEP engine on the DolphinDB server.

For more details, see the "Complex Event Processing" chapter of the DolphinDB server user manual.

Defining Events (EventSchema)

EventSchema can be used to define events on API client.

Constructor

public EventSchema(string eventType, List<string> fieldNames, 
  List<DATA_TYPE> fieldTypes, List<DATA_FORM> fieldForms, 
  List<int> fieldExtraParams = null)

Arguments

  • eventType: A String indicating the event.
  • fieldNames: A String indicating the field names.
  • fieldTypes: Data types of fields.
  • fieldForms: Data forms of fields.
  • fieldExtraParams: A non-negative int used to specify the scale when the field type is DECIMAL.

Sending Events (EventSender)

EventSender can be used to write serialized event data into heterogeneous stream tables, which are subscribed by CEP engines in DolphinDB to capture and process the events.

Constructor

EventSender(DBConnection conn, string tableName, List<EventSchema> eventSchema, List<string> eventTimeFields = null, List<string> commonFields = null)

Arguments

  • conn: A successfully connected DBConnection object, which should not be in asynchronous mode.
  • tableName: A String indicating the name of the heterogeneous stream table to write into.
  • eventSchema: All event schemas the CEP engine can process, i.e., all events to be sent. Note that the event schemas must match those specified when creating the CEP engine.
  • eventTimeFields (optional): The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.
    1. It is a String if the field name is the same for all events.
    2. It is a String list if field names vary across events. The order of strings must match that of the events specified in eventSchema.
  • commonFields (optional): A String list indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.

sendEvent

Use the sendEvent method to send events to DolphinDB server.

public void sendEvent(String eventType, List<Entity> attributes)

Arguments

  • eventType: The event type.
  • attributes: The values of each member in the event. Note that the order of the values must match the field order specified when defining the EventSchema, and their data types and forms must also match.

Subscribing to Events (EventClient)

The API provides the EventClient tool for subscribing to events in heterogeneous stream tables. These tables typically receive event data processed and serialized by the DolphinDB CEP engine.

Constructor

public EventClient(List<EventSchema> eventSchema, List<string> eventTimeFields = null, List<string> commonFields = null)

Arguments

  • eventSchema: All event schemas the CEP engine can process, i.e., all events to be sent. Note that the event schemas must match those specified when creating the CEP engine.
  • eventTimeFields (optional): The time field for each event. It must be specified if the first column of the heterogeneous stream table is of temporal type.
    1. It is a String if the field name is the same for all events.
    2. It is a String list if field names vary across events. The order of strings must match that of the events specified in eventSchema.
  • commonFields (optional): A String list indicating the fields with the same name across events. These fields can be stored as separate columns in the stream table for filtering during subscription.

subscribe

public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password)

Arguments

  • host: IP address of the publisher node.
  • port: Port number of the publisher node.
  • tableName: Name of the publisher stream table.
  • actionName (optional): The subscription task.
  • handler: A user-defined function to process the subscribed data.

For the function called by handler, the first parameter indicates the event type, and the second parameter represents the event fields.

class MyEventMessageHandler : EventMessageHandler
{
    public void doEvent(string eventType, List<IEntity> attribute)
    {

      // Handling logic
    }
}
  • offset (optional, default -1): An integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative or exceeding the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it is created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
  • reconnect (optional, default false): Whether to enable reconnection.
  • user (optional, default ""): Username used to connect to the server.
  • password (optional, default ""): Password.

Canceling Subscription

public void unsubscribe(String host, int port, String tableName, String actionName)

Arguments

  • host: IP address of the publisher node.
  • port: Port number of the publisher node.
  • tableName: Name of the publisher stream table.
  • actionName: The subscription task.

Usage Example

This example introduces how to create events on the server, interfaces for serializing events, heterogeneous stream tables for writing and outputting events, and the CEP engine for processing events. Then use EventSender on the API to send events to the heterogeneous stream table. After the CEP engine processes and outputs messages to another heterogeneous stream table through subscription, use EventClient on the API side to subscribe to the output events.

The following script will create two heterogeneous stream tables "input" and "output", two streamEventSerializer (serialization interfaces) objects, a CEP engine, and a "MarketData" event on the DolphinDB server.

class MarketData{
  market :: STRING
  code::STRING
  price :: DOUBLE
  qty :: INT
  eventTime :: TIMESTAMP
  def MarketData(m,c,p,q){
    market = m
    code = c
    price = p
    qty = q
    eventTime = now()   
  }
}   

class MainMonitor{

  def MainMonitor(){
  }
  def updateMarketData(event)

  def onload(){
    addEventListener(updateMarketData,'MarketData',,'all')
      }

  def updateMarketData(event){
    emitEvent(event)
  }
}
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as output
schema = table(1:0, `eventType`eventKeys`eventValuesTypeString`eventValueTypeID`eventValuesFormID, [STRING, STRING, STRING, INT[], INT[]])
insert into schema values("MarketData", "market,code,price,qty,eventTime", "STRING,STRING,DOUBLE,INT,TIMESTAMP", [18 18 16 4 12], [0 0 0 0 0]) 
inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schema, outputTable=intput, eventTimeField = "eventTime")
outputSerializer = streamEventSerializer(name=`serOutput, eventSchema=schema, outputTable=output, eventTimeField = "eventTime")

engine = createCEPEngine('cep1', <MainMonitor()>, dummy, [MarketData], 1, 'eventTime', 10000, outputSerializer)
subscribeTable(,`intput, `subopt, 0, getStreamEngine('cep1'),true)

Then write data to input via EventSender on API.

// Define schema
List<string> fieldNames = new List<string> { "market", "code", "price", "qty", "eventTime" };
List<DATA_TYPE> fieldTypes = new List<DATA_TYPE> {
  DATA_TYPE.DT_STRING, DATA_TYPE.DT_STRING, DATA_TYPE.DT_DOUBLE, DATA_TYPE.DT_INT, DATA_TYPE.DT_TIMESTAMP };
List<DATA_FORM> fieldForms = new List<DATA_FORM> {
  DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR };
EventSchema schema = new EventSchema("MarketData", fieldNames, fieldTypes, fieldForms);
List<EventSchema> eventSchemas = new List<EventSchema> { schema };
List<string> eventTimeKeys = new List<string> { "eventTime" };

// Create EventSender
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
EventSender sender = new EventSender(conn, "intput", eventSchemas, eventTimeKeys);

// Prepare data
List<IEntity> attributes = new List<IEntity> { new BasicString("sz"), new BasicString("s001") ,
  new BasicDouble(9.8), new BasicInt(100), new BasicTimestamp(DateTime.Now)};
sender.sendEvent("MarketData", attributes);

The stream table input is subscribed by a CEP engine, which generates events to the output table.

Then use EventClient to subscribe to the output table.

// user-defined function called by handler
class MyEventMessageHandler : EventMessageHandler
{
    public void doEvent(string eventType, List<IEntity> attribute)
    {// user-defined processing
        System.Console.WriteLine("eventType: " + eventType);
        foreach (IEntity entity in attribute)
        {
            System.Console.WriteLine(entity.getString());
        }
        System.Console.WriteLine();
    }
}

// Define schema
List<string> fieldNames = new List<string> { "market", "code", "price", "qty", "eventTime" };
List<DATA_TYPE> fieldTypes = new List<DATA_TYPE> {
    DATA_TYPE.DT_STRING, DATA_TYPE.DT_STRING, DATA_TYPE.DT_DOUBLE, DATA_TYPE.DT_INT, DATA_TYPE.DT_TIMESTAMP };
List<DATA_FORM> fieldForms = new List<DATA_FORM> {
    DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR, DATA_FORM.DF_SCALAR };
EventSchema schema = new EventSchema("MarketData", fieldNames, fieldTypes, fieldForms);
List<EventSchema> eventSchemas = new List<EventSchema> { schema };
List<string> eventTimeKeys = new List<string> { "eventTime" };

// create EventClient
EventClient client = new EventClient(eventSchemas, eventTimeKeys);

// create subscription
EventMessageHandler handler = new MyEventMessageHandler();
client.subscribe(HOST, PORT, "output", "CSharpClient", handler, 0, true, "admin", "123456");
Thread.Sleep(10000);

// cancel subscription
client.unsubscribe(HOST, PORT, "output", "CSharpClient");