parquet
Apache Parquet is a columnar storage format for efficient data storage and retrieval. The DolphinDB Parquet plugin imports Parquet files into DolphinDB and exports DolphinDB tables to Parquet files.
Installation (with installPlugin)
Required server version: DolphinDB 2.00.10 or higher.
Supported OS: Linux, Linux JIT.
Installation Steps:
(1) Use listRemotePlugins to check plugin information in the plugin repository.
Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.
login("admin", "123456")
listRemotePlugins(, "http://plugins.dolphindb.com/plugins/")
(2) Invoke installPlugin to install the plugin.
installPlugin("parquet")
(3) Use loadPlugin to load the plugin before using the plugin methods.
loadPlugin("parquet")
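In scripts that may run more than once in the same session, it can be convenient to guard the load call. The following is a minimal sketch, not part of the official steps; it assumes that a repeated loadPlugin call raises an exception on your server version, which may not hold for every release.

```dolphindb
// Load the plugin. If the call fails (for example because the plugin
// is already loaded in this session), print the message and continue.
try {
    loadPlugin("parquet")
} catch(ex) {
    print(ex)
}
```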
Method References
extractParquetSchema
Syntax
extractParquetSchema(fileName)
Details
The method returns the schema of the Parquet file, with two columns: column names and data types.
Parameters
- fileName: A STRING scalar indicating the Parquet file name.
Examples
parquet::extractParquetSchema("userdata1.parquet")
loadParquet
Syntax
loadParquet(fileName,[schema],[column],[rowGroupStart],[rowGroupNum])
Details
The method imports a Parquet file to a DolphinDB in-memory table. Regarding data type conversion, refer to "Data Type Mappings".
Parameters
- fileName: A STRING scalar indicating the Parquet file name.
- schema (optional): A table with the column names and their data types. Specify the parameter to modify the data types of the columns generated by the system.
- column (optional): A vector of integers indicating the indices of the columns to be imported. All columns will be read if it is not specified.
- rowGroupStart (optional): A non-negative integer indicating the index of the row group from which the data import starts. The file will be read from the beginning if it is not specified.
- rowGroupNum (optional): An integer indicating the number of row groups to be read. The file will be read to the end if it is not specified.
Examples
parquet::loadParquet("userdata1.parquet")
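The optional parameters can be combined with extractParquetSchema to read only part of a file. The sketch below is illustrative: the file path is hypothetical, the demo table is created only to keep the example self-contained, and it assumes that the column parameter uses zero-based indices (the same convention as the positions returned by find).

```dolphindb
// Hypothetical file path; adjust to a location the server can write to.
f = "/tmp/loadParquet_demo.parquet"

// Build a small table and export it so that the example is self-contained.
src = table(1..5 as id, `a`b`c`d`e as sym, 1.1 2.2 3.3 4.4 5.5 as price)
parquet::saveParquet(src, f)

// Look up column positions by name in the extracted schema.
// Assumption: loadParquet expects zero-based column indices.
schemaTB = parquet::extractParquetSchema(f)
idx = find(schemaTB.name, `id`price)

// Read only the selected columns.
t1 = parquet::loadParquet(f, , idx)

// Read a single row group, starting from row group 0.
t2 = parquet::loadParquet(f, , , 0, 1)
```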
loadParquetEx
Syntax
loadParquetEx(dbHandle,tableName,partitionColumns,fileName,[schema],[column],[rowGroupStart],[rowGroupNum],[transform])
Details
The method loads a Parquet file to a DolphinDB partitioned table and returns a table object with metadata of the table.
- If dbHandle is specified and is not an empty string "": load the file to a DFS database.
- If dbHandle is an empty string "" or unspecified: load the file to a partitioned in-memory table.
Regarding data type conversion, refer to "Data Type Mappings".
Parameters
- dbHandle: A database handle.
- tableName: A string indicating the table name.
- partitionColumns: A STRING scalar or vector indicating the partitioning column(s). For a composite partition, it is a vector.
- fileName: A STRING scalar indicating the Parquet file name.
- schema (optional): A table with the column names and their data types. Specify the parameter to modify the data types of the columns generated by the system.
- column (optional): A vector of integers indicating the indices of the columns to be imported. All columns will be read if it is not specified.
- rowGroupStart (optional): A non-negative integer indicating the index of the row group from which the data import starts. The file will be read from the beginning if it is not specified.
- rowGroupNum (optional): An integer indicating the number of row groups to be read. The file will be read to the end if it is not specified.
- transform (optional): A unary function that takes a table as input. If it is specified, a partitioned table must be created before loading the file. The method first applies the function to the data and then saves the result to the partitioned table.
Examples
- Import to a partitioned DFS table
db = database("dfs://rangedb", RANGE, 0 500 1000)
parquet::loadParquetEx(db,`tb,`id,"userdata1.parquet")
- Import to a partitioned in-memory table
db = database("", RANGE, 0 500 1000)
parquet::loadParquetEx(db,`tb,`id,"userdata1.parquet")
- Specify the parameter transform to transform the default data type (e.g. 20200101) to a specific type (e.g. DATE)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2020.01.01..2020.01.30)
dataFilePath="level.parquet"
schemaTB=parquet::extractParquetSchema(dataFilePath)
update schemaTB set type="DATE" where name="date"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb1=db.createPartitionedTable(tb,`tb1,`date);
def i2d(mutable t){
// Convert integer dates such as 20200101 to DATE by parsing their string form
return t.replaceColumn!(`date,datetimeParse(string(t.date),"yyyyMMdd"))
}
t = parquet::loadParquetEx(db,`tb1,`date,dataFilePath,,,,,i2d)
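After an import it is often useful to check what was written. The sketch below assumes that the first example above (import to the DFS database "dfs://rangedb", table tb, partitioned on id) has been run; it only uses built-in functions to inspect the result.

```dolphindb
// Assumes table "tb" exists in database "dfs://rangedb".
pt = loadTable("dfs://rangedb", `tb)

// Column names and data types of the partitioned table.
schema(pt).colDefs

// Total number of rows loaded.
select count(*) from pt

// Rows that fall into the first range partition [0, 500).
select count(*) from pt where id < 500
```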
parquetDS
Syntax
parquetDS(fileName,[schema])
Details
The method creates data sources based on the input file name. The number of tables is the same as the number of row groups.
Parameters
- fileName: A STRING scalar indicating the Parquet file name.
- schema (optional): A table with the column names and their data types. Specify the parameter to modify the data types of the columns generated by the system.
Examples
ds = parquet::parquetDS("userdata1.parquet")
size ds;
// Output: 1
ds[0];
// Output: DataSource< loadParquet("userdata1.parquet",,,0,1) >
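Data sources are typically consumed by the built-in mr function, which applies a map function to each data source and combines the results. The sketch below counts rows per row group and sums them; countRows is a helper name introduced here for illustration, and the example assumes that userdata1.parquet is readable by the server.

```dolphindb
// Assumes "userdata1.parquet" is readable by the server.
ds = parquet::parquetDS("userdata1.parquet")

// Map function: number of rows in the table of one row group.
def countRows(t) {
    return rows(t)
}

// Apply countRows to every data source and add up the results.
total = mr(ds, countRows, +)

// For comparison: row count when the whole file is loaded at once.
rows(parquet::loadParquet("userdata1.parquet"))
```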
saveParquet
Syntax
saveParquet(table, fileName, [compression])
Details
The method exports a DolphinDB table to a Parquet file.
Parameters
- table: The table to be exported.
- fileName: A STRING scalar indicating the name of the Parquet file to which the table is saved.
- compression (optional): A STRING scalar indicating the compression mode. Supported values are "snappy", "gzip" and "zstd". By default, no compression is applied.
Examples
parquet::saveParquet(tb, "userdata1.parquet")
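A round trip is a simple way to check an export. The following sketch writes a small table with snappy compression and reads it back; the file path and the table contents are hypothetical and chosen only for illustration.

```dolphindb
// Hypothetical output path; adjust to a location the server can write to.
f = "/tmp/saveParquet_demo.parquet"

// A small table with INT, STRING and DOUBLE columns.
tb = table(1..3 as id, `x`y`z as sym, 10.5 20.5 30.5 as price)

// Export with snappy compression, then import the file again.
parquet::saveParquet(tb, f, "snappy")
t = parquet::loadParquet(f)

// Compare the original and the re-imported table column by column.
each(eqObj, tb.values(), t.values())
```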
setReadThreadNum
Syntax
setReadThreadNum(num)
Details
The method sets the concurrency level for reading Parquet files, that is, the maximum number of threads to use.
Note: The actual concurrency is also affected by the worker setting, because the plugin internally uses the ploop function to read columns in parallel.
Parameters
- num: An integer indicating the maximum number of read threads. It can take the following values:
- 1 (default): no additional threads are created, and the Parquet file is read in the current thread.
- Greater than 1: The task of reading the Parquet file is divided into num parts, allowing up to num threads.
- 0: Each column read will be treated as a separate task.
Examples
parquet::setReadThreadNum(0)
getReadThreadNum
Syntax
getReadThreadNum()
Details
The method retrieves the maximum number of read threads for the Parquet plugin.
Examples
parquet::getReadThreadNum()
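The two methods are usually used together. The sketch below changes the setting, reads it back, times a load, and restores the default; the value 4 is arbitrary, and the example assumes that userdata1.parquet is readable by the server.

```dolphindb
// Allow up to 4 read threads (4 is an arbitrary illustrative value).
parquet::setReadThreadNum(4)

// Read the setting back.
parquet::getReadThreadNum()

// Time a load under the current setting.
timer t = parquet::loadParquet("userdata1.parquet")

// Restore the documented default of 1.
parquet::setReadThreadNum(1)
```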
Data Type Mappings
Import
When a Parquet file is imported to DolphinDB, the data types are converted based on the LogicalType or ConvertedType annotated in the file. If neither the LogicalType nor the ConvertedType is defined, the conversion is performed based on the physical type.
Logical Type in Parquet | TimeUnit in Parquet | Type in DolphinDB |
---|---|---|
INT(bit_width=8,is_signed=true) | \ | CHAR |
INT(bit_width=8,is_signed=false or bit_width=16,is_signed=true) | \ | SHORT |
INT(bit_width=16,is_signed=false or bit_width=32,is_signed=true) | \ | INT |
INT(bit_width=32,is_signed=false or bit_width=64,is_signed=true) | \ | LONG |
INT(bit_width=64,is_signed=false) | \ | LONG |
ENUM | \ | SYMBOL |
DECIMAL | \ | DOUBLE |
DATE | \ | DATE |
TIME | MILLIS\MICROS\NANOS | TIME\NANOTIME\NANOTIME |
TIMESTAMP | MILLIS\MICROS\NANOS | TIMESTAMP\NANOTIMESTAMP\NANOTIMESTAMP |
INTEGER | \ | INT\LONG |
STRING | \ | STRING |
JSON | \ | not supported |
BSON | \ | not supported |
UUID | \ | not supported |
MAP | \ | not supported |
LIST | \ | not supported |
NIL | \ | not supported |
Converted Type in Parquet | Type in DolphinDB |
---|---|
INT_8 | CHAR |
UINT_8\INT_16 | SHORT |
UINT_16\INT_32 | INT |
TIMESTAMP_MICROS | NANOTIMESTAMP |
TIMESTAMP_MILLIS | TIMESTAMP |
DECIMAL | DOUBLE |
UINT_32\INT_64\UINT_64 | LONG |
TIME_MICROS | NANOTIME |
TIME_MILLIS | TIME |
DATE | DATE |
ENUM | SYMBOL |
UTF8 | STRING |
MAP | not supported |
LIST | not supported |
JSON | not supported |
BSON | not supported |
MAP_KEY_VALUE | not supported |
Physical Type in Parquet | Type in DolphinDB |
---|---|
BOOLEAN | BOOL |
INT32 | INT |
INT64 | LONG |
INT96 | NANOTIMESTAMP |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BYTE_ARRAY | STRING |
FIXED_LEN_BYTE_ARRAY | STRING |
Note:
- Conversion of the Parquet repeated fields is not supported.
- DECIMAL can be used to convert data of the following physical types: INT32, INT64 and FIXED_LEN_BYTE_ARRAY.
- DolphinDB does not support unsigned data types. Therefore, in case of UINT_64 overflow when loading a Parquet file, the data will be converted to NULL values in DolphinDB.
Export
When exporting data from DolphinDB to a Parquet file, the system will convert the data types to Parquet types based on the given table schema.
Type in DolphinDB | Physical Type in Parquet | Logical Type in Parquet |
---|---|---|
BOOL | BOOLEAN | \ |
CHAR | FIXED_LEN_BYTE_ARRAY | \ |
SHORT | INT32 | INT(16) |
INT | INT32 | INT(32) |
LONG | INT64 | INT(64) |
DATE | INT32 | DATE |
MONTH | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
MINUTE | INT32 | TIME_MILLIS |
SECOND | INT32 | TIME_MILLIS |
DATETIME | INT64 | TIMESTAMP_MILLIS |
TIMESTAMP | INT64 | TIMESTAMP_MILLIS |
NANOTIME | INT64 | TIME_NANOS |
NANOTIMESTAMP | INT64 | TIMESTAMP_NANOS |
FLOAT | FLOAT | \ |
DOUBLE | DOUBLE | \ |
STRING | BYTE_ARRAY | STRING |
SYMBOL | BYTE_ARRAY | STRING |
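Because several DolphinDB types share one Parquet type, a round trip does not always restore the original types. The sketch below is a way to observe this; the file path is hypothetical. If the tables above hold as stated, one would expect MONTH to come back as DATE, SECOND as TIME, DATETIME as TIMESTAMP and SYMBOL as STRING, but the result should be checked on your own server.

```dolphindb
// Hypothetical output path; adjust to a location the server can write to.
f = "/tmp/typeMapping_demo.parquet"

// Columns of type MONTH, SECOND, DATETIME and SYMBOL.
m = 2020.01M 2020.02M
s = 09:30:00 09:31:00
dt = 2020.01.01T09:30:00 2020.01.02T09:30:00
sym = symbol(`A`B)
src = table(m, s, dt, sym)

// Export, then import again.
parquet::saveParquet(src, f)
back = parquet::loadParquet(f)

// Compare the column types before and after the round trip.
schema(src).colDefs
schema(back).colDefs
```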