Database
在 Python API 中,可以使用 DolphinDB Python API 的原生方法来创建、使用数据库及数据表。本节将介绍如何创建数据库,以及通过数据库创建数据表。
Database, session.database
Python API 将 DolphinDB 服务端的数据库对象句柄,在 API 包装为 Database 类,封装实现部分功能。通常使用 session.database 方法构造。该方法部分参数可以参考 database。
接口如下:
session.database(dbName=None, prititionType=None, parititions=None, dbPath=None, engine=None, atomic=None, chunkGranularity=None)
- dbName: 数据库句柄名称,创建数据库时可以不指定该参数。
- partitionType: 分区类型,可选项为 keys.SEQ/keys.VALUE/keys.RANGE/keys.LIST/keys.COMPO/keys.HASH。
- partitions: 描述如何进行分区,通常为 list 或者 np.ndarray。
- dbPath: 保存数据库的目录的路径。
- engine: 数据库存储引擎。
- atomic: 写入事务的原子性层级。
- chunkGranularity: 分区粒度,可选值为 "Table"/"DATABASE"。
数据库句柄 dbName
当加载已有数据库或创建数据库时,可以指定该参数,表示将数据库加载到内存后的句柄名称。如果不指定该参数,将会自动生成随机字符串作为句柄名称,可以通过 _getDbName()
方法获取。
例1:创建数据库时不指定 dbName
dbPath = "dfs://dbName" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath) dbName = db._getDbName() print(dbName) print(s.run(dbName))
输出结果如下:
TMP_DB_15c2bf85DB
DB[dfs://dbName]
例2:创建数据库时指定 dbName
dbPath = "dfs://dbName" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(dbName="testDB", partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath) dbName = db._getDbName() print(dbName) print(s.run(dbName))
输出结果如下:
testDB
DB[dfs://dbName]
数据库路径 dbPath 和 分区参数 partitionType/partitions
调用 session.database 创建数据库时,必须指定分区相关参数 partitionType / partitions 。如果创建的数据库为内存数据库,则不需要指定 dbPath;如果创建的数据库为分区数据库,则必须指定 dbPath。
各种分区数据库创建方式
准备环境:
import dolphindb as ddb import dolphindb.settings as keys import numpy as np import pandas as pd s = ddb.session() s.connect("localhost", 8848, "admin", "123456")
创建基于 VALUE 分区的数据库
按 date 分区:
dbPath="dfs://db_value_date" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) dates=np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]") db = s.database(dbName='mydb', partitionType=keys.VALUE, partitions=dates,dbPath=dbPath)
按 month 分区:
dbPath="dfs://db_value_month" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) months=np.array(pd.date_range(start='2012-01', end='2012-10', freq="M"), dtype="datetime64[M]") db = s.database(partitionType=keys.VALUE, partitions=months,dbPath=dbPath)
创建基于 RANGE 分区的数据库
按 INT 类型分区:
dbPath="dfs://db_range_int" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.RANGE, partitions=[1, 11, 21], dbPath=dbPath)
创建基于 LIST 分区的数据库
按 SYMBOL 类型分区:
dbPath="dfs://db_list_sym" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.LIST, partitions=[['IBM', 'ORCL', 'MSFT'], ['GOOG', 'FB']],dbPath=dbPath)
创建基于 HASH 分区的数据库
按 INT 类型分区:
dbPath="dfs://db_hash_int" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.HASH, partitions=[keys.DT_INT, 3], dbPath=dbPath)
创建基于 COMPO 分区的数据库
以下脚本创建基于 COMPO 分区的数据库及数据表:第一层是基于 VALUE 的 date 类型分区,第二层是基于 RANGE 的 int 类型分区。
注意: 创建 COMPO 的子分区数据库的 dbPath 参数必须设置为空字符串或不设置。
db1 = s.database(partitionType=keys.VALUE, partitions=np.array(["2012-01-01", "2012-01-06"], dtype="datetime64[D]")) db2 = s.database(partitionType=keys.RANGE, partitions=[1, 6, 11]) dbPath="dfs://db_compo_test" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.COMPO, partitions=[db1, db2], dbPath=dbPath)
数据库引擎 engine
默认使用 OLAP 引擎创建数据库,如果希望使用其他引擎创建数据库,可以指定该参数。
创建 TSDB 引擎下的数据库
TSDB 引擎数据库的创建方法和 OLAP 几乎一致,只需要在 database 函数中指定 engine = "TSDB"
,并在调用建表函数 createTable 和 createPartitionedTable 时指定 sortColumns。
dates = np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]") dbPath = "dfs://tsdb" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="TSDB")
事务原子性层级 atomic
该参数表示写入事务的原子性层级,决定了是否允许并发写入同一分区,可选值为 "TRANS" 和 "CHUNK",默认值为 "TRANS"。
- 设置为 "TRANS" ,写入事务的原子性层级为事务,即一个事务写入多个分区时,若某个分区被其他写入事务锁定而出现写入冲突,则该事务的写入全部失败。因此,该设置下,不允许并发写入同一个分区。
- 设置为 "CHUNK" ,写入事务的原子性层级为分区。若一个事务写入多个分区时,某分区被其它写入事务锁定而出现冲突,系统会完成其他分区的写入,同时对之前发生冲突的分区不断尝试写入,尝试数分钟后仍冲突才放弃。此设置下,允许并发写入同一个分区,但由于不能完全保证事务的原子性,可能出现部分分区写入成功而部分分区写入失败的情况。同时由于采用了重试机制,写入速度可能较慢。
分区粒度 chunkGranularity
该参数表示分区粒度,可选值为 "TABLE" 和 "DATABASE"。
- "Table":表级分区,设置后支持同时写入同一分区的不同表。
- "DATABASE":数据库级分区,设置后只支持同时写入不同分区。
注意: 指定该参数前,需要构造 session 时设置 enableChunkGranularityConfig=True
,否则该参数无效。
createTable
使用 createTable 可以在数据库中创建维度表。其传入参数 table 是一个 Table 对象,该对象将作为生成表的结构参考。
Database.createTable(table, tableName, sortColumns=None)
- table:Table 类对象,将根据该表的表结构在数据库中创建一个空的维度表。
- tableName:字符串,表示维度表的名称。
- sortColumns:字符串或字符串列表,用于指定表的排序列。写入的数据将按照 sortColumns 列进行排序。系统默认 sortColumns (指定多列时)排序列的最后一列为时间类型,其余列字段作为排序的索引列,称作 sort key。
该方法与 DolphinDB 服务器同名函数使用限制一致,请参阅 createTable。
下面的代码示例将在 TSDB 引擎数据库中创建一张基于 schema_t 表的结构、按 csymbol 列排序的维度表,表名为 pt。
dbPath = "dfs://createTable" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB") s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])") schema_t = s.table(data="schema_t") pt = db.createTable(schema_t, "pt", ["csymbol"]) schema = s.run(f'schema(loadTable("{dbPath}", "pt"))') print(schema["colDefs"])
输出结果如下:
name typeString typeInt extra comment
0 ctime TIMESTAMP 12 NaN
1 csymbol SYMBOL 17 NaN
2 price DOUBLE 16 NaN
3 qty INT 4 NaN
createPartitionedTable
使用 createPartitionedTable 可以在分布式数据库中创建一个分区表,且返回一个 Table 表对象。该方法也需要传入 Table 对象作为生成表的结构参考。此外,还需要传入一个字符串或者字符串列表,用于表示分区列。
Database.createPartitionedTable( table, tableName, partitionColumns, compressMethods={}, sortColumns=None, keepDuplicates=None, sortKeyMappingFunction=None )
- table:Table 类对象,将根据该表的表结构在数据库中创建一个空的分区表。
- tableName:字符串,表示分区表的名称。
- partitionColumns:字符串或字符串列表,表示分区列。
- compressMethods:字典,用于指定各列使用的压缩方法,键值分别为列名和压缩方法。
- sortColumns:字符串或字符串列表,用于指定表的排序列。写入的数据将按照 sortColumns 列进行排序。系统默认 sortColumns (指定多列时)排序列的最后一列为时间类型,其余列字段作为排序的索引列,称作 sort key。
- keepDuplicates:指定在每个分区内如何处理所有 sortColumns 之值皆相同的数据,提供以下选项:
- "ALL":保留所有数据,为默认值。
- "LAST":仅保留最新数据。
- "FIRST":仅保留第一条数据。
- sortKeyMappingFunction:DolphinDB 服务端函数名字符串列表,其长度与索引列一致,用于指定各索引列使用的排序方法。
该方法与 DolphinDB 服务器同名函数使用限制一致,请参阅 createPartitionedTable。
例 1
下面的代码示例将在 TSDB 引擎数据库中根据 schema_t 表的结构创建一张分区列为 TradeDate、索引列为 sortColumns 的分区表,并指定排序列为 SecurityID 和 TradeDate,其中 SecurityID 的排序函数使用 hashBucket{,5},每个分区排序列值相同时的处理策略为 "ALL"。
if s.existsDatabase(dbPath): s.dropDatabase(dbPath) dates = np.array(pd.date_range(start='20220101', end='20220105'), dtype="datetime64[D]") db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="TSDB") s.run("schema_t = table(100:0, `SecurityID`TradeDate`TotalVolumeTrade`TotalValueTrade, [SYMBOL, DATE, INT, DOUBLE])") schema_t = s.table(data="schema_t") pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="TradeDate", sortColumns=["SecurityID", "TradeDate"], keepDuplicates="ALL", sortKeyMappingFunction=["hashBucket{,5}"]) schema = s.run(f'schema(loadTable("{dbPath}", "pt"))') print(schema["colDefs"])
输出结果如下:
name typeString typeInt extra comment
0 SecurityID SYMBOL 17 NaN
1 TradeDate DATE 6 NaN
2 TotalVolumeTrade INT 4 NaN
3 TotalValueTrade DOUBLE 16 NaN
例 2
下面的代码示例将在 OLAP 引擎数据库中根据 schema_t 的表结构创建一张分区列为 symbol 的分区表,并指定 timestamp 列压缩方式为 delta。
dbPath = "dfs://createPartitionedTable" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.VALUE, partitions=["IBM", "MS"], dbPath=dbPath) s.run("schema_t = table(100:0, `timestamp`symbol`value, [TIMESTAMP, SYMBOL, DOUBLE])") schema_t = s.table(data="schema_t") pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="symbol", compressMethods={'timestamp': "delta"}) schema = s.run(f'schema(loadTable("{dbPath}", "pt"))') print(schema["colDefs"])
输出结果如下:
name typeString typeInt extra comment
0 timestamp TIMESTAMP 12 NaN
1 symbol SYMBOL 17 NaN
2 value DOUBLE 16 NaN