Append Data
DolphinDB provides three methods to append data to DFS and in-memory tables:
- INSERT INTO statement: SQL-style syntax for row-wise insertion.
- append! function: table-to-table append operations.
- tableInsert function: flexible data insertion with record count tracking.
Append Methods
SQL INSERT INTO Statement
Use the SQL INSERT INTO statement with a VALUES clause to append rows to in-memory tables or DFS tables. For DFS tables, the configuration parameter enableInsertStatementForDFSTable must be set to true.
Append Values to All Columns
To append data to all columns, skip the column names and specify new values for each column in the VALUES clause.
INSERT INTO <tableName> VALUES (val1, val2, val3, ...);
Append Values to Specific Columns
To append data to specific columns, specify the column names in the parentheses after the table name and provide the corresponding values in the VALUES clause. Unspecified columns are filled with NULLs.
// Append data to columns col1,col2,col4...
INSERT INTO <tableName> (col1, col2, col4, ...) VALUES (val1, val2, val4, ...);
Examples
t = table(1:0,`id`sym`val,[INT,SYMBOL,DOUBLE])
// Append to all columns
INSERT INTO t VALUES ([1,2],["A","B"],[2.3,3.6])
// Append to specified columns id and val
INSERT INTO t (id,val) VALUES (3,7.6)
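INSERT INTO works the same way for DFS tables once enableInsertStatementForDFSTable is enabled. Below is a minimal sketch, assuming that configuration is set; the database path dfs://insertDemo is hypothetical.
// A sketch assuming enableInsertStatementForDFSTable=true in the configuration
if(existsDatabase("dfs://insertDemo")) dropDatabase("dfs://insertDemo")
db = database("dfs://insertDemo", VALUE, 1..5)
pt = db.createPartitionedTable(table(1:0,`id`sym`val,[INT,SYMBOL,DOUBLE]), `pt, `id)
INSERT INTO pt VALUES ([1,2],["A","B"],[2.3,3.6])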
append! Function
Use the append! function to append data to in-memory tables or DFS tables. The data must be organized in a table with the same schema as the target table; in particular, the column types and column order must match. Column names are not required to be consistent.
Examples
t = table(1:0,`id`sym`val,[INT,SYMBOL,DOUBLE])
tmp = table(1..10 as id, take(`A001`B001,10) as sym, rand(10.0,10) as val)
append!(t,tmp)
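Because only column types and order matter, a source table whose column names differ from the target's can still be appended. A minimal sketch continuing the example above:
// Column names (a, b, c) differ from the target (id, sym, val),
// but the types and order match, so append! succeeds
tmp2 = table(11..15 as a, take(`A001`B001,5) as b, rand(10.0,5) as c)
append!(t, tmp2)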
tableInsert Function
Use the tableInsert function to append data to in-memory tables or DFS tables. The data can be organized as a table, a tuple, a dictionary, or multiple vectors or tuples. Unlike append!, which has no return value, tableInsert returns the number of inserted records.
Examples
The following script appends 10 records to a partitioned in-memory table with 5 partitions.
db = database("", VALUE, 1..5)
schemaTb = table(1:0, `id`sym`val, [INT,SYMBOL,DOUBLE])
t = db.createPartitionedTable(schemaTb, `mpt, `id)
tmp = table(1..10 as id, take(`A`B,10) as sym, rand(10.0,10) as val)
tableInsert(t, tmp)
// output: 5
The 5 records with id values outside the partition range (id 6 to 10) are discarded, so tableInsert returns 5, the number of inserted records.
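As noted above, tableInsert also accepts inputs other than a table. A minimal sketch on a regular in-memory table; the exact set of accepted forms may vary by version.
t2 = table(1:0, `id`sym`val, [INT,SYMBOL,DOUBLE])
tableInsert(t2, 1..3, take(`A`B,3), rand(10.0,3))    // multiple vectors, returns 3
tableInsert(t2, (4..5, take(`A`B,2), rand(10.0,2)))  // a tuple, returns 2
tableInsert(t2, dict(`id`sym`val, (6, "A", 3.14)))   // a dictionary, returns 1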
DFS Data Append Considerations
Data Size Limits
Size constraints are applied to strings written to DFS tables:
- Data of STRING type must be smaller than 64 KB, otherwise it will be truncated to 65,535 bytes (i.e., 64 KB - 1 byte);
- Data of BLOB type must be smaller than 64 MB, otherwise it will be truncated to 67,108,863 bytes (i.e., 64 MB - 1 byte);
- Data of SYMBOL type must be smaller than 255 bytes, otherwise the system will throw an exception.
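Where over-long values may occur, one option is to filter them out before writing. An illustrative sketch (not a built-in safeguard) using strlen; the data is hypothetical:
// The third sym value is 300 bytes long and would exceed the SYMBOL limit
s = concat(take("X", 300))
data = table(1..3 as id, ["A", "B", s] as sym)
// Keep only rows whose sym fits within the limit
valid = select * from data where strlen(sym) < 255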
Handling Special Characters
For a partitioning column of STRING or SYMBOL type in a VALUE-partitioned database, spaces and characters such as "\n", "\r", and "\t" are not allowed. Otherwise, the error "A STRING or SYMBOL column used for value-partitioning cannot contain invisible characters." is thrown.
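One way to avoid this error is to sanitize the partitioning column before appending. An illustrative sketch, assuming strReplace is applied element-wise to the vector:
sym = ["A\n", "B\t", " C", "D"]
// Strip the disallowed characters one by one
clean = strReplace(strReplace(strReplace(strReplace(sym, "\n", ""), "\r", ""), "\t", ""), " ", "")
// clean is now ["A","B","C","D"] and is safe to use as a value-partitioning column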
Handling Unordered Data Writes
Write performance remains largely unaffected whether appending ordered or unordered data, as the sorting process occurs asynchronously. The storage order within partitions varies by storage engine type (see the sketch after this list):
- OLAP: Maintains data in write order within each partition.
- TSDB: Sorts data by sort columns within each level file.
- PKEY: Sorts data by primary key within each level file.
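The effect on read order can be observed directly. A minimal sketch with a TSDB table, assuming rows within a partition are read back ordered by the sort columns; the database path dfs://orderDemo is hypothetical.
if(existsDatabase("dfs://orderDemo")) dropDatabase("dfs://orderDemo")
db = database("dfs://orderDemo", VALUE, 2024.01.01..2024.01.02,,"TSDB")
pt = db.createPartitionedTable(table(1:0,`date`time`val,[DATE,SECOND,INT]), `pt, `date,,`time)
// Append rows with out-of-order timestamps
pt.tableInsert(table(take(2024.01.01,3) as date, [00:00:03,00:00:01,00:00:02] as time, 1..3 as val))
// Rows within the partition come back sorted by time
select * from pt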
Atomic Transactions
Whether transactions are supported when concurrently appending data to multiple partitions depends on the atomicity level specified by the atomic parameter when creating a database.
- When atomic is set to 'TRANS', transactions are supported. If a transaction attempts to write to multiple chunks and one of the chunks is locked by another transaction, a write-write conflict occurs and all writes of the transaction fail, as the following script shows.
// Create a database and set atomic to 'TRANS'
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db1 = database("",VALUE,2024.01.01..2024.01.03)
db2 = database("",HASH,[SYMBOL,4])
db = database("dfs://test", COMPO, [db1,db2],,"TSDB","TRANS")
tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)
// Submit concurrent write jobs
n = 100
t1 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t2 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t3 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
submitJob("writeData1", "writeData1", tableInsert, pt, t1)
submitJob("writeData2", "writeData2", tableInsert, pt, t2)
submitJob("writeData3", "writeData3", tableInsert, pt, t3)
// Check results
select count(*) from pt
// Returns 100, meaning only 100 of the 300 records were written
Call getRecentJobs() and we can see that two of the write jobs failed with the error: The openChunks operation failed because the chunk '/test/20240101/Key1/OQ' is currently locked and in use by transaction 3153.
- When atomic is set to 'CHUNK', if a transaction tries to write to multiple chunks and a write-write conflict occurs because a chunk is locked by another transaction, the transaction does not abort. Instead, it keeps writing to the non-locked chunks and retries the conflicting chunk, giving up only if the chunk is still locked after a few minutes. Since atomicity at the transaction level is not guaranteed, the write operation may succeed in some chunks but fail in others. Note also that the repeated attempts to write to locked chunks may reduce write speed. The following script repeats the test with atomic set to 'CHUNK':
if(existsDatabase("dfs://test")) dropDatabase("dfs://test") db1 = database("",VALUE,2024.01.01..2024.01.03) db2 = database("",HASH,[SYMBOL,4]) db = database("dfs://test", COMPO, [db1,db2],,"TSDB","CHUNK") tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT]) pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL) n = 100 t1 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val) t2 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val) t3 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val) submitJob("writeData1", "writeData1", tableInsert, pt, t1) submitJob("writeData2", "writeData2", tableInsert, pt, t2) submitJob("writeData3", "writeData3", tableInsert, pt, t3) // Check results select count(*) from pt // Returns 300, meaning 300 records were successfully written
Appending a Large Amount of Data
To optimize the performance of large-scale data insertions, it is recommended to partition the data and process each partition through separate background jobs. This strategy maximizes throughput by leveraging concurrent writes while preventing write conflicts. The example below demonstrates this using a smaller dataset. The performance advantages become significantly more evident when handling large volumes of historical data.
// Create databases and tables
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db1 = database("",VALUE,2024.01.01..2024.01.03)
db2 = database("",HASH,[SYMBOL,4])
db = database("dfs://test", COMPO, [db1,db2],,"TSDB")
tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)
// Simulate 300 million records spanning 3 days and insert them at once
n = 100000000
t = table(take(2024.01.01..2024.01.03,n*3) as date,00:00:00 + 1..(n*3) as time, rand(`A`B`C`D`E`F,n*3) as sym, rand(100,n*3) as val)
timer pt.tableInsert(t)
// Time elapsed: 49329.098 ms
// Simulate the same 300 million records and insert them partition by partition via background jobs
t1 = table(take(2024.01.01,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t2 = table(take(2024.01.02,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t3 = table(take(2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
submitJob("writeData1", "writeData1", tableInsert, pt, t1)
submitJob("writeData2", "writeData2", tableInsert, pt, t2)
submitJob("writeData3", "writeData3", tableInsert, pt, t3)
// Time elapsed returned by getRecentJobs(): around 28000 ms
Appending Out-of-Range Data
Data outside existing partitions can be appended to VALUE- or RANGE-partitioned databases, given the appropriate configuration parameters and functions. See Add Partitions for details.
// Create a VALUE-partitioned database
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db = database("dfs://test", VALUE, 1..5)
tmp = table(1:0,`id`val,[INT,INT])
pt = db.createPartitionedTable(tmp,`pt,`id)
// Append data
pt.tableInsert(table(1..6 as id ,1..6 as val))
// Returns 6: a new partition is automatically created for id 6
// Create a RANGE-partitioned database with 2 partitions [1,3) and [3,5)
if(existsDatabase("dfs://test1")) dropDatabase("dfs://test1")
db = database("dfs://test1", RANGE, 1 3 5)
tmp = table(1:0,`id`val,[INT,INT])
pt = db.createPartitionedTable(tmp,`pt,`id)
// Append data
pt.tableInsert(table(1..6 as id ,1..6 as val))
// Returns 4, indicating records with id 5 and 6 are discarded
FAQs
Q1. Can data be successfully appended with inconsistent column types?
This depends on the data type compatibility between inserted and target columns, i.e., whether the data types support implicit type conversion. Compatible data types allow appending but may result in precision loss. Incompatible types cause insertion failure.
The following example uses a table t with an INT column id. When a DOUBLE or CHAR column is appended to t, the values are implicitly converted to INT. When STRING data is appended, however, the insertion fails because STRING does not support implicit conversion to INT.
t = table(3:0,[`id],[INT])
t.tableInsert(table(1.2 as id)) // Returns 1
t.tableInsert(table('a' as id)) // Returns 1
t.tableInsert(table("str" as id)) // Error: Failed to append data to column 'id' with error: Incompatible type. Expected: INT, Actual: STRING
Q2. Can data be successfully appended with inconsistent column order?
If you use INSERT INTO to append data, the data is appended to the specified columns.
t = table(1:0,`id`sym,[INT,STRING])
INSERT INTO t (sym,id) VALUES ("AAA",1)
In other cases, the data is inserted by column position: the n-th column of the source data is appended to the n-th column of the target table, regardless of column names. Whether the insertion succeeds then depends on column type compatibility (see Q1), as the sketch below shows.
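A minimal sketch: a source table whose column names differ from the target's but whose types match in order can be appended, while a reordered source fails on type conversion.
t = table(1:0,`id`sym,[INT,STRING])
// Names differ but positional types match (INT, STRING): succeeds
t.tableInsert(table(1 as a, "AAA" as b)) // Returns 1
// Columns reordered: "BBB" cannot be converted to INT, so the insertion fails
t.tableInsert(table("BBB" as sym, 2 as id))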
Q3. Can data be successfully appended with inconsistent column count?
If the number of columns to be appended is greater than that of the target table, the append always fails.
t = table(1:0,[`id],[INT])
t.tableInsert(table(1 as id, 2 as val)) // Error: The number of columns of the table to insert must be the same as that of the original table.
If the number of columns to be inserted is smaller than that of the target table:
(1) In-memory tables:
- If the columns are specified in the INSERT INTO clause:
  - For partitioned in-memory tables, the partitioning columns must be specified; otherwise the data is discarded without an error being reported.
db = database("",VALUE,1..3)
t = db.createPartitionedTable(table(1:0,`id`price`qty,[INT,DOUBLE,INT]),`pt,`id)
INSERT INTO t (price,qty) VALUES (3.6,100)
  - For non-partitioned in-memory tables, the insertion succeeds.
t = table(1:0,`id`price`qty,[INT,DOUBLE,INT])
INSERT INTO t (price,qty) VALUES (3.6,100)
- If the columns are not specified in the INSERT INTO clause, or tableInsert or append! is used, the insertion fails with an error.
t = table(1:0,`id`price`qty,[INT,DOUBLE,INT])
INSERT INTO t VALUES (3.6,100) // The number of table columns doesn't match the number of columns to append.
t.append!(table(3.6 as price,100 as qty)) // The number of columns of the table to insert must be the same as that of the original table.
t.tableInsert(table(3.6 as price,100 as qty)) // The number of columns of the table to insert must be the same as that of the original table.
(2) DFS tables
- The data to be inserted must include the partitioning columns; otherwise the insertion fails.
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db = database("dfs://test",VALUE,1..5)
t = table(1:0,`sym`id`price`qty,[SYMBOL,INT,DOUBLE,INT])
pt = db.createPartitionedTable(t,`pt,`id)
// Insert data
pt.tableInsert(table("A" as sym)) // Error: The number of columns of the current table must match that of the target table.
pt.tableInsert(table("A" as sym, 1 as id)) // Returns 1: the data is successfully inserted
- If n columns have already been appended to a partition, data with fewer than n columns can no longer be appended to that partition; only data with at least n columns can be inserted.
if(existsDatabase("dfs://test1")) dropDatabase("dfs://test1")
db = database("dfs://test1", VALUE, 1..5)
t = table(1:0,`id`sym`price`qty,[INT,SYMBOL,DOUBLE,INT])
pt = db.createPartitionedTable(t,`pt,`id)
// Insert data
pt.tableInsert(table(1 as id, "A" as sym)) // Returns 1: 2 columns are inserted into partition 1
pt.tableInsert(table(1 as id)) // Error: The data to append contains fewer columns than the schema.
pt.tableInsert(table(1 as id, "A" as sym, 3.6 as price)) // Returns 1: 3 columns are inserted into partition 1
pt.tableInsert(table(2 as id)) // Returns 1: 1 column is inserted into partition 2
Q4. tableInsert returns an unexpected number of inserted records. What could be the causes?
- Out-of-range data are involved and new partitions cannot be automatically created (see Appending Out-of-Range Data above).
- When writing to a keyed or indexed table, if a key already exists, the write updates the corresponding record. Updates are not counted in the return value; only newly added records are. In the following example, a keyed table kt contains a record with id 1. Inserting records with id 1, 2, and 3 updates one record and inserts two.
kt = keyedTable(`id,1 as id, 1 as val)
kt.tableInsert(table(1..3 as id, 7..9 as val)) // Returns 2