Append Data

DolphinDB provides three methods to append data to DFS and in-memory tables:

  • INSERT INTO statement: SQL-style syntax for row-wise insertion.
  • append! function: Table-to-table append operations.
  • tableInsert function: Flexible data insertion with record count tracking.

Append Methods

SQL INSERT INTO Statement

Use SQL INSERT INTO with VALUES to append rows to in-memory tables or DFS tables. For DFS tables, the configuration parameter enableInsertStatementForDFSTable must be set to true.
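
For example, on a standalone node the parameter can be set in the configuration file before startup (a minimal sketch, assuming the default dolphindb.cfg; cluster deployments set node parameters in their own configuration files):

// dolphindb.cfg: allow INSERT INTO statements on DFS tables
enableInsertStatementForDFSTable=true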

Append Values to All Columns

To append data to all columns, omit the column names and specify a new value for each column in the VALUES clause.

INSERT INTO <tableName> VALUES (val1, val2, val3, ...);

Append Values to Specific Columns

To append data to specific columns, specify the column names in the parentheses after the table name and provide the corresponding values in the VALUES clause. Unspecified columns are filled with NULLs.

// Append data to columns col1,col2,col4...
INSERT INTO <tableName> (col1, col2, col4, ...) VALUES (val1, val2, val4, ...);
Note: For partitioned tables, the partitioning columns must be included.

Examples

t = table(1:0,`id`sym`val,[INT,SYMBOL,DOUBLE])
// Append to all columns
INSERT INTO t VALUES ([1,2],["A","B"],[2.3,3.6])
// Append 2 columns
INSERT INTO t (id,val) VALUES (3,7.6)

append! Function

Use function append! to append data to in-memory tables or DFS tables. The data must be organized in a table whose schema matches the target table; in particular, the data types and column order must be the same. Column names need not be identical.

Examples

t = table(1:0,`id`sym`val,[INT,SYMBOL,DOUBLE])
tmp = table(1..10 as id, take(`A001`B001,10) as sym, rand(10.0,10) as val)
append!(t,tmp)

tableInsert Function

Use function tableInsert to append data to in-memory tables or DFS tables. The data can be organized as a table, a tuple, a dictionary, or multiple vectors or tuples. Unlike append!, which has no return value, tableInsert returns the number of inserted records.

Examples

The following script attempts to append 10 records to a partitioned in-memory table with 5 partitions.

db = database("", VALUE, 1..5)
schemaTb = table(1:0, `id`sym`val, [INT,SYMBOL,DOUBLE])
t = db.createPartitionedTable(schemaTb, `mpt, `id)

tmp = table(1..10 as id, take(`A`B,10) as sym, rand(10.0,10) as val)
tableInsert(t, tmp) 
// output: 5

The 5 records with id values outside the partition range (6 through 10) are discarded, so tableInsert returns the number of inserted records, 5.
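
Besides tables, tableInsert also accepts the other input forms listed above. A minimal sketch against an in-memory table (the literal values are illustrative, and the dictionary form assumes the keys are column names):

t = table(1:0, `id`sym`val, [INT,SYMBOL,DOUBLE])
tableInsert(t, 1, "A", 1.5)                       // multiple scalars, one per column
tableInsert(t, 2 3, `B`C, 2.5 3.5)                // multiple vectors, one per column
tableInsert(t, (4, "D", 4.5))                     // a tuple with one element per column
tableInsert(t, dict(`id`sym`val, (5, "E", 5.5)))  // a dictionary keyed by column names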

DFS Data Append Considerations

Data Size Limits

Size constraints are applied to strings written to DFS tables:

  • Data of STRING type must be smaller than 64 KB, otherwise it will be truncated to 65,535 bytes (i.e., 64 KB - 1 byte);
  • Data of BLOB type must be smaller than 64 MB, otherwise it will be truncated to 67,108,863 bytes (i.e., 64 MB - 1 byte);
  • Data of SYMBOL type must be smaller than 255 bytes, otherwise the system will throw an exception.
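
A minimal sketch to observe the STRING truncation (the database name dfs://strTest is hypothetical):

if(existsDatabase("dfs://strTest")) dropDatabase("dfs://strTest")
db = database("dfs://strTest", VALUE, 1..2)
pt = db.createPartitionedTable(table(1:0, `id`str, [INT,STRING]), `pt, `id)
// Write a 70,000-byte string, which exceeds the 64 KB limit
pt.tableInsert(table(1 as id, concat(take("a", 70000)) as str))
select strlen(str) from pt
// Expected: 65535, i.e., the string was truncated on write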

Handling Special Characters

For a partitioning column of STRING or SYMBOL type in a VALUE-partitioned database, spaces and invisible characters such as "\n", "\r", and "\t" are not allowed. Otherwise, the error A STRING or SYMBOL column used for value-partitioning cannot contain invisible characters. is thrown.
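
A minimal sketch that triggers this error (the database name dfs://symTest is hypothetical):

if(existsDatabase("dfs://symTest")) dropDatabase("dfs://symTest")
db = database("dfs://symTest", VALUE, `A`B)
pt = db.createPartitionedTable(table(1:0, `sym`val, [SYMBOL,INT]), `pt, `sym)
// The partitioning value contains "\n" and is rejected
pt.tableInsert(table(symbol(["A\nX"]) as sym, [1] as val))
// Error: A STRING or SYMBOL column used for value-partitioning cannot contain invisible characters.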

Handling Unordered Data Writes

Write performance remains largely unaffected whether appending ordered or unordered data, as the sorting process occurs asynchronously. The storage order within partitions varies by storage engine type:

  • OLAP: Maintains data in write order within each partition.
  • TSDB: Sorts data by sort columns within each level file.
  • PKEY: Sorts data by primary key within each level file.

Atomic Transactions

Whether transactions are supported when concurrently appending data to multiple partitions depends on the atomicity level specified by atomic when creating a database.

  • When atomic is set to 'TRANS', transactions are supported. If a transaction attempts to write to multiple chunks and one of the chunks is locked by another transaction, a write-write conflict occurs, and all writes of the transaction fail.
    // Create a database and set atomic to 'TRANS'
    if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
    db1 = database("",VALUE,2024.01.01..2024.01.03)
    db2 = database("",HASH,[SYMBOL,4])
    db = database("dfs://test", COMPO, [db1,db2],,"TSDB","TRANS")
    tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
    pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)
    
    // Submit concurrent write jobs
    n =  100
    t1 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t2 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t3 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    submitJob("writeData1", "writeData1", tableInsert, pt, t1)
    submitJob("writeData2", "writeData2", tableInsert, pt, t2)
    submitJob("writeData3", "writeData3", tableInsert, pt, t3)
    
    // Check results
    select count(*) from pt 
    // Returns 100, meaning 100 records were successfully written
    Calling getRecentJobs() shows that two of the write jobs failed with the error The openChunks operation failed because the chunk '/test/20240101/Key1/OQ' is currently locked and in use by transaction 3153.
  • When atomic is set to 'CHUNK', if a transaction tries to write to multiple chunks and a write-write conflict occurs because a chunk is locked by another transaction, the writes are not aborted. Instead, the transaction keeps writing to the non-locked chunks and retries the conflicting chunk, failing on that chunk only if it is still locked after a few minutes. As atomicity at the transaction level is not guaranteed, the write operation may succeed in some chunks but fail in others. Note also that write speed may be reduced by the repeated attempts to write to locked chunks.
    if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
    db1 = database("",VALUE,2024.01.01..2024.01.03)
    db2 = database("",HASH,[SYMBOL,4])
    db = database("dfs://test", COMPO, [db1,db2],,"TSDB","CHUNK")
    tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
    pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)
    
    n =  100
    t1 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t2 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t3 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    
    submitJob("writeData1", "writeData1", tableInsert, pt, t1)
    submitJob("writeData2", "writeData2", tableInsert, pt, t2)
    submitJob("writeData3", "writeData3", tableInsert, pt, t3)
    
    // Check results
    select count(*) from pt 
    // Returns 300, meaning 300 records were successfully written

Appending a Large Amount of Data

To optimize the performance of large-scale data insertions, it is recommended to partition the data and process each partition through separate background jobs. This strategy maximizes throughput by leveraging concurrent writes while preventing write conflicts. The example below demonstrates this using a smaller dataset. The performance advantages become significantly more evident when handling large volumes of historical data.

// Create databases and tables
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db1 = database("",VALUE,2024.01.01..2024.01.03)
db2 = database("",HASH,[SYMBOL,4])
db = database("dfs://test", COMPO, [db1,db2],,"TSDB")
tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)

// Simulate 3 days of data (100 million records per day, 300 million records in total) and insert all at once
n = 100000000
t = table(take(2024.01.01..2024.01.03, n*3) as date, 00:00:00 + 1..(n*3) as time, rand(`A`B`C`D`E`F, n*3) as sym, rand(100, n*3) as val)
timer pt.tableInsert(t)
// Time elapsed: 49329.098 ms

// Simulate the same 300 million records and insert them by partition using background jobs
t1 = table(take(2024.01.01,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t2 = table(take(2024.01.02,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t3 = table(take(2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
submitJob("writeData1", "writeData1", tableInsert, pt, t1)
submitJob("writeData2", "writeData2", tableInsert, pt, t2)
submitJob("writeData3", "writeData3", tableInsert, pt, t3)
// Time elapsed returned by getRecentJobs(): around 28000 ms

Appending Out-of-Range Data

Data outside existing partitions can be appended to VALUE- or RANGE-partitioned databases with the appropriate configuration parameters and functions. See Add Partitions for details.

// Create a VALUE-partitioned database
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db = database("dfs://test", VALUE, 1..5)
tmp = table(1:0,`id`val,[INT,INT])
pt = db.createPartitionedTable(tmp,`pt,`id)

// Append data
pt.tableInsert(table(1..6 as id, 1..6 as val))
// Returns 6; a partition for the new value 6 is created automatically

// Create a RANGE-partitioned database with 2 partitions [1,3) and [3,5)
if(existsDatabase("dfs://test1")) dropDatabase("dfs://test1")
db = database("dfs://test1", RANGE, 1 3 5) 
tmp = table(1:0,`id`val,[INT,INT])
pt = db.createPartitionedTable(tmp,`pt,`id)

// Append data
pt.tableInsert(table(1..6 as id, 1..6 as val))
// Returns 4, indicating records with id 5 and 6 are discarded

FAQs

Q1. Can data be successfully appended with inconsistent column types?

This depends on the data type compatibility between inserted and target columns, i.e., whether the data types support implicit type conversion. Compatible data types allow appending but may result in precision loss. Incompatible types cause insertion failure.

The following example uses a table t with an id column of INT type. When appending DOUBLE or CHAR values to t, they are converted to INT. However, appending STRING data fails, since the STRING type does not support implicit conversion to INT.

t = table(3:0,[`id],[INT])
t.tableInsert(table(1.2 as id)) // Returns 1
t.tableInsert(table('a' as id)) // Returns 1
t.tableInsert(table("str" as id)) // Error: Failed to append data to column 'id' with error: Incompatible type. Expected: INT, Actual: STRING

Q2. Can data be successfully appended with inconsistent column order?

If you use INSERT INTO to append data, the data will be appended to the specified columns.

t = table(1:0,`id`sym,[INT,STRING])
INSERT INTO t (sym,id) VALUES ("AAA",1)

In other cases, the data is inserted by column position: the n-th column of the inserted data is appended to the n-th column of the target table, regardless of column names. Whether the insertion succeeds then depends on column type compatibility (as described in Q1).
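
A minimal sketch of positional matching (the source column names a and b are arbitrary):

t = table(1:0, `id`sym, [INT,STRING])
t.tableInsert(table(1 as a, "AAA" as b)) // Returns 1; values are matched by position, names are ignored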

Q3. Can data be successfully appended with inconsistent column count?

If the number of columns to be appended is greater than that of the target table, the append always fails.

t = table(1:0,[`id],[INT])
t.tableInsert(table(1 as id, 2 as val)) // Error: The number of columns of the table to insert must be the same as that of the original table.

If the number of columns to be inserted is smaller than that of the target table:

(1) In-memory tables:

  • If the columns are specified in the INSERT INTO clause
    • For partitioned in-memory tables, the partitioning columns must be specified; otherwise the data is discarded without an error being reported.
      db = database("",VALUE,1..3)
      t = db.createPartitionedTable(table(1:0,`id`price`qty,[INT,DOUBLE,INT]),`pt,`id)
      INSERT INTO t (price,qty) VALUES (3.6,100)
    • For non-partitioned in-memory tables, the data insertion will succeed.
      t = table(1:0,`id`price`qty,[INT,DOUBLE,INT])
      INSERT INTO t (price,qty) VALUES (3.6,100)
  • If the columns are not specified in the INSERT INTO clause, or if tableInsert or append! is used, the insertion fails with an error.
    t = table(1:0,`id`price`qty,[INT,DOUBLE,INT])
    INSERT INTO t VALUES (3.6,100) // Error: The number of table columns doesn't match the number of columns to append.
    t.append!(table(3.6 as price,100 as qty)) // Error: The number of columns of the table to insert must be the same as that of the original table.
    t.tableInsert(table(3.6 as price,100 as qty)) // Error: The number of columns of the table to insert must be the same as that of the original table.

(2) DFS tables

  • The data to be inserted must include the partitioning columns, otherwise the insertion would fail.
    if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
    db = database("dfs://test",VALUE,1..5)
    t = table(1:0,`sym`id`price`qty,[SYMBOL,INT,DOUBLE,INT])
    pt = db.createPartitionedTable(t,`pt,`id)
    // insert data
    pt.tableInsert(table("A" as sym)) // Error: The number of columns of the current table must match that of the target table.
    pt.tableInsert(table("A" as sym, 1 as id)) // data is successfully inserted and the script returns 1
  • If n columns have been appended to a partition, appending m (m<n) columns to this partition is no longer allowed; only data with no fewer than n columns can be inserted.
    if(existsDatabase("dfs://test1")) dropDatabase("dfs://test1")
    db = database("dfs://test1", VALUE, 1..5) 
    t = table(1:0,`id`sym`price`qty,[INT,SYMBOL,DOUBLE,INT])
    pt = db.createPartitionedTable(t,`pt,`id)
    // insert data
    pt.tableInsert(table(1 as id, "A" as sym)) // Returns 1; 2 columns are inserted into partition 1
    pt.tableInsert(table(1 as id)) // Error: The data to append contains fewer columns than the schema.
    pt.tableInsert(table(1 as id, "A" as sym, 3.6 as price)) // Returns 1; 3 columns are inserted into partition 1
    pt.tableInsert(table(2 as id)) // Returns 1; 1 column is inserted into the previously empty partition 2

Q4. tableInsert returns an unexpected number of inserted records. What could be the causes?

  • Out-of-range data are involved and new partitions cannot be automatically created (see Appending Out-of-Range Data above).
  • When writing to a keyed or indexed table, if a key already exists, the write updates the corresponding value. Updated records are not counted in the return value; only newly added records are. In the following example, the keyed table kt contains a record with id 1. Inserting records with id 1, 2, and 3 updates one record and appends two.
    kt = keyedTable(`id,1 as id, 1 as val)
    kt.tableInsert(table(1..3 as id, 7..9 as val)) 
    // Returns 2