loadTextEx
语法
loadTextEx(dbHandle, tableName, partitionColumns,
filename, [delimiter], [schema], [skipRows=0], [transform], [sortColumns], [atomic=false], [arrayDelimiter],
[containHeader], [arrayMarker])
参数
dbHandle 是数据库的句柄,可以是内存数据库或分布式数据库。
tableName 是一个字符串,表示表的名称。
partitionColumns 是字符串标量或向量,表示分区列。对于顺序分区类型的数据库,partitionColumns 为空字符串""。对于组合分区类型的数据库,partitionColumns 是字符串向量。
filename 是字符串,表示数据文件的路径。仅支持 CSV 格式的文件。若传入其他格式文件,则无法保证数据准确性。
delimiter 字符串标量,表示数据文件中各列的分隔符。分隔符可以是一个或多个字符,默认是逗号(",")。
schema 是一个表,用于指定各列的数据类型。具体请参考 loadText 的 schema 参数。
skipRows 是0到1024之间的整数,表示从文件头开始忽略的行数。它是一个可选参数。默认值为0。
transform 是一元函数,并且该函数接受的参数必须是一个表。
sortColumns 字符串标量或向量,用于指定每一分区内的排序列,每次写入磁盘的数据在每一分区内将按 sortColumns 进行排序。系统默认 sortColumns (指定多列时) 最后一列为时间列,其余列字段作为排序的索引列,称作 sort key。每一分区内,同一个 sort key 组合值对应的数据将按时间列顺序连续存放在一起。查询时,若查询条件包含索引列,可以快速定位数据所在的数据块位置,提高查询性能。
- 仅当 dbHandle 指示的数据库采用 "TSDB" 引擎(engine="TSDB")时,本参数才生效。
- sortColumns 只能是 INTEGER, TEMPORAL, LITERAL 类别(除 BLOB) 或 DECIMAL 类型。
- 若 sortColumns 指定为多列,则 sortColumns 的最后一列必须为时间列,其余列为索引列,且索引列不能为为 TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP 类型。
- 若 sortColumns 仅指定一列,则该列作为 sort key,其类型不能为TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP。若 sortColumns 指定为一列时间列 (非分区列),且同时指定了 sortKeyMappingFunction,则查询的过滤条件中 sortColumns 只能与相同时间类型的值进行比较。
- 频繁查询的字段适合设置为 sortColumns(建议不超过 4 列),且建议优先把查询频率高的字段作为 sortColumns 中位置靠前的列。
- 为保证性能最优,建议每个分区内索引列的组合数(sort key)不超过 2000 个。
- sortColumns 是每个分区内部 level file 内数据的排序依据,与其是否为分区字段无关。
atomic 是一个布尔值,表示开启 Cache Engine 的情况下,是否保证文件加载的原子性。设置为 true,一个文件的加载过程视为一个完整的事务;设置为 false,加载一个文件的过程分为多个事务进行。
arrayDelimiter 是数据文件中数组向量列的分隔符。默认是逗号。由于不支持自动识别数组向量,必须同步修改 schema 的 type 列为数组向量类型。
containHeader 布尔值,表示数据文件是否包含标题行,默认为空。具体请参考 loadText 的 containHeader 参数。
arrayMarker 包含两个字符的字符串或或 CHAR 类型数据对,两个字符分别表示数组向量左右边界的标识符。默认标识符为双引号(")。
-
不能包含空格、Tab(
\t
) 和换行符(\t
和\n
)。 -
不能包含数字或字母。
-
如果其中一个为双引号("),另一个也必须为双引号。
-
如果标识符为
'
,"
或\
,需视情况添加转义符。例如arrayMarker="\"\""
。 -
如果 delimiter 是单个字符,则 arrayMarker 不能包含与其相同的字符。
-
如果 delimiter 是多个字符,则 arrayMarker 左边界不能与 delimiter 的首个字符相同。
详请
例子
例1. 使用以下脚本生成模拟的数据文件:
n=10000
ID=rand(100, n)
dates=2017.08.07..2017.08.11
date=rand(dates, n)
vol=rand(1..10 join int(), n)
t=table(ID, date, vol)
saveText(t, "/home/DolphinDB/Data/t.txt");
例 2. 把 t.txt 文件加载到范围分区类型的分布式数据库中,分区列为 ID。
db = database(directory="dfs://rangedb", partitionType=RANGE, partitionScheme=0 51 101)
pt=loadTextEx(dbHandle=db,tableName=`pt, partitionColumns=`ID, filename="/home/DolphinDB/Data/t.txt");
例 3. 将文本文件加载到使用 TSDB 存储引擎创建的分布式数据库中。
db = database(directory="dfs://rangedb", partitionType=RANGE, partitionScheme=0 51 101, engine='TSDB')
pt=loadTextEx(dbHandle=db, tableName=`pt, partitionColumns=`ID, filename="/home/DolphinDB/Data/t.txt", sortColumns=`ID`date);
例 4. 把 t.txt 文件加载到组合分区类型的分布式数据库中,分区列为 date 和 ID。
dbDate = database(directory="", partitionType=VALUE, partitionScheme=2017.08.07..2017.08.11)
dbID=database(directory="", partitionType=RANGE, partitionScheme=0 51 101)
db = database(directory="dfs://compoDB", partitionType=COMPO, partitionScheme=[dbDate, dbID])
pt = loadTextEx(dbHandle=db,tableName=`pt, partitionColumns=`date`ID, filename="/home/DolphinDB/Data/t.txt");
例 5. 把 t.txt 加载到值分区类型分内存数据库中,分区列为 date。
db = database(directory="", partitionType=VALUE, partitionScheme=2017.08.07..2017.08.11)
pt = db.loadTextEx(tableName="", partitionColumns=`date, filename="/home/DolphinDB/Data/t.txt");
pt.sortBy!(`ID`x);
例 6. 加载包含数组向量列的数据
导入包含数组向量的数据文件时,由于系统无法自动识别数组向量,因此需要通过 schema 参数修改数组向量列的类型,并使用 arrayDelimiter 参数指定数据文件中数组向量列的分隔符。
首先,使用以下脚本模拟生成一个 csv 文本文件:
bid = array(DOUBLE[], 0, 20).append!([1.4799 1.479 1.4787, 1.4796 1.479 1.4784, 1.4791 1.479 1.4784])
ask = array(DOUBLE[], 0, 20).append!([1.4821 1.4825 1.4828, 1.4818 1.482 1.4821, 1.4814 1.4818 1.482])
TradeDate = 2022.01.01 + 1..3
SecurityID = rand(`APPL`AMZN`IBM, 3)
t = table(SecurityID as `sid, TradeDate as `date, bid as `bid, ask as `ask)
t;
saveText(t,filename="/home/DolphinDB/Data/t.csv",delimiter=',',append=true)
然后,调用 loadTextEx 导出该文件,本例通过 schema 参数指定数组向量列 bid 和 ask 的数据类型为 DOUBLE[]。
db = database(directory="dfs://testDB", partitionType=VALUE, partitionScheme=`APPL`AMZN`IBM)
path = "/home/DolphinDB/Data/t.csv"
schema = extractTextSchema(path);
update schema set type = "DOUBLE[]" where name="bid" or name ="ask"
t = loadTextEx(dbHandle=db, tableName=`t, partitionColumns=`sid, filename=path, schema=schema, arrayDelimiter=",");
select * from t;
sid | date | bid | ask |
---|---|---|---|
APPL | 2022.01.02 | [1.4799,1.479,1.4787] | [1.4821,1.4825,1.4828] |
IBM | 2022.01.03 | [1.4796,1.479,1.4784] | [1.4818,1.482,1.4821] |
IBM | 2022.01.04 | [1.4791,1.479,1.4784] | [1.4814,1.4818,1.482] |
例 7. 使用 transform 参数处理导入的数据,再将其写入数据库。
示例文件(由例 1 生成)的 vol 列包含空值。通过transform 将空值填充为0后再加载到组合分区类型的分布式数据库中。
dbDate = database(directory="", partitionType=VALUE, partitionScheme=2017.08.07..2017.08.11)
dbID=database(directory="", partitionType=RANGE, partitionScheme=0 51 101)
db = database(directory="dfs://compoDB", partitionType=COMPO, partitionScheme=[dbDate, dbID]);
pt=db.createPartitionedTable(table=t, tableName=`pt, partitionColumns=`date`ID)
pt=loadTextEx(dbHandle=db, tableName=`pt, partitionColumns=`date`ID, filename="/home/DolphinDB/Data/t.txt", transform=nullFill!{,0});
例 8. 通过 schema 修改待导入字段的数据类型,同时通过 transform 参数,添加新的字段且调整列的顺序。
在实际场景中,通常是一个股票的数据保存到一个文件,该文件以标的名称命名,且不包含标的名称列。导入需求如下:
-
在导入数据时,需要将在数据表中添加一列以存储标的名称。
-
调整数据文件中的字段顺序和待导入表中字段顺序一致。
本例以导入一个标的的文件为例进行说明。
-
sz000001.csv 文件中字段顺序是 tradetime,open,close,high,low,vol。
-
待导入表的字段顺序是 symbol(需要添加的列),datetime,vol,open,close,high,low。
dir = "/home/data/sz000001.csv"
sym = dir.split('/').tail(1).split('.')[0]
// 调整 tradetime 列的类型为 DATETIME
schema = extractTextSchema(dir)
update schema set type="DATETIME" where name="tradetime"
if(existsDatabase("dfs://stock_data")) {
dropDatabase("dfs://stock_data")
}
db=database(directory="dfs://stock_data", partitionType=VALUE, partitionScheme=2000.01M..2019.12M)
colNames=`sym`tradetime`vol`open`close`high`low
colTypes=[SYMBOL, DATETIME, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
t = table(1:0, colNames, colTypes)
pt = db.createPartitionedTable(t, `pt, `tradetime);
def mytrans(mutable t, sym, colNames){
t.replaceColumn!(`tradetime, datetime(t.tradetime))
//在中添加第一列 sym
t1 = select sym, * from t
//通过 reorderColumns! 调整表中各列的顺序
t1.reorderColumns!(colNames)
return t1
}
// 指定 schema 和 transform 参数,对数据进行处理
loadTextEx(db, `pt, `tradetime, dir, schema=schema, transform=mytrans{,sym, colNames})
select top 10 * from loadTable("dfs://stock_data", `pt)
sym | tradetime | vol | open | close | high | low |
---|---|---|---|---|---|---|
sz000001 | 2010.01.01T00:00:00 | 10,732 | 35.9484 | 35.4385 | 36.3261 | 35.9569 |
sz000001 | 2010.01.04T00:00:00 | 97,555 | 16.4653 | 13.6348 | 16.7255 | 14.9401 |
sz000001 | 2010.01.05T00:00:00 | 43,992 | 51.3616 | 53.1199 | 52.2155 | 50.4782 |
sz000001 | 2010.01.06T00:00:00 | 85,283 | 76.3469 | 80.0076 | 76.7017 | 74.9479 |
sz000001 | 2010.01.07T00:00:00 | 78,837 | 38.9411 | 35.8283 | 39.5269 | 38.2178 |
sz000001 | 2010.01.08T00:00:00 | 13,317 | 70.8803 | 67.9027 | 71.8693 | 70.8072 |
sz000001 | 2010.01.11T00:00:00 | 22,958 | 96.3163 | 97.4031 | 96.5605 | 96.364 |
sz000001 | 2010.01.12T00:00:00 | 45,621 | 17.2699 | 18.5016 | 17.7689 | 16.5028 |
sz000001 | 2010.01.13T00:00:00 | 54,886 | 75.7999 | 77.0245 | 76.4588 | 75.7482 |
sz000001 | 2010.01.14T00:00:00 | 3,132 | 49.8656 | 49.3416 | 50.7761 | 49.7972 |
例 9. 批量导入小文件
在现实场景中,数据供应商会将一只股票数据保存到一个文件中,这种场景的特点是文件数量比较多,但单个文件比较小。如果将文件一个个导入,则效率会比较低。为提高导入效率,可以考虑将多个小文件批量合并后再导入。
数据文件:smallDataset.zip
//1. 建库建表
database(directory = 'dfs://k_day_level', partitionType = RANGE, partitionScheme =[2000.01M,2001.01M,2002.01M,2003.01M,2004.01M,2005.01M,2006.01M,2007.01M,2008.01M,2009.01M,2010.01M,2011.01M,2012.01M,2013.01M,2014.01M,2015.01M,2016.01M,2017.01M,2018.01M,2019.01M,2020.01M,2021.01M,2022.01M,2023.01M,2024.01M]$7, engine= `OLAP, atomic = `TRANS)
db = database("dfs://k_day_level")
colName = `securityid`tradetime`open`close`high`low`vol`val`vwap
colType = [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE]
tbSchema = table(1:0, colName, colType)
if(existsDatabase("dfs://k_day_level")){
dropDatabase("dfs://k_day_level")
}
db.createPartitionedTable(table=tbSchema,tableName=`k_day,partitionColumns=`tradetime)
//2. 导入数据文件
batchNum=1000
dir = "/home/data/smallDataset/"
allFiles = files(dir).filename
i = 0
s = allFiles.size()
do{
files =allFiles[i:min(i+batchNum, s)]
data = each(loadText, dir + files).unionAll(false)
loadTable("dfs://k_day_level","k_day").append!(data)
i = i + batchNum
}while(i < s)
select top 10* from loadTable("dfs://k_day_level","k_day")
securityid | tradetime | open | close | high | low | vol | val | vwap |
---|---|---|---|---|---|---|---|---|
sz000001 | 2010.01.01T00:00:00.000 | 35.9484 | 35.4385 | 36.3261 | 35.9569 | 10,732 | 380,325.7701 | 35.4385 |
sz000002 | 2010.01.01T00:00:00.000 | 5.4374 | 7.2864 | 5.4665 | 3.8142 | 45,577 | 332,092.5668 | 7.2864 |
sz000003 | 2010.01.01T00:00:00.000 | 8.4177 | 10.5074 | 8.7004 | 7.577 | 96,865 | 1,017,794.5994 | 10.5074 |
sz000004 | 2010.01.01T00:00:00.000 | 78.7732 | 76.4809 | 79.1491 | 78.4188 | 67,520 | 5,163,993.091 | 76.4809 |
sz000005 | 2010.01.01T00:00:00.000 | 23.5196 | 25.8487 | 23.9986 | 23.9379 | 1,156 | 29,881.1144 | 25.8487 |
sz000006 | 2010.01.01T00:00:00.000 | 75.3909 | 73.9721 | 75.5661 | 74.8373 | 70,337 | 5,202,974.0605 | 73.9721 |
sz000007 | 2010.01.01T00:00:00.000 | 21.9932 | 22.7951 | 22.5291 | 21.218 | 76,912 | 1,753,219.9525 | 22.7951 |
sz000008 | 2010.01.01T00:00:00.000 | 21.0682 | 21.5055 | 21.6914 | 20.733 | 9,320 | 200,431.077 | 21.5055 |
sz000009 | 2010.01.01T00:00:00.000 | 77.2482 | 75.6649 | 78.2001 | 76.9927 | 13,538 | 1,024,351.3968 | 75.6649 |
sz000010 | 2010.01.01T00:00:00.000 | 54.2793 | 55.6466 | 54.3281 | 53.6054 | 31,924 | 1,776,461.4488 | 55.6466 |