Database

在 Python API 中,可以使用 DolphinDB Python API 的原生方法来创建、使用数据库及数据表。本节将介绍如何创建数据库,以及通过数据库创建数据表。

Database, session.database

Python API 将 DolphinDB 服务端的数据库对象句柄,在 API 包装为 Database 类,封装实现部分功能。通常使用 session.database 方法构造。该方法部分参数可以参考 database

接口如下:

session.database(dbName=None, prititionType=None, parititions=None, dbPath=None, engine=None, atomic=None, chunkGranularity=None)
  • dbName: 数据库句柄名称,创建数据库时可以不指定该参数。
  • partitionType: 分区类型,可选项为 keys.SEQ/keys.VALUE/keys.RANGE/keys.LIST/keys.COMPO/keys.HASH。
  • partitions: 描述如何进行分区,通常为 list 或者 np.ndarray。
  • dbPath: 保存数据库的目录的路径。
  • engine: 数据库存储引擎。
  • atomic: 写入事务的原子性层级。
  • chunkGranularity: 分区粒度,可选值为 "Table"/"DATABASE"。

数据库句柄 dbName

当加载已有数据库或创建数据库时,可以指定该参数,表示将数据库加载到内存后的句柄名称。如果不指定该参数,将会自动生成随机字符串作为句柄名称,可以通过 _getDbName() 方法获取。

例1:创建数据库时不指定 dbName

dbPath = "dfs://dbName"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath)

dbName = db._getDbName()
print(dbName)
print(s.run(dbName))

输出结果如下:

TMP_DB_15c2bf85DB
DB[dfs://dbName]

例2:创建数据库时指定 dbName

dbPath = "dfs://dbName"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(dbName="testDB", partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath)

dbName = db._getDbName()
print(dbName)
print(s.run(dbName))

输出结果如下:

testDB
DB[dfs://dbName]

数据库路径 dbPath 和 分区参数 partitionType/partitions

调用 session.database 创建数据库时,必须指定分区相关参数 partitionType / partitions 。如果创建的数据库为内存数据库,则不需要指定 dbPath;如果创建的数据库为分区数据库,则必须指定 dbPath。

各种分区数据库创建方式

准备环境:

import dolphindb as ddb
import dolphindb.settings as keys
import numpy as np
import pandas as pd

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")

创建基于 VALUE 分区的数据库

按 date 分区:

dbPath="dfs://db_value_date"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
dates=np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]")
db = s.database(dbName='mydb', partitionType=keys.VALUE, partitions=dates,dbPath=dbPath)

按 month 分区:

dbPath="dfs://db_value_month"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
months=np.array(pd.date_range(start='2012-01', end='2012-10', freq="M"), dtype="datetime64[M]")
db = s.database(partitionType=keys.VALUE, partitions=months,dbPath=dbPath)

创建基于 RANGE 分区的数据库

按 INT 类型分区:

dbPath="dfs://db_range_int"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.RANGE, partitions=[1, 11, 21], dbPath=dbPath)

创建基于 LIST 分区的数据库

按 SYMBOL 类型分区:

dbPath="dfs://db_list_sym"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.LIST, partitions=[['IBM', 'ORCL', 'MSFT'], ['GOOG', 'FB']],dbPath=dbPath)

创建基于 HASH 分区的数据库

按 INT 类型分区:

dbPath="dfs://db_hash_int"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.HASH, partitions=[keys.DT_INT, 3], dbPath=dbPath)

创建基于 COMPO 分区的数据库

以下脚本创建基于 COMPO 分区的数据库及数据表:第一层是基于 VALUE 的 date 类型分区,第二层是基于 RANGE 的 int 类型分区。

注意: 创建 COMPO 的子分区数据库的 dbPath 参数必须设置为空字符串或不设置。

db1 = s.database(partitionType=keys.VALUE, partitions=np.array(["2012-01-01", "2012-01-06"], dtype="datetime64[D]"))
db2 = s.database(partitionType=keys.RANGE, partitions=[1, 6, 11])
dbPath="dfs://db_compo_test"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.COMPO, partitions=[db1, db2], dbPath=dbPath)

数据库引擎 engine

默认使用 OLAP 引擎创建数据库,如果希望使用其他引擎创建数据库,可以指定该参数。

创建 TSDB 引擎下的数据库

TSDB 引擎数据库的创建方法和 OLAP 几乎一致,只需要在 database 函数中指定 engine = "TSDB",并在调用建表函数 createTable 和 createPartitionedTable 时指定 sortColumns。

dates = np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]")
dbPath = "dfs://tsdb"
if s.existsDatabase(dbPath): 
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="TSDB")

创建 PKEY 引擎下的数据库

PKEY 引擎(主键引擎)数据库的创建方法和 OLAP 几乎一致,只需要在 database 函数中指定 engine = "PKEY",并在调用建表函数 createTable 和 createPartitionedTable 时指定 primaryKey 参数配置主键列,以及指定 indexes 参数为指定列配置索引方式。

dates = np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]")
dbPath = "dfs://pkey"
if s.existsDatabase(dbPath): 
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="PKEY")

事务原子性层级 atomic

该参数表示写入事务的原子性层级,决定了是否允许并发写入同一分区,可选值为 "TRANS" 和 "CHUNK",默认值为 "TRANS"。

  • 设置为 "TRANS" ,写入事务的原子性层级为事务,即一个事务写入多个分区时,若某个分区被其他写入事务锁定而出现写入冲突,则该事务的写入全部失败。因此,该设置下,不允许并发写入同一个分区。
  • 设置为 "CHUNK" ,写入事务的原子性层级为分区。若一个事务写入多个分区时,某分区被其它写入事务锁定而出现冲突,系统会完成其他分区的写入,同时对之前发生冲突的分区不断尝试写入,尝试数分钟后仍冲突才放弃。此设置下,允许并发写入同一个分区,但由于不能完全保证事务的原子性,可能出现部分分区写入成功而部分分区写入失败的情况。同时由于采用了重试机制,写入速度可能较慢。

分区粒度 chunkGranularity

该参数表示分区粒度,可选值为 "TABLE" 和 "DATABASE"。

  • "Table":表级分区,设置后支持同时写入同一分区的不同表。
  • "DATABASE":数据库级分区,设置后只支持同时写入不同分区。

注意: 指定该参数前,需要构造 session 时设置 enableChunkGranularityConfig=True,否则该参数无效。

createTable

使用 createTable 可以在数据库中创建维度表。其传入参数 table 是一个 Table 对象,该对象将作为生成表的结构参考。

Database.createTable(table, tableName, sortColumns=None)
  • table:Table 类对象,将根据该表的表结构在数据库中创建一个空的维度表。
  • tableName:字符串,表示维度表的名称。
  • sortColumns:可选参数,字符串或字符串列表,用于指定表的排序列。写入的数据将按照 sortColumns 列进行排序。系统默认 sortColumns (指定多列时)排序列的最后一列为时间类型,其余列字段作为排序的索引列,称作 sort key。
  • compressMethods:可选参数,字典,用于指定各列使用的压缩方法,键值分别为列名和压缩方法。如果未指定,则默认是 LZ4.
  • primaryKey:可选参数,字符串或者字符串列表,用于指定主键列。如果指定主键列,相同主键的值将会被新写入的值覆盖。
  • keepDuplicates:可选参数,指定在每个分区内如何处理所有 sortColumns 之值皆相同的数据,提供以下选项:
    • “ALL”:保留所有数据,为默认值。
    • “LAST”:仅保留最新数据。
    • “FIRST”:仅保留第一条数据。
  • softDelete:可选参数,是否启用软删除功能,默认不启用。
  • indexes:可选参数,一个字典,用于为表中的列指定索引。仅当 engine 指定为 “TSDB” 或 “PKEY” 时,本参数才生效。

该方法与 DolphinDB 服务器同名函数使用限制一致,请参阅 createDimensionTable

例1

下面的代码示例将在 TSDB 引擎数据库中创建一张基于 schema_t 表的结构、按 csymbol 列排序的维度表,表名为 pt。

dbPath = "dfs://createTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB")
s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
schema_t = s.table(data="schema_t")
pt = db.createTable(schema_t, "pt", ["csymbol"])
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["colDefs"])

输出结果如下:

      name typeString  typeInt  extra comment
0    ctime  TIMESTAMP       12    NaN        
1  csymbol     SYMBOL       17    NaN        
2    price     DOUBLE       16    NaN        
3      qty        INT        4    NaN   

例2

下面的代码示例将在 PKEY 引擎数据库中创建一张基于 schema_t 结构维度表 pt。其中,指定 csymbol 列为主键。本例未指定 indexes 参数,因此 pt 表中,默认只有主键 csymbol 列是 bloomfilter 索引,其它列都是 zonemap 索引。

dbPath = "dfs://createTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="PKEY")
s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
schema_t = s.table(data="schema_t")
pt = db.createTable(schema_t, "pt", primaryKey=["csymbol"])
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["primaryKey"])
print(schema["indexes"])

输出结果如下:

['csymbol']
                       name         type columnName
0             ctime_zonemap      zonemap      ctime
1  _primary_key_bloomfilter  bloomfilter    csymbol
2           csymbol_zonemap      zonemap    csymbol
3               qty_zonemap      zonemap        qty

例3

下面的代码示例将在 PKEY 引擎数据库中创建一张基于 schema_t 结构的维度表 pt。其中指定主键列为 csymbol 和 qty。本例为 qty 列指定了 bloomfilter 索引。因此 pt 表中,除组合主键外,qty 列也是 bloomfilter 索引,其它列是 zonemap 索引。

dbPath = "dfs://createTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="PKEY")
s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
schema_t = s.table(data="schema_t")
pt = db.createTable(schema_t, "pt", primaryKey=["csymbol", "qty"], indexes={"qty": ["bloomfilter"]})
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["primaryKey"])
print(schema["indexes"])

输出结果如下:

['csymbol' 'qty']
                       name         type              columnName
0  _primary_key_bloomfilter  bloomfilter  _composite_primary_key
1             ctime_zonemap      zonemap                   ctime
2           csymbol_zonemap      zonemap                 csymbol
3               qty_zonemap      zonemap                     qty
4           qty_bloomfilter  bloomfilter                     qty

createPartitionedTable

使用 createPartitionedTable 可以在分布式数据库中创建一个分区表,且返回一个 Table 表对象。该方法也需要传入 Table 对象作为生成表的结构参考。此外,还需要传入一个字符串或者字符串列表,用于表示分区列。

Database.createPartitionedTable(
    table, tableName, partitionColumns, compressMethods={}, sortColumns=None,
    keepDuplicates=None, sortKeyMappingFunction=None
)
  • table:Table 类对象,将根据该表的表结构在数据库中创建一个空的分区表。
  • tableName:字符串,表示分区表的名称。
  • partitionColumns:字符串或字符串列表,表示分区列。
  • compressMethods:可选参数,字典,用于指定各列使用的压缩方法,键值分别为列名和压缩方法。
  • sortColumns:可选参数,字符串或字符串列表,用于指定表的排序列。写入的数据将按照 sortColumns 列进行排序。系统默认 sortColumns (指定多列时)排序列的最后一列为时间类型,其余列字段作为排序的索引列,称作 sort key。
  • keepDuplicates:可选参数,指定在每个分区内如何处理所有 sortColumns 之值皆相同的数据,提供以下选项:
    • "ALL":保留所有数据,为默认值。
    • "LAST":仅保留最新数据。
    • "FIRST":仅保留第一条数据。
  • sortKeyMappingFunction:可选参数,DolphinDB 服务端函数名字符串列表,其长度与索引列一致,用于指定各索引列使用的排序方法。
  • primaryKey:可选参数,字符串或者字符串列表,用于指定主键列。如果指定了主键列,相同主键的值将会被新写入的值覆盖。
  • softDelete:可选参数,是否启用软删除功能,默认不启用。
  • indexes:可选参数,一个字典,用于为表中的列指定索引。仅当 engine 指定为 “TSDB” 或 “PKEY” 时,本参数才生效。

该方法与 DolphinDB 服务器同名函数使用限制一致,请参阅 createPartitionedTable

例 1

下面的代码示例将在 TSDB 引擎数据库中根据 schema_t 表的结构创建一张分区列为 TradeDate、索引列为 sortColumns 的分区表,并指定排序列为 SecurityID 和 TradeDate,其中 SecurityID 的排序函数使用 hashBucket{,5},每个分区排序列值相同时的处理策略为 "ALL"。

dbPath = "dfs://createPartitionedTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
dates = np.array(pd.date_range(start='20220101', end='20220105'), dtype="datetime64[D]")
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="TSDB")
s.run("schema_t = table(100:0, `SecurityID`TradeDate`TotalVolumeTrade`TotalValueTrade, [SYMBOL, DATE, INT, DOUBLE])")
schema_t = s.table(data="schema_t")
pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="TradeDate", sortColumns=["SecurityID", "TradeDate"], keepDuplicates="ALL", sortKeyMappingFunction=["hashBucket{,5}"])
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["colDefs"])

输出结果如下:

               name typeString  typeInt  extra comment
0        SecurityID     SYMBOL       17    NaN        
1         TradeDate       DATE        6    NaN        
2  TotalVolumeTrade        INT        4    NaN        
3   TotalValueTrade     DOUBLE       16    NaN

例 2

下面的代码示例将在 OLAP 引擎数据库中根据 schema_t 的表结构创建一张分区列为 symbol 的分区表,并指定 timestamp 列压缩方式为 delta。

dbPath = "dfs://createPartitionedTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=["IBM", "MS"], dbPath=dbPath)
s.run("schema_t = table(100:0, `timestamp`symbol`value, [TIMESTAMP, SYMBOL, DOUBLE])")
schema_t = s.table(data="schema_t")
pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="symbol", compressMethods={'timestamp': "delta"})
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["colDefs"])

输出结果如下:

        name typeString  typeInt  extra comment
0  timestamp  TIMESTAMP       12    NaN        
1     symbol     SYMBOL       17    NaN        
2      value     DOUBLE       16    NaN  

例3

下面的代码示例将在 PKEY 引擎数据库中根据 schema_t 结构的分布式分区表 pt。其中指定主键列为 SecurityID、TradeDate ,分区列为 TradeDate,SecurityID 主键的索引方式为 bloomfilter。

dbPath = "dfs://createPartitionedTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
dates = np.array(pd.date_range(start='20220101', end='20220105'), dtype="datetime64[D]")
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="PKEY")
s.run("schema_t = table(100:0, `SecurityID`TradeDate`TotalVolumeTrade`TotalValueTrade, [SYMBOL, DATE, INT, DOUBLE])")
schema_t = s.table(data="schema_t")
pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="TradeDate", primaryKey=["SecurityID", "TradeDate"], indexes={'SecurityID': ["bloomfilter"]})
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["primaryKey"])
print(schema["indexes"])

输出结果如下:

['SecurityID' 'TradeDate']
                       name         type              columnName
0  _primary_key_bloomfilter  bloomfilter  _composite_primary_key
1        SecurityID_zonemap      zonemap              SecurityID
2    SecurityID_bloomfilter  bloomfilter              SecurityID
3  TotalVolumeTrade_zonemap      zonemap        TotalVolumeTrade