Table
在 Python API 中,可以使用 DolphinDB Python API 的原生方法来创建、使用数据库及数据表,本节将介绍如何创建数据表、使用 SQL 操作数据表。
Table 与 session.table
Python API 将 DolphinDB 服务端的数据表对象句柄,在 API 包装为 Table 类,封装实现部分功能。通常使用 session.table 或 session.loadTable 方法构造,也可以通过 loadText 等函数获得。
接口如下:
session.table(dbPath=None, data=None, tableAliasName=None, inMem=False, partitions=None)
- dbPath:数据库路径,内存表或流数据表无需指定该参数。
- data:数据表的数据,可以为 dict、pd.DataFrame 或 DolphinDB 服务端数据表名。
- tableAliasName:表的别名。
- inMem:是否加载表数据到 DolphinDB 服务端内存中。
- partitions:将被加载到 DolphinDB 服务端内存中的分区。
上传数据为临时表
如果 data 参数传入 dict 或 pd.DataFrame,则表示上传该对象到 DolphinDB 服务端为临时表,此时无需指定 dbPath、inMem、partitions。
代码示例如下:
data1 = pd.DataFrame({ 'a': [1, 2, 3], 'b': [4, 5, 6], }) t1 = s.table(data=data1) print(t1, t1.tableName()) data2 = { 'a': ['a', 'b', 'c'], 'b': [1, 2, 3], } t2 = s.table(data=data2) print(t2, t2.tableName())
输出结果如下:
<dolphindb.table.Table object at 0x7fbd5f02bd60> TMP_TBL_3cc57246
<dolphindb.table.Table object at 0x7fbd3205fc70> TMP_TBL_dbae4978
data1 和 data2 都作为临时表上传至服务端,对应表名分别为 TMP_TBL_3cc57246 和 TMP_TBL_dbae4978。
获取服务端数据表句柄
data 参数传入字符串,则表示获取服务端数据表句柄。
如果同时指定 dbPath 和 data,执行函数则表示从 dbPath 对应的数据库中加载表名为 data 的数据表。
dbPath = "dfs://testTable" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB") s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])") schema_t = s.table(data="schema_t") db.createTable(schema_t, "pt", ["csymbol"]) pt = s.table(dbPath=dbPath, data="pt") print(pt, pt.tableName()) print(pt.toDF())
输出结果如下:
<dolphindb.table.Table object at 0x7f5036bcd040> pt_TMP_TBL_5229a3cc Empty DataFrame Columns: [ctime, csymbol, price, qty] Index: []
如果仅指定 data,执行函数则表示获取名为 data 的内存表句柄。
s.run("test_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])") t = s.table(data="test_t") print(t, t.tableName()) print(t.toDF())
输出结果如下:
<dolphindb.table.Table object at 0x7f11ffb3c070> test_t Empty DataFrame Columns: [ctime, csymbol, price, qty] Index: []
tableAliasName
如果指定该参数,则加载表时不会使用随机表名作为句柄名称。
上传本地变量至服务端时指定该参数。
data1 = pd.DataFrame({ 'a': [1, 2, 3], 'b': [4, 5, 6], }) t1 = s.table(data=data1, tableAliasName="data1") print(t1, t1.tableName()) data2 = { 'a': ['a', 'b', 'c'], 'b': [1, 2, 3], } t2 = s.table(data=data2, tableAliasName="data2") print(t2, t2.tableName())
输出结果如下:
<dolphindb.table.Table object at 0x7f167ecb69d0> data1 <dolphindb.table.Table object at 0x7f1651d0bc40> data2
获取 DolphinDB 服务端数据库中数据表时指定该参数。
dbPath = "dfs://testTable" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB") s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])") schema_t = s.table(data="schema_t") db.createTable(schema_t, "pt", ["csymbol"]) pt = s.table(dbPath=dbPath, data="pt", tableAliasName="tmp_pt") print(pt, pt.tableName()) print(pt.toDF())
输出结果如下:
<dolphindb.table.Table object at 0x7f3350edc040> tmp_pt Empty DataFrame Columns: [ctime, csymbol, price, qty] Index: []
获取 DolphinDB 服务端内存表句柄时指定该参数。
s.run("test_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])") t = s.table(data="test_t", tableAliasName="test_t2") print(t, t.tableName()) print(t.toDF())
输出结果如下:
<dolphindb.table.Table object at 0x7f9fb55b4070> test_t Empty DataFrame Columns: [ctime, csymbol, price, qty] Index: []
从上述例子中可以看出,如果加载分区表或上传本地数据时,可以通过指定别名来避免使用临时表名;如果直接使用表名加载数据表句柄,则该参数无效。
inMem, partitions
该参数仅在加载磁盘数据库中的数据表时有效,参数详细使用方式请参考 loadTable。
session.loadTable
该函数和 session.table 方法类似,返回值为 Table,但无法上传本地数据,仅能获取 DolphinDB 服务端数据表句柄。
session.loadTable(tableName, dbPath=None, partitions=None, memoryMode=False)
- tableName:内存表名或数据库中数据表表名。
- dbPath:数据库路径。
- partitions:将被加载到 DolphinDB 服务端内存中的分区。
- memoryMode:是否加载表数据到 DolphinDB 服务端内存中。
加载内存表
代码示例如下:
s.run("test_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])") t = s.loadTable("test_t") print(t, t.tableName()) print(t.toDF())
输出结果如下:
<dolphindb.table.Table object at 0x7fd1c90a4c10> test_t
Empty DataFrame
Columns: [ctime, csymbol, price, qty]
Index: []
加载数据库表
代码示例如下:
dbPath = "dfs://testTable" if s.existsDatabase(dbPath): s.dropDatabase(dbPath) db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB") s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])") schema_t = s.table(data="schema_t") db.createTable(schema_t, "pt", ["csymbol"]) pt = s.loadTable("pt", dbPath=dbPath) print(pt, pt.tableName()) print(pt.toDF())
输出结果如下:
<dolphindb.table.Table object at 0x7fdaf7885eb0> pt_TMP_TBL_0dfdc80a
Empty DataFrame
Columns: [ctime, csymbol, price, qty]
Index: []
上传的数据表的生命周期
table
和 loadTable
方法返回一个 Python 本地变量,如果上传一个本地数据对象到服务端,且未指定别名,则使用随机表名作为该变量的句柄名。下例中,上传本地 data 对象到服务端,对应的 Python 本地变量为 t。服务端表名可以通过 Table.tableName 方法获取。
data = pd.DataFrame({ 'a': [1, 2, 3], 'b': [4, 5, 6], }) t = s.table(data=data) print(t.tableName())
数据结果如下:
TMP_TBL_e03723c9
其中 TMP_TBL_ 开头,表示该句柄为临时表的句柄,会随着 Python 端 Table 对象的析构而析构。
此时释放 DolphinDB 服务端对象有三种方法:
undef
方法s.undef(t.tableName(), "VAR")
将服务端对象置空
s.run(f"{t.tableName()}=NULL")
析构本地变量以取消本地对象对服务端对象的引用
del t
注意: 如果在获取句柄时指定别名,或者获取服务端已存在的非临时表句柄,则 Python 端对象的析构并不会影响服务端数据。
如果一个表对象只是一次性使用,建议不使用上传机制,可以直接通过函数调用来完成,将表对象作为函数的一个参数。函数调用不会缓存数据。函数调用结束后,所有数据都释放,而且只有一次网络传输,降低网络延迟。
表操作
通过 session.table/session.loadTable 方法获得的对象通常为 Table,API 提供一系列方法用于处理 Table,使得能够在 API 端对表进行操作。例如,调用 select、where等方法执行一次查询;调用 update、delete 等方法更新数据……
需要注意的是,调用 update/delete 方法返回的对象并非 Table 类,而是对使用者透明的 TableUpdate/TableDelete 对象,因此,在调用这些方法后,需要执行 execute 方法才能将表数据的更新、修改同步至服务器。详情请参考对应函数的描述以及示例。
本节将用到 example.csv 文件。
获取表属性
提供 rows, cols, colNames 和 schema 方法可获取表属性。
调用 rows/cols 属性可以获取当前表的行数或列数。
>>> s.run("t = table(1..5 as a, 2..6 as b) ") >>> t = s.table(data="t") >>> t.rows 5 >>> t.cols 2
调用 colNames 属性可以获取当前表的列名。
>>> t.colNames ['a', 'b']
调用 schema 属性返回一个 pd.DataFrame,表示当前表的结构(返回结果与服务端函数 schema 的结果中的 colDefs 属性一致)。
>>> t.schema name typeString typeInt extra comment 0 a INT 4 NaN 1 b INT 4 NaN
select
该方法类似 SQL 语句中的 select 子句,用于选取部分列。下例中使用 toDF 方法获取表对应的 pd.DataFrame 对象,详细使用请参考本文 4.6 toDF 方法。
使用一系列的列名作为输入内容。
>>> trade=s.loadText(WORK_DIR+"/example.csv") >>> trade.select(["ticker", "date"]).toDF() ticker date 0 AMZN 1997-05-15 1 AMZN 1997-05-16 2 AMZN 1997-05-19 3 AMZN 1997-05-20 4 AMZN 1997-05-21 ...
使用字符串作为输入内容。
>>> trade.select("ticker, date, bid").toDF() ticker date bid 0 AMZN 1997-05-15 23.50000 1 AMZN 1997-05-16 20.50000 2 AMZN 1997-05-19 20.50000 3 AMZN 1997-05-20 19.62500 4 AMZN 1997-05-21 17.12500 ...
update
使用 update
可以更新表,其中 cols
表示待更新的列,vals
表示待更新的值。需要注意,必须和 execute
一起使用才能将 Python 端的修改同步至服务端。
从下述示例中可以看出,只有执行 execute 后才会将 update 更新的数据同步至服务器。
>>> trade = s.loadText(WORK_DIR+"/example.csv") >>> t1 = trade.update(["VOL"],["999999"]).where("TICKER=`AMZN").where(["date=2015.12.16"]) >>> t2 = trade.where("ticker=`AMZN").where("date=2015.12.16") >>> t2.toDF() TICKER date VOL PRC BID ASK 0 AMZN 2015-12-16 3964470 675.77002 675.76001 675.83002 >>> t1 = trade.update(["VOL"],["999999"]).where("TICKER=`AMZN").where(["date=2015.12.16"]).execute() >>> t2.toDF() TICKER date VOL PRC BID ASK 0 AMZN 2015-12-16 999999 675.77002 675.76001 675.83002
delete
使用 delete
可以删除表中的记录,但必须和 execute
一起使用才能将 Python 端的修改同步至服务端。
>>> trade = s.loadText(WORK_DIR+"/example.csv") >>> trade.rows 13136 >>> t = trade.delete().where('date<2013.01.01') >>> trade.rows 13136 >>> t = trade.delete().where('date<2013.01.01').execute() >>> trade.rows 3024
groupby
groupby
表示根据一定规则进行分组,该方法和 服务端 函数 groupby 一致,调用后,在SQL 中添加 groupby 子句,接收参数为字符串列表或者单个字符串,表示需要分组的列。如果后续追加 having 子句,having子句需包含聚合函数,表示为符合聚合函数条件的每组产生一条记录。
注: groupby
后面需要使用聚合函数,如 count
, sum
, agg
或 agg2
等。
准备数据库的脚本如下:
>>> dbPath = "dfs://valuedb" >>> if s.existsDatabase(dbPath): ... s.dropDatabase(dbPath) >>> s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath=dbPath) >>> trade = s.loadTextEx(dbPath=dbPath, partitionColumns=["TICKER"], tableName='trade', remoteFilePath=WORK_DIR+"/example.csv")
单独使用 groupby
,分别计算每个股票的 vol 总和与 prc 总和:
>>> trade.select(['sum(vol)','sum(prc)']).groupby(['ticker']).toDF() ticker sum_vol sum_prc 0 AMZN 33706396492 772503.81377 1 NFLX 14928048887 421568.81674 2 NVDA 46879603806 127139.51092
同时使用 groupby
与 having
:
>>> trade.select('count(ask)').groupby(['vol']).having('count(ask)>1').toDF() vol count_ask 0 579392 2 1 3683504 2 2 5732076 2 3 6299736 2 4 6438038 2 5 6946976 2 6 8160197 2 7 8924303 2
contextby
contextby
是 DolphinDB 独有的功能,是对标准 SQL 语句的拓展。使用 context by 子句可以简化对时间序列数据的操作。contextby
与 groupby
相似,区别在于 groupby
为每个组返回一个标量,但是 contextby
为每个组返回一个向量,且向量的长度与该组的行数相同。详细使用方式请参考 context by。
与 having 子句连用时,如果只与聚合函数一起使用,则结果是符合聚合函数条件的分组,每组记录与输入数据中记录数一致;如果与非聚合函数一起使用,结果是符合指定条件的分组。
>>> trade.contextby('ticker').top(3).toDF() TICKER date VOL PRC BID ASK 0 AMZN 1997-05-15 6029815 23.5000 23.5000 23.6250 1 AMZN 1997-05-16 1232226 20.7500 20.5000 21.0000 2 AMZN 1997-05-19 512070 20.5000 20.5000 20.6250 3 NFLX 2002-05-23 7507079 16.7500 16.7500 16.8500 4 NFLX 2002-05-24 797783 16.9400 16.9400 16.9500 5 NFLX 2002-05-28 474866 16.2000 16.2000 16.3700 6 NVDA 1999-01-22 5702636 19.6875 19.6250 19.6875 7 NVDA 1999-01-25 1074571 21.7500 21.7500 21.8750 8 NVDA 1999-01-26 719199 20.0625 20.0625 20.1250 >>> trade.select("TICKER, month(date) as month, cumsum(VOL)").contextby("TICKER,month(date)").toDF() TICKER month cumsum_VOL 0 AMZN 1997-05-01 6029815 1 AMZN 1997-05-01 7262041 2 AMZN 1997-05-01 7774111 3 AMZN 1997-05-01 8230468 4 AMZN 1997-05-01 9807882 ...
contextby
与 having
一起使用:
>>> trade.contextby('ticker').having("sum(VOL)>40000000000").toDF() TICKER date VOL PRC BID ASK 0 NVDA 1999-01-22 5702636 19.6875 19.6250 19.6875 1 NVDA 1999-01-25 1074571 21.7500 21.7500 21.8750 2 NVDA 1999-01-26 719199 20.0625 20.0625 20.1250 3 NVDA 1999-01-27 510637 20.0000 19.8750 20.0000 4 NVDA 1999-01-28 476094 19.9375 19.8750 20.0000 ...
pivotby
pivotby
是 DolphinDB 的独有功能,是对标准 SQL 语句的拓展。它将表中一列或多列的内容按照两个维度重新排列,亦可配合数据转换函数使用。详细使用请参考 pivotby。
pivotby
与 select
子句一起使用时返回一个表。
>>> trade = s.table("dfs://valuedb", "trade") >>> t1 = trade.select("VOL").pivotby("TICKER", "date") >>> t1.toDF() TICKER 1997.05.15 1997.05.16 ... 2016.12.28 2016.12.29 2016.12.30 0 AMZN 6029815.0 1232226.0 ... 3301025 3158299 4139451 1 NFLX NaN NaN ... 4388956 3444729 4455012 2 NVDA NaN NaN ... 57384116 54384676 30323259
pivotby
和 exec
语句一起使用时返回一个 DolphinDB 的矩阵对象。
>>> trade.exec("VOL").pivotby("TICKER", "date").toDF() [array([[ 6029815., 1232226., 512070., ..., 3301025., 3158299., 4139451.], [ nan, nan, nan, ..., 4388956., 3444729., 4455012.], [ nan, nan, nan, ..., 57384116., 54384676., 30323259.]]), array(['AMZN', 'NFLX', 'NVDA'], dtype=object), array(['1997-05-15T00:00:00.000000000', '1997-05-16T00:00:00.000000000', '1997-05-19T00:00:00.000000000', ..., '2016-12-28T00:00:00.000000000', '2016-12-29T00:00:00.000000000', '2016-12-30T00:00:00.000000000'], dtype='datetime64[ns]')]
sort, csort
可使用 csort
关键字排序。
>>> trade = s.loadTable("trade", "dfs://valuedb") >>> trade.contextby('ticker').csort('date desc').toDF() TICKER date VOL PRC BID ASK 0 AMZN 2016-12-30 4139451 749.87000 750.02002 750.40002 1 AMZN 2016-12-29 3158299 765.15002 764.66998 765.15997 2 AMZN 2016-12-28 3301025 772.13000 771.92999 772.15997 3 AMZN 2016-12-27 2638725 771.40002 771.40002 771.76001 4 AMZN 2016-12-23 1981616 760.59003 760.33002 760.59003 ...
除了在排序函数 sort
和 csort
中指定 asc 和 desc 关键字来决定排序顺序外,也可以通过传参的方式实现。
sort(by, ascending=True) csort(by, ascending=True)
参数 ascending 表示是否进行升序排序,默认值为 True。可以通过传入一个 list 来定义多列的不同排序方式。如以下脚本:
>>> trade.select("*").contextby('ticker').csort(["TICKER", "VOL"], True).limit(5).toDF() TICKER date VOL PRC BID ASK 0 AMZN 1997-12-26 40721 54.2500 53.8750 54.625 1 AMZN 1997-08-12 47939 26.3750 26.3750 26.750 2 AMZN 1997-07-21 48325 26.1875 26.1250 26.250 3 AMZN 1997-08-13 49690 26.3750 26.0000 26.625 4 AMZN 1997-06-02 49764 18.1250 18.1250 18.375 5 NFLX 2002-09-05 20725 12.8500 12.8500 12.950 6 NFLX 2002-11-11 26824 8.4100 8.3000 8.400 7 NFLX 2002-09-04 27319 13.0000 12.8200 13.000 8 NFLX 2002-06-10 35421 16.1910 16.1900 16.300 9 NFLX 2002-09-06 54951 12.8000 12.7900 12.800 10 NVDA 1999-05-10 41250 17.5000 17.5000 17.750 11 NVDA 1999-05-07 52310 17.5000 17.3750 17.625 12 NVDA 1999-05-14 59807 18.0000 17.7500 18.000 13 NVDA 1999-04-01 63997 20.5000 20.1875 20.500 14 NVDA 1999-04-19 65940 19.0000 19.0000 19.125 >>> trade.select("*").contextby('ticker').csort(["TICKER", "VOL"], [True, False]).limit(5).toDF() TICKER date VOL PRC BID ASK 0 AMZN 2007-04-25 104463043 56.8100 56.80 56.8100 1 AMZN 1999-09-29 80380734 80.7500 80.75 80.8125 2 AMZN 2006-07-26 76996899 26.2600 26.17 26.1800 3 AMZN 2007-04-26 62451660 62.7810 62.77 62.8300 4 AMZN 2005-02-03 60580703 35.7500 35.74 35.7300 5 NFLX 2015-07-16 63461015 115.8100 115.85 115.8600 6 NFLX 2015-08-24 59952448 96.8800 96.85 96.8800 7 NFLX 2016-04-19 55728765 94.3400 94.30 94.3100 8 NFLX 2016-07-19 55685209 85.8400 85.81 85.8300 9 NFLX 2016-01-20 53009419 107.7400 107.73 107.7800 10 NVDA 2011-01-06 87693472 19.3300 19.33 19.3400 11 NVDA 2011-02-17 87117555 25.6800 25.68 25.7000 12 NVDA 2011-01-12 86197484 23.3525 23.34 23.3600 13 NVDA 2011-08-12 80488616 12.8800 12.86 12.8700 14 NVDA 2003-05-09 77604776 21.3700 21.39 21.3700
top, limit
top 用于取表中的前 n 条记录。详细使用请参考 top。
>>> trade = s.table("dfs://valuedb", "trade") >>> trade.top(5).toDF() TICKER date VOL PRC BID ASK 0 AMZN 1997-05-15 6029815 23.500 23.500 23.625 1 AMZN 1997-05-16 1232226 20.750 20.500 21.000 2 AMZN 1997-05-19 512070 20.500 20.500 20.625 3 AMZN 1997-05-20 456357 19.625 19.625 19.750 4 AMZN 1997-05-21 1577414 17.125 17.125 17.250
limit 子句和 top 子句功能类似。两者的区别在于:
- top 子句中的整型常量不能为负数。在与 context by 子句一同使用时,limit 子句的标量值可以为负整数,返回每个组最后指定数目的记录。其他情况 limit 子句标量值为非负整数。
- 可使用 limit 子句从某行开始选择一定数量的行。
limit 的详细使用请参考 limit。
>>> trade.select("*").contextby('ticker').limit(-2).toDF() TICKER date VOL PRC BID ASK 0 AMZN 2016-12-29 3158299 765.15002 764.66998 765.15997 1 AMZN 2016-12-30 4139451 749.87000 750.02002 750.40002 2 NFLX 2016-12-29 3444729 125.33000 125.31000 125.33000 3 NFLX 2016-12-30 4455012 123.80000 123.80000 123.83000 4 NVDA 2016-12-29 54384676 111.43000 111.26000 111.42000 5 NVDA 2016-12-30 30323259 106.74000 106.73000 106.75000
>>> trade.select("*").limit([2, 5]).toDF() TICKER date VOL PRC BID ASK 0 AMZN 1997-05-19 512070 20.500 20.500 20.625 1 AMZN 1997-05-20 456357 19.625 19.625 19.750 2 AMZN 1997-05-21 1577414 17.125 17.125 17.250 3 AMZN 1997-05-22 983855 16.750 16.625 16.750 4 AMZN 1997-05-23 1330026 18.000 18.000 18.125
merge, merge_asof, merge_window 和 merge_cross
merge 用于内部连接、左连接、左半连接和外部连接,merge_asof 为 asof join,merge_window 为窗口连接。
merge
merge 的参数包括两个索引列和一个字符串参数。如果两个连接列的名称相同,则使用 on 参数指定连接列;如果两个连接列的名称不同,使用 left_on 和 right_on 参数指定连接列。可选参数 how 表示表连接的类型,默认的连接类型为内部连接。
当连接列名称相同时,使用示例如下:
>>> trade = s.table("dfs://valuedb", "trade") >>> t1 = s.table(data={ ... 'TICKER': ['AMZN', 'AMZN', 'AMZN'], ... 'date': np.array(['2015-12-31', '2015-12-30', '2015-12-29'], dtype='datetime64[D]'), ... 'open': [695, 685, 674], ... }) ... >>> t1 = t1.select("TICKER, date(date) as date, open") >>> trade.merge(t1,on=["TICKER","date"]).toDF() TICKER date VOL PRC BID ASK open 0 AMZN 2015-12-29 5734996 693.96997 693.96997 694.20001 674 1 AMZN 2015-12-30 3519303 689.07001 689.07001 689.09998 685 2 AMZN 2015-12-31 3749860 675.89001 675.85999 675.94000 695
当连接列名称不相同时,需要指定 left_on 参数和 right_on 参数:
>>> trade = s.table("dfs://valuedb", "trade") >>> t1 = s.table(data={ ... 'TICKER': ['AMZN', 'AMZN', 'AMZN'], ... 'date': np.array(['2015-12-31', '2015-12-30', '2015-12-29'], dtype='datetime64[D]'), ... 'open': [695, 685, 674], ... }) ... >>> t1 = t1.select("TICKER as TICKER1, date(date) as date1, open") >>> trade.merge(t1, left_on=["TICKER","date"], right_on=["TICKER1", "date1"]).toDF() TICKER date VOL PRC BID ASK open 0 AMZN 2015-12-29 5734996 693.96997 693.96997 694.20001 674 1 AMZN 2015-12-30 3519303 689.07001 689.07001 689.09998 685 2 AMZN 2015-12-31 3749860 675.89001 675.85999 675.94000 695
当要选择左连接时,把 how 参数设置为
left
:>>> trade = s.table("dfs://valuedb", "trade") >>> t1 = s.table(data={ ... 'TICKER': ['AMZN', 'AMZN', 'AMZN'], ... 'date': np.array(['2015-12-31', '2015-12-30', '2015-12-29'], dtype='datetime64[D]'), ... 'open': [695, 685, 674], ... }) ... >>> t1 = t1.select("TICKER, date(date) as date, open") >>> trade.merge(t1,how="left", on=["TICKER","date"]).where('TICKER=`AMZN').where('2015.12.23<=date<=2015.12.31').toDF() TICKER date VOL PRC BID ASK open 0 AMZN 2015-12-23 2722922 663.70001 663.48999 663.71002 NaN 1 AMZN 2015-12-24 1092980 662.78998 662.56000 662.79999 NaN 2 AMZN 2015-12-28 3783555 675.20001 675.00000 675.21002 NaN 3 AMZN 2015-12-29 5734996 693.96997 693.96997 694.20001 674.0 4 AMZN 2015-12-30 3519303 689.07001 689.07001 689.09998 685.0 5 AMZN 2015-12-31 3749860 675.89001 675.85999 675.94000 695.0
当要选择外部连接时,把 how 参数设置为
outer
:>>> t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'open': [674, 685, 942]}) >>> t2 = s.table(data={'TICKER': ['AMZN', 'NFLX', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'close': [690, 936, 951]}) >>> t1.merge(t2, how="outer", on=["TICKER","date"]).toDF() TICKER date open tmp_TICKER tmp_date close 0 AMZN 2015.12.29 674.0 AMZN 2015.12.29 690.0 1 AMZN 2015.12.30 685.0 NaN 2 NFLX 2015.12.31 942.0 NFLX 2015.12.31 951.0 3 NaN NFLX 2015.12.30 936.0
注意: 分区表只能与分区表进行外部连接,内存表只能与内存表进行外部连接。
merge_asof
merge_asof 对应 DolphinDB 中的 asof join (aj)。asof join 与 left join 非常相似,主要有以下区别:
asof join 的最后一个连接列通常是时间类型。对于左表中某行的时间 t,在右表最后一个连接列之外的其它连接列一致的记录中,如果右表没有与 t 对应的时间,asof join 会取右表中 t 之前的最近时间对应的记录;如果有多个相同的时间,会取最后一个时间对应的记录。
如果只有一个连接列,右表必须按照连接列排好序。如果有多个连接列,右表必须在其它连接列决定的每个组内根据最后一个连接列排好序。如果右表不满足这些条件,计算结果将会不符合预期。右表不需要按照其他连接列排序,左表不需要排序。
本节与下节的例子使用了 trades.csv 和 quotes.csv,它们含有 NYSE 网站下载的 AAPL 和 FB 的 2016 年 10 月 24 日的交易与报价数据。
>>> dbPath = "dfs://tickDB" >>> if s.existsDatabase(dbPath): ... s.dropDatabase(dbPath) ... >>> s.database(partitionType=keys.VALUE, partitions=["AAPL","FB"], dbPath=dbPath) >>> trades = s.loadTextEx(dbPath, tableName='trades', partitionColumns=["Symbol"], remoteFilePath=WORK_DIR+"/trades.csv") >>> quotes = s.loadTextEx(dbPath, tableName='quotes', partitionColumns=["Symbol"], remoteFilePath=WORK_DIR+"/quotes.csv") >>> trades.top(5).toDF() Time Exchange Symbol Trade_Volume Trade_Price 0 1970-01-01 08:00:00.022239 75 AAPL 300 27.00 1 1970-01-01 08:00:00.022287 75 AAPL 500 27.25 2 1970-01-01 08:00:00.022317 75 AAPL 335 27.26 3 1970-01-01 08:00:00.022341 75 AAPL 100 27.27 4 1970-01-01 08:00:00.022368 75 AAPL 31 27.40 >>> quotes.where("second(Time)>=09:29:59").top(5).toDF() Time Exchange Symbol Bid_Price Bid_Size Offer_Price Offer_Size 0 1970-01-01 09:30:00.005868 90 AAPL 26.89 1 27.10 6 1 1970-01-01 09:30:00.011058 90 AAPL 26.89 11 27.10 6 2 1970-01-01 09:30:00.031523 90 AAPL 26.89 13 27.10 6 3 1970-01-01 09:30:00.284623 80 AAPL 26.89 8 26.98 8 4 1970-01-01 09:30:00.454066 80 AAPL 26.89 8 26.98 1 >>> trades.merge_asof(quotes,on=["Symbol","Time"]).select(["Symbol","Time","Trade_Volume","Trade_Price","Bid_Price", "Bid_Size","Offer_Price", "Offer_Size"]).top(5).toDF() Symbol Time Trade_Volume Trade_Price Bid_Price Bid_Size \ 0 AAPL 1970-01-01 08:00:00.022239 300 27.00 26.9 1 1 AAPL 1970-01-01 08:00:00.022287 500 27.25 26.9 1 2 AAPL 1970-01-01 08:00:00.022317 335 27.26 26.9 1 3 AAPL 1970-01-01 08:00:00.022341 100 27.27 26.9 1 4 AAPL 1970-01-01 08:00:00.022368 31 27.40 26.9 1 Offer_Price Offer_Size 0 27.49 10 1 27.49 10 2 27.49 10 3 27.49 10 4 27.49 10
merge_window
merge_window
对应 DolphinDB 中的 window join(wj),它是 asof join 的扩展。leftBound 参数和 rightBound 参数用于指定窗口的边界 w1 和 w2,对左表中最后一个连接列对应的时间为 t 的记录,在右表中选择 (t+w1) 到 (t+w2) 的时间并且其他连接列匹配的记录,然后对这些记录使用指定的聚合函数。
window join 和 prevailing window join 的唯一区别是,如果右表中没有与窗口左边界时间(即 t+w1)匹配的值,prevailing window join 会选择右表中 (t+w1) 之前的最近时间的记录作为 t+w1 时的记录。如果要使用 prevailing window join,需将 prevailing 参数设置为 True。
>>> trades.merge_window(quotes, -5000000000, 0, aggFunctions=["avg(Bid_Price)","avg(Offer_Price)"], on=["Symbol","Time"]).where("Time>=07:59:59").top(10).toDF() Time Exchange Symbol Trade_Volume Trade_Price avg_Bid_Price avg_Offer_Price 0 1970-01-01 08:00:00.022239 75 AAPL 300 27.00 26.90 27.49 1 1970-01-01 08:00:00.022287 75 AAPL 500 27.25 26.90 27.49 2 1970-01-01 08:00:00.022317 75 AAPL 335 27.26 26.90 27.49 3 1970-01-01 08:00:00.022341 75 AAPL 100 27.27 26.90 27.49 4 1970-01-01 08:00:00.022368 75 AAPL 31 27.40 26.90 27.49 5 1970-01-01 08:00:02.668076 68 AAPL 2434 27.42 26.75 27.36 6 1970-01-01 08:02:20.116025 68 AAPL 66 27.00 NaN NaN 7 1970-01-01 08:06:31.149930 75 AAPL 100 27.25 NaN NaN 8 1970-01-01 08:06:32.826399 75 AAPL 100 27.25 NaN NaN 9 1970-01-01 08:06:33.168833 75 AAPL 74 27.25 NaN NaN [10 rows x 6 columns]
merge_cross
merge_cross
对应 DolphinDB 中的 cross join。交叉连接函数返回两张表的笛卡尔积的结果集。如果左表有 n 行,右表有 m 行,那么笛卡尔积结果集包含 n*m 行。
>>> s.run(""" ... t1 = table(2010 2011 2012 as year); ... t2 = table(`IBM`C`AAPL as Ticker); ... """) ... >>> t1 = s.table(data="t1") >>> t2 = s.table(data="t2") >>> t1.merge_cross(t2).toDF() year Ticker 0 2010 IBM 1 2010 C 2 2010 AAPL 3 2011 IBM 4 2011 C 5 2011 AAPL 6 2012 IBM 7 2012 C 8 2012 AAPL
rename
调用该函数可以给表重新设置名称。
>>> t.tableName()
t
>>> t.rename("xx")
>>> t.tableName()
xx
注意: 如果给临时表重新设置名称,会导致临时表无法被及时析构,导致内存泄漏。
drop
调用该函数可以删除表的指定元素。
>>> trade = s.loadText(WORK_DIR+"/example.csv") >>> trade.colNames ['TICKER', 'date', 'VOL', 'PRC', 'BID', 'ASK'] >>> t1 = trade.drop(['ask', 'bid']) >>> t1.colNames ['TICKER', 'date', 'VOL', 'PRC']
exec
由于使用 select 子句总是生成一张表,即使只选择一列数据亦是如此。若需要生成一个标量或者一个向量,建议使用 exec 子句。
exec
只选择一列时将生成一个 DolphinDB 的向量。在 Python 中使用 toDF()
加载该对象,可以打印出一个 np.ndarray 对象:
>>> trade = s.loadText(WORK_DIR+"/example.csv")
>>> trade.exec("ticker").toDF()
['AMZN' 'AMZN' 'AMZN' ... 'NFLX' 'NFLX' 'NFLX']
如果 exec
语句选择了多列,则结果和 select
语句一致,将生成一个 DolphinDB 的 table 类型。在 Python 中使用 toDF()
加载该对象,可以打印出一个 pd.DataFrame 对象:
>>> trade.exec(["ticker", "date", "bid"]).toDF()
ticker date bid
0 AMZN 1997-05-15 23.50000
1 AMZN 1997-05-16 20.50000
2 AMZN 1997-05-19 20.50000
3 AMZN 1997-05-20 19.62500
4 AMZN 1997-05-21 17.12500
...
where
where
子句用于过滤数据。
多个过滤条件
where 子句在使用时,如果指定了多个过滤条件,则各个条件之间默认使用 and 连接;如果需要以 or 来连接过滤条件,则需要将 or 连接的条件作为一个条件传入 where 方法。
注: 1.30.21.2 及之前版本的 API,在添加多个 where 子句时,不会使用括号保证运算优先级,因此,使用时需在筛选条件外添加括号,以保证运算正确性。
>>> trade = s.loadText(WORK_DIR+"/example.csv") >>> t1 = trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000') >>> t1.toDF() date bid ask prc vol 0 1998-09-01 79.93750 80.25000 79.95313 11321844 1 1998-11-17 148.68750 149.00000 148.50000 10279448 2 1998-11-20 179.62500 179.75000 180.62500 11314228 3 1998-11-23 217.75000 217.81250 218.00000 11559042 4 1998-11-24 214.25000 214.62500 214.50000 13820992 ... >>> t1.rows 765 >>> t1.showSQL() select date,bid,ask,prc,vol from TMP_TBL_2744917d where (TICKER=`AMZN) and (bid!=NULL) and (ask!=NULL) and (vol>10000000)
输入内容为字符串
where
的输入内容可以是包含多个条件的字符串。>>> t1 = trade.select("ticker, date, vol").where("bid!=NULL, ask!=NULL, vol>50000000") >>> t1.toDF() ticker date vol 0 AMZN 1999-09-29 80380734 1 AMZN 2000-06-23 52221978 2 AMZN 2001-11-26 51543686 3 AMZN 2002-01-22 57235489 4 AMZN 2005-02-03 60580703 ... >>> t1.rows 41
execute
Table 类中,该方法表示以 exec 方式选取传入列的数据,传入参数
expr
为字符串或字符串列表,表示选取的列。>>> trade = s.loadText(WORK_DIR+"/example.csv") >>> trade.execute("TICKER") ['AMZN' 'AMZN' 'AMZN' ... 'NFLX' 'NFLX' 'NFLX'] >>> trade.execute(["VOL", "PRC"]) VOL PRC 0 6029815 23.50000 1 1232226 20.75000 2 512070 20.50000 3 456357 19.62500 4 1577414 17.12500 ... ... ... 13131 2010120 125.59000 13132 5287520 128.35001 13133 4388956 125.89000 13134 3444729 125.33000 13135 4455012 123.80000 [13136 rows x 2 columns]
如果对 Table 类对象执行 update/delete 操作后,必须执行 execute 才能确保在服务端正确执行,此时无需传入参数,也不会以 exec 的形式执行,仅将修改语句提交至服务端执行,并返回一个 Table 对象作为结果。
executeAs
executeAs
可以把结果保存为 server 端的表对象,表名由参数 newTableName 指定,执行后将返回一个 Table 对象管理新创建的表。
注意: 通过执行该方法创建的表,其生存周期不受 Python 端控制,而是与 session 的生存周期保持一致。
>>> trade = s.loadText(WORK_DIR+"/example.csv") >>> t1 = trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').executeAs("AMZN") >>> t1.tableName() AMZN
showSQL
可以使用 showSQL
展示 SQL 语句。
>>> trade=s.loadText(WORK_DIR+"/example.csv") >>> trade.select(["ticker", "date"]).showSQL() select ticker,date from TMP_TBL_fb11c541
toDF, toList
toDF 和 toList 功能类似,都是执行缓存在 Python 端的 SQL 语句(可通过 showSQL 方法获取),并返回执行结果。两者的区别在于,toDF 与 session.run(sql) 一致,toList 则与 session.run(sql, pickleTableToList=True) 行为一致。session 在构造时,如果指定 protocol=PROTOCOL_PICKLE/PROTOCOL_DDB
,则 toDF 将返回一个 pd.DataFrame,toList 返回一个由 np.ndarray 构成的 list,每个 np.ndarray 表示表中的一列。详细区别请参考章节 PROTOCOL_DDB 和 PROTOCOL_PICKLE。