插入数据

插入数据到内存表

SQL 语句

通过 INSERT INTO 语句中使用 VALUES 子句插入数据。这种方式仅适用于内存表,无法向分布式表种追加数据。

如果只想插入表的某几列,可以在表名后的括号中指定列名,VALUES 关键字后的括号中按照列的顺序提供相应的值。

// 插入指定列 col1,col2,col3...
INSERT INTO <tableName> (col1, col2, col3, ...) VALUES (val1, val2, val3, ...);
注: 如果插入的是分区内存表,则指定的列中必须包含分区列。

如果不指定列,默认插入所有列。此时在 VALUES 关键字后的括号中按照表的列顺序提供相应的值。

// 插入所有列
INSERT INTO <tableName> VALUES (val1, val2, val3, ...);

例如,下例中展示向一张空表中插入 2 行完整数据,再插入一行只包含 2 列的数据,其余列值为空。

t = table(1:0,`id`sym`val,[INT,SYMBOL,DOUBLE])
INSERT INTO t VALUES ([1,2],["A","B"],[2.3,3.6])
INSERT INTO t (id,val) VALUES (3,7.6) 

append!

append! 可用于向表中插入数据,此时插入的数据必须以表的形式组织。此方法既适用于内存表,也适用于分布式表。

t = table(1:0,`id`sym`val,[INT,SYMBOL,DOUBLE])
tmp = table(1..10 as id, take(`A`B,10) as sym, rand(10.0,10) as val)
append!(t,tmp)
重要: 使用 append! 插入数据前,应确保待插入数据表和目标表结构一致
  • 插入时是将待插入数据表的每一列按照顺序依次插入,数据表的列数必须与目标表一致,列数不一致会导致插入失败。
  • 插入数据时,不会根据数据表的列名与目标表的列匹配后插入,而是依次插入各列,因此列的顺序需要与目标表一致,而列名则不做严格要求。
  • 当列的数据类型与目标表不一致时,能够进行隐式类型转换的列会转换为目标类型后插入,同时可能会造成数据精度的损失;如遇到不能隐式转换的类型,则本次插入失败。

tableInsert

tableInsert 可用于向表中插入数据,此时插入的数据必须以表的形式组织。此方法既适用于内存表,也适用于分布式表。

不同于 append! 的是,tableInsert 会返回此次插入成功的条数

例如,下例中向一个分区内存表中插入 10 条数据,此分区内存表只有 5 个分区,在这 5 个分区内的数据成功插入,其余数据被舍去。

db = database("", VALUE, 1..5)
schemaTb = table(1:0, `id`sym`val, [INT,SYMBOL,DOUBLE])
t = db.createPartitionedTable(schemaTb, `mpt, `id)

tmp = table(1..10 as id, take(`A`B,10) as sym, rand(10.0,10) as val)
tableInsert(t, tmp)

插入数据到分布式表

插入数据到分布式表,只能使用 append!tableInsert 方法。使用方法上与内存表相同:

  • 对于新建立的分布式表,可以通过 createPartitionedTable 函数的返回获取表对象

    db = database("dfs://valuedb", VALUE, 2023.01.01..2023.12.31)
    schemaTb = table(1:0,`date`time`sym`price,[DATE,TIME,SYMBOL,DOUBLE])
    t = db.createPartitionedTable(schemaTb,`pt,`date)
    
    tmp = table(take(today(),2) as date, take(time(now()),2) as time, ["A","B"] as sym, [6.2,6.9] as price)
    tableInsert(t,tmp)
    
  • 对于此前建立的分布式表,可以通过 loadTable 获取表对象

    t = loadTable("dfs://valuedb","pt")
    
    tmp = table(take(today(),2) as date, take(time(now()),2) as time, ["A","B"] as sym, [6.2,6.9] as price)
    tableInsert(t,tmp)
    

常见问题

以下各节对不同场景下插入数据是否能够成功及其他插入数据时的常见问题做出解答。

待插入列与目标列数据类型不匹配

当列的数据类型与目标列的类型不一致时,如果能够进行隐式类型转换会转换为目标类型后插入,但这可能会造成数据精度的损失;如遇到不能隐式转换的类型,则本次插入失败。

例如存在一个表 t,其中列 id 为 INT 类型,当向其写入 DOUBLE、CHAR 类型的数据时,会进行数据类型转换后写入;而向其写入 STRING 类型的数据,会因为无法进行类型转换导致插入失败。

t = table(3:0,[`id],[INT])
t.tableInsert(table(1.2 as id)) // 返回1
t.tableInsert(table('a' as id)) // 返回1
t.tableInsert(table("str" as id)) // Failed to append data to column 'id' with error: Incompatible type. Expected: INT, Actual: STRING

待插入列与目标表的列顺序不一致

如果采用 INSERT INTO 指定插入的列,则数据会按照列名,插入对应的列。

t = table(1:0,`id`sym,[INT,STRING])
INSERT INTO t (sym,id) VALUES ("AAA",1)

其他情况下,会按照顺序依次写入,即尝试将待插入数据的第 n 列插入到目标表的第 n 列,与列名无关。此时如果存在数据类型不匹配的问题,插入能否成功取决于数据类型能否进行转换,详情参考上个问题。

待插入列与目标表的列数不一致

当插入的列数大于目标表时,插入失败。

t = table(1:0,[`id],[INT])
t.tableInsert(table(1 as id, 2 as val)) // The number of columns of the table to insert must be the same as that of the original table.

当插入的列数小于目标表时

  • 插入内存表

    • 使用 INSERT INTO 语句指定插入的列

      • 如果插入分区内存表,则必须包含分区列;否则无法插入,且系统不会报错并丢弃对应数据。

        db = database("",VALUE,1..3)
        t = db.createPartitionedTable(table(1:0,`id`price`qty,[INT,DOUBLE,INT]),`pt,`id)
        INSERT INTO t (price,qty) VALUES (3.6,100)
      • 插入其它类型内存表,或者插入分区表(数据包含分区列),则插入成功。

      • t = table(1:0,`id`price`qty,[INT,DOUBLE,INT]) INSERT INTO t (price,qty) VALUES (3.6,100)
    • 使用 INSERT INTO 语句未指定插入的列,或使用 tableInsertappend! 插入,则插入失败,此时系统会报错。

      t = table(1:0,`id`price`qty,[INT,DOUBLE,INT])
      INSERT INTO t VALUES (3.6,100) //The number of table columns doesn't match the number of columns to append.
      t.append!(table(3.6 as price,100 as qty)) //The number of columns of the table to insert must be the same as that of the original table.
      t.tableInsert(table(3.6 as price,100 as qty)) //The number of columns of the table to insert must be the same as that of the original table.
  • 插入分布式表

    • 插入分区表的数据必须包含分区列,否则插入失败。

      // 建库建表
      if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
      db = database("dfs://test",VALUE,1..5)
      t = table(1:0,`sym`id`price`qty,[SYMBOL,INT,DOUBLE,INT])
      pt = db.createPartitionedTable(t,`pt,`id)
      ​
      // 插入数据
      pt.tableInsert(table("A" as sym)) //不包含分区列,失败报错 The number of columns of the current table must match that of the target table.
      pt.tableInsert(table("A" as sym, 1 as id)) // 包含分区列,返回1,插入成功。
    • 如果某一个分区内成功插入过 n 列数据,则不允许再向该分区插入 m(m<n)列数据,可以继续插入大于等于 n 列的数据。

      // 创建库表
      if(existsDatabase("dfs://test1")) dropDatabase("dfs://test1")
      db = database("dfs://test1", VALUE, 1..5) 
      t = table(1:0,`id`sym`price`qty,[INT,SYMBOL,DOUBLE,INT])
      pt = db.createPartitionedTable(t,`pt,`id)
      ​
      //插入数据
      pt.tableInsert(table(1 as id, "A" as sym)) // 返回1,成功向分区1中插入包含2列的数据
      pt.tableInsert(table(1 as id)) // 无法向分区1中插入包含1列的数据,报错 The data to append contains fewer columns than the schema.
      pt.tableInsert(table(1 as id, "A" as sym, 3.6 as price)) // 返回1,成功向分区1中插入包含3列的数据
      pt.tableInsert(table(2 as id)) // 返回1,成功向分区2中插入包含1列的数据

插入数据包含现有分区之外的数据

对于 VALUE 分区或含有 COMPO 分区的 VALUE 层级,可通过设置配置参数 newValuePartitionPolicy=add,实现自动创建对应分区并写入数据。

单节点的配置文件为 dolphindb.cfg,普通集群的配置文件为 cluster.cfg,高可用集群的配置项须通过 Web 界面修改。

对于采用其他分区方式的数据库,分区之外的数据都不会插入。

对于 RANGE 分区,可以提前通过addRangePartitions 函数扩展分区方案,但只能向后增加分区,不能向前增加。详情可参考增加分区

//创建 VALUE 分区数据库
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db = database("dfs://test", VALUE, 1..5)
tmp = table(1:0,`id`val,[INT,INT])
pt = db.createPartitionedTable(tmp,`pt,`id)
// 插入数据
pt.tableInsert(table(1..6 as id ,1..6 as val)) // 返回6,其中分区6为写入时创建


// 创建 RANGE 分区数据库
if(existsDatabase("dfs://test1")) dropDatabase("dfs://test1")
db = database("dfs://test1", RANGE, 1 3 5) //由于 RANGE 的区间范围是左闭右开,第一个分区为[1,3) ,第二个分区为[3,5)
tmp = table(1:0,`id`val,[INT,INT])
pt = db.createPartitionedTable(tmp,`pt,`id)
// 插入数据
pt.tableInsert(table(1..6 as id ,1..6 as val)) // 返回4,id值为5,6的两条数据不在两个分区的范围内,故被舍去没有插入。

tableInsert 正常运行但插入成功的条数与预期不符的原因

  • 分区表写入分区范围外的数据,且分区不满足自动创建的条件,此时分区之外的数据没有写入。详情参考上个问题。

  • 当写入键值表或者索引表时,如果要写入的数据在数据表中已经存在对应键值,则会更新该键值对应的数据,但写入成功的统计结果不会包含此条更新。因为写入成功的条数只包括新增的数据条数。 下例中,一个键值表 kt 中,存在键值为 1 的数据,此时插入三条键值为 1、2、3 的数据,此时会更新键值为 1 的数据,并插入键值为 2、3 的数据,而 tableInsert 返回 2,是指新插入了键值为 2、3 的数据。

kt = keyedTable(`id,1 as id, 1 as val)
kt.tableInsert(table(1..3 as id, 7..9 as val)) // 返回2

向分布式表插入数据时对数据类型的要求

  • 插入的数据包含 STRING 类型的数据时,每个 STRING 不能超过 64KB,超过 64KB 的部分会被截断,不会入库。

  • 插入的数据包含 BLOB 类型的数据时,每个 BLOB 不能超过 64MB,超过 64MB 的部分会被截断,不会入库。

  • 插入的数据包含 SYMBOL 类型的数据时,每个 SYMBOL 不能超过 255B,超出时会抛出异常,插入失败。

  • 某个表字段定义为 SYMBOL 类型时,必须保证该字段在同一分区的不同取值小于 2097152 个,如果本次插入导致不同取值个数超过此限制,则插入失败并报错 One symbase's size can't exceed 2097152.

  • 如果数据库为 VALUE 分区,且分区列为 STRING 类型或 SYMBOL 类型:

    • 插入的分区列数据不能包含空格、"\n"、"\r" 和 "\t",否则会插入失败并报错 A STRING or SYMBOL column used for value-partitioning cannot contain invisible characters.

    • 对于 "." 和 ":" 字符,在创建分区路径时会被忽略,例如 a:bc 在分区路径中会被写为 abc。值得注意的是,有 n 个分区的分区路径相同,向这些分区中任一分区写入 m 条数据,查询时这 m 条数据中,每条都查询到 n 条相同结果,如下例所示。若要避免此情况,只需在配置文件中设置配置项 ignoreSpecialCharacterInPartitionId=false.

//创建库表,初始分区为 abc
if(existsDatabase("dfs://my_test")) dropDatabase("dfs://my_test")
db = database("dfs://my_test",VALUE,["abc"])
schemaTb = table(1:0,`name`val,[STRING,INT])
pt = db.createPartitionedTable(schemaTb,`pt,`name)

// 写入数据,分区列值为?abc ,分区路径按照上述规则应为 abc
t = table("a:bc" as name, 100 as val)
pt.tableInsert(t) //返回1,成功写入一条数据

select * from pt //查询返回两条数据
name val
a:bc 100
a:bc 100
注: 对于 "/"、"\"、"<"、">"、"*" 和 "?" 字符,如果它们出现在字符串的开头,在创建分区路径时会被忽略;否则,将被替换为连接符 "-"。例如 //a\\b*c<> 在分区路径中会被写为 -a-b-c--

插入数据的存储顺序及写入性能

数据在数据库中按照分区存储。其每个分区内部的顺序与选择的存储引擎有关:

  • 对于采用 OLAP 存储引擎的数据库,分区内数据的顺序与插入的顺序相同。

  • 对于采用 TSDB 存储引擎的数据库,在分区内会根据 sortColumns 进行排序。

插入有序和无序数据对写入性能几乎没有影响。

插入数据到多个分区时的事务支持性

这取决于数据库事务的原子性层级。在创建数据库时通过指定参数 atomic 指定事务的原子性层级:

  • 当此数据库事务的原子性层级为 TRANS,则支持事务,即若某个分区写入失败,例如被其他写入事务锁定而出现写入冲突,则本次写入事务全部失败。

    // 创建一个原子性层级为 TRANS 的数据库
    if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
    db1 = database("",VALUE,2024.01.01..2024.01.03)
    db2 = database("",HASH,[SYMBOL,4])
    db = database("dfs://test", COMPO, [db1,db2],,"TSDB","TRANS")
    tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
    pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)
    
    // 后台提交任务模拟并发写入场景
    n =  100
    t1 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t2 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t3 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    submitJob("writeDate1", "writeDate1", tableInsert, pt, t1)
    submitJob("writeDate2", "writeDate2", tableInsert, pt, t2)
    submitJob("writeDate3", "writeDate3", tableInsert, pt, t3)
    
    select count(*) from pt 
    count
    100

    通过 getRecentJobs() 方法可以看到,有两个写入任务因为涉及的分区被其他事务锁定导致写入失败:The openChunks operation failed because the chunk '/test/20240101/Key1/OQ' is currently locked and in use by transaction 3153.

  • 当此数据库事务的原子性层级为 CHUNK,若某分区被其它写入事务锁定而出现冲突,系统会完成其他分区的写入,同时对之前发生冲突的分区不断尝试写入,尝试数分钟后仍冲突才放弃写入。此设置下,允许并发写入同一个分区,但由于不能完全保证事务的原子性,可能出现部分分区写入成功而部分分区写入失败的情况。同时由于采用了重试机制,写入速度可能较慢。

    if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
    db1 = database("",VALUE,2024.01.01..2024.01.03)
    db2 = database("",HASH,[SYMBOL,4])
    db = database("dfs://test", COMPO, [db1,db2],,"TSDB","CHUNK")
    tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
    pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)
    
    n =  100
    t1 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t2 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    t3 = table(take(2024.01.01..2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
    
    
    submitJob("writeDate1", "writeDate1", tableInsert, pt, t1)
    submitJob("writeDate2", "writeDate2", tableInsert, pt, t2)
    submitJob("writeDate3", "writeDate3", tableInsert, pt, t3)
    
    select count(*) from pt
    count
    300

向一张表内插入大量数据的最佳方案

在已知分区结构的情况下,最好的方式是将数据按照分区划分后分别提交后台作业,如此既可以充分利用多线程并发写入,提高写入效率,又可以确保不会因为写入冲突而造成写入失败。

下例简单模拟了少量数据进行性能对比,当处理大批量的历史数据时,性能差距更加明显。

// 建库建表
if(existsDatabase("dfs://test")) dropDatabase("dfs://test")
db1 = database("",VALUE,2024.01.01..2024.01.03)
db2 = database("",HASH,[SYMBOL,4])
db = database("dfs://test", COMPO, [db1,db2],,"TSDB")
tmp = table(1:0, `date`time`sym`val, [DATE,SECOND,SYMBOL,INT])
pt = db.createPartitionedTable(tmp,`pt,`date`sym,,`time,ALL)

// 模拟一亿条数据一次性写入
n = 100000000
t = table(take(2024.01.01..2024.01.03,n*3) as date,00:00:00 + 1..(n*3) as time, rand(`A`B`C`D`E`F,n*3) as sym, rand(100,n*3) as val)
timer pt.tableInsert(t)
// Time elapsed: 49329.098 ms

// 模拟一亿条数据,按照分区划分,分别提交三个后台作业
t1 = table(take(2024.01.01,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t2 = table(take(2024.01.02,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
t3 = table(take(2024.01.03,n) as date, 00:00:00 + 1..n as time, rand(`A`B`C`D`E`F,n) as sym, rand(100,n) as val)
submitJob("writeDate1", "writeDate1", tableInsert, pt, t1)
submitJob("writeDate2", "writeDate2", tableInsert, pt, t2)
submitJob("writeDate3", "writeDate3", tableInsert, pt, t3)
// 通过 getRecentJobs() 查看耗时约为28秒。