Importing Parquet Files
Apache Parquet is a columnar storage format for efficient data storage and retrieval. The DolphinDB Parquet plugin supports importing and exporting Parquet files to and from DolphinDB. This page explains how to use the plugin to import Parquet datasets into DolphinDB.
Installing Parquet Plugin
Invoke installPlugin for plugin installation:
installPlugin("parquet")
Use loadPlugin to load the plugin before calling its methods.
loadPlugin("parquet")
Parquet Data Import Methods
The DolphinDB Parquet plugin provides the following methods to facilitate Parquet file import:
loadParquet
: Imports Parquet files as in-memory tables.

loadParquetEx
: Imports Parquet files directly into either a distributed database or an in-memory database.

parquetDS
: Divides a Parquet file into multiple smaller data sources. It can be used with the function mr to load data.
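For orientation, the three methods can be invoked as follows. These are illustrative calls only; the file path, database, and table reuse the examples created later on this page:
t = parquet::loadParquet("./parquetFile/demo1.parquet")                                                   // load into an in-memory table
parquet::loadParquetEx(database("dfs://demoDB"), "data", ["customerId"], "./parquetFile/demo1.parquet")   // write directly to a DFS table
ds = parquet::parquetDS("./parquetFile/demo1.parquet")                                                    // create data sources for use with mr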
The following table outlines common parameters of these methods.
Parameter | Description
---|---
filename | A string indicating the directory of the input Parquet file.
schema | A table specifying the column names and data types of the imported table. It has two columns: name and type. If it is not specified, columns are imported with the names and types inferred by the system.
column | A vector of integers indicating the indices of the columns to be imported. All columns will be read if it is not specified.
rowGroupStart | A non-negative integer indicating the index of the row group from which the data import starts. The file will be read from the beginning if it is not specified.
rowGroupNum | An integer indicating the number of row groups to be read. The file will be read to the end if it is not specified.
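For instance, assuming the sample file demo1.parquet introduced below, the optional parameters could be combined as follows to read only the first two columns of the first row group (a sketch, not one of the original examples):
// read row group 0 only, importing just columns 0 and 1
t = parquet::loadParquet("./parquetFile/demo1.parquet", , [0,1], 0, 1)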
Getting Started
Loading to Memory and Importing to Database
Like loadText, loadParquet imports data as in-memory tables.
The following example demonstrates how to import demo1.parquet.
dataFile = "./parquetFile/demo1.parquet"
tmpTB = parquet::loadParquet(dataFile)
Check the results:
select * from tmpTB;
Output:

Call the schema
function to view the table schema (column names,
data types, etc):
tmpTB.schema().colDefs;
Output:

Once the Parquet file is loaded into memory, the table can be directly inserted
into the database using tableInsert
or
append!
. Ensure that the target database and table are already created
before proceeding.
Script for creating the database and table:
create database "dfs://demoDB" partitioned by HASH([SYMBOL,25])
create table "dfs://demoDB"."data"(
time TIMESTAMP,
customerId SYMBOL,
temp INT,
amp DOUBLE
)
partitioned by customerId
Then write the data into the database with the following script:
dfsTable = loadTable("dfs://demoDB", "data")
tableInsert(dfsTable, tmpTB)
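To verify the write, a quick count on the DFS table can be run (an illustrative check):
// confirm how many rows were written to the distributed table
select count(*) from loadTable("dfs://demoDB", "data")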
Importing to Database Directly
In addition to explicitly using in-memory tables for database import, DolphinDB provides the loadParquetEx function, which combines loading, cleaning, and storage into one streamlined process. Ensure that the target database and table are already created before proceeding.
The following example demonstrates how to import demo1.parquet into the table “data” in the database “demoDB”. To start with an empty database, use the truncate function to remove existing rows.
truncate(dbUrl="dfs://demoDB", tableName="data")
parquet::loadParquetEx(database("dfs://demoDB"), "data", ["customerId"], dataFile)
pt = loadTable("dfs://demoDB", "data")
select * from pt
Output:

Handling Column Names and Data Types
Extracting Parquet File Schema
Users can use the extractParquetSchema
function to preview a
Parquet file's schema before data loading, including details like column names,
data types, and the number of columns. It helps users identify potential issues
such as mismatched data types and modify the schema accordingly for proper data
loading.
For example, use extractParquetSchema
to get the schema of the
sample file demo1.parquet:
dataFile = "./parquetFile/demo1.parquet"
parquet::extractParquetSchema(dataFile)
Output:

Specifying Column Names and Types
If the column names or data types automatically inferred by the system are not
expected, we can specify the column names and data types by modifying the schema
table generated by function extractParquetSchema
or creating
the schema table directly.
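Alternatively, the schema table can be built by hand. The sketch below assumes the column names and types of demo1.parquet shown earlier:
// construct the schema table directly: one row per column, with its name and intended type
schemaTB = table(`time`customerId`temp`amp as name, `TIMESTAMP`SYMBOL`INT`DOUBLE as type)
tmpTB = parquet::loadParquet(dataFile, schemaTB)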
Specifying Column Names
For example, use the following script to modify the schema and rename column “customerId” to “ID”:
dataFile = "./parquetFile/demo1.parquet"
schemaTB = parquet::extractParquetSchema(dataFile)
update schemaTB set name = "ID" where name = "customerId"
Use function loadParquet to import the Parquet file and specify the schema parameter as the modified schemaTB.
tmpTB = parquet::loadParquet(dataFile,schemaTB)
Call function schema
to view the table schema. Column
“customerId” has been renamed as “ID”.
tmpTB.schema().colDefs;
Output:

Specifying Data Type
The column temp in demo1.parquet is stored as type INT. However, the expected data type for column temp is DOUBLE. To convert the data type of column temp:
dataFile = "./parquetFile/demo1.parquet"
schemaTB = parquet::extractParquetSchema(dataFile)
update schemaTB set type = "DOUBLE" where name = "temp"
Use function loadParquet to import the Parquet file and specify the schema parameter as schemaTB.
tmpTB = parquet::loadParquet(dataFile,schemaTB)
Call the schema
function to view the table schema:
tmpTB.schema().colDefs;
The data type of column temp has been converted to DOUBLE.

Import with Specific Columns
The Parquet plugin allows importing specific columns directly. Unlike
loadText
, where the schema parameter defines the
columns to import, loadParquet
utilizes the column
parameter to indicate the indices of the columns to be imported.
For example, to load only the necessary columns (time, customerId, and amp) while excluding the temp column, specify their respective 0-based indices in column:
parquet::loadParquet(dataFile, , [0,1,3])

Data Cleaning and Preprocessing
DolphinDB provides various built-in functions for common data cleaning tasks, including handling missing values, duplicate values, and outliers, and performing data normalization and standardization. This section focuses on several common scenarios.
The Parquet plugin provides two methods for importing data: the loadParquet function and the loadParquetEx function (see Getting Started). While both methods support
basic data cleaning, loadParquetEx
introduces a distinctive
transform parameter that enables data cleaning and preprocessing during
data import. This parameter accepts a unary function that processes the loaded table
and returns a cleaned dataset, which is then directly written to the distributed
database without intermediate memory storage. Our focus will be on practical
applications of the transform parameter in
loadParquetEx
.
Data Type Conversion of Temporal Types
If the date in a Parquet file is stored as the TIMESTAMP type but needs to be stored as DATETIME in a DolphinDB database, one way is to use the transform parameter of the loadParquetEx function to wrap the replaceColumn! function for data type conversion.
First, create a distributed database and a table.
create database "dfs://demoDB1" partitioned by HASH([SYMBOL,25])
create table "dfs://demoDB1"."data"(
time DATETIME,
customerId SYMBOL,
temp INT,
amp DOUBLE
)
partitioned by customerId
Define a user-defined function t2d
to encapsulate the data
processing logic, converting the time column to DATETIME type.
def t2d(mutable t){
    return t.replaceColumn!(`time, datetime(t.time))
}
Call function loadParquetEx
and assign function
t2d
to parameter transform. The system executes
function t2d
with the imported data, and then saves the result
to the database.
dataFile = "./parquetFile/demo1.parquet"
db = database("dfs://demoDB1")
parquet::loadParquetEx(db,`data,`customerId,dataFile,,,,,t2d);
Check the results:
select top 5 * from loadTable("dfs://demoDB1","data")
Column 'time' is stored as DATETIME type:

Handling Null Values
Efficient handling of missing values is essential when working with real-world
datasets. DolphinDB offers built-in functions like bfill
,
ffill
, interpolate
, and
nullFill
, enabling quick and effective handling of
incomplete data in tables. As parameter transform only takes a function
with one parameter, to assign a DolphinDB built-in function with multiple
parameters to transform, we can use partial
application to convert the function into a function with only one
parameter. For example, to fill null values with 0, the following partial application can be passed as the transform argument:
nullFill!{,0}
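For instance, with the demoDB database and data table created earlier, the partial application can be supplied to loadParquetEx along these lines (a sketch following the pattern of the previous example):
dataFile = "./parquetFile/demo1.parquet"
db = database("dfs://demoDB")
// fill null values with 0 on the fly while writing to the DFS table
parquet::loadParquetEx(db, `data, `customerId, dataFile, , , , , nullFill!{,0})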
Importing Multiple Parquet Files or Datasets in Parallel
We can use the submitJob
function to import multiple Parquet files
or datasets in parallel.
Define the data import function:
def writeData(dbName, tbName, file){    // write data to database
    t = parquet::loadParquet(file)
    bfill!(t)    // fill missing values
    tableInsert(loadTable(dbName, tbName), t)
}
Use the submitJob
function to assign a thread for each dataset,
enabling background batch data import:
dataDir = "./parquetFile/"
filePaths = dataDir+files(dataDir).filename
dbName = "dfs://demoDB_multi"
tbName = "data"
for(file in filePaths){
    jobName = "loadParquet"
    submitJob(jobName, jobName, writeData, dbName, tbName, file)
}
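To check on the submitted jobs afterwards, the built-in batch job functions can be used, for example:
// list recently submitted batch jobs; a non-empty errorMsg indicates a failed import
getRecentJobs()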
Importing a Single Large Parquet File
To avoid memory overflow (OOM) when importing large files, use the
parquetDS
function to split the file into smaller chunks for
import. Unlike textChunkDS, parquetDS automatically splits the file based on its row groups.
First, call parquetDS
to split the sample data. Since the sample
data contains only a single row group, parquetDS
will create only
one data source.
dataFilePath = "./parquetFile/demo1.parquet"
ds = parquet::parquetDS(dataFilePath)
ds.size()
Then, use the mr
function to write the data into the database:
pt = loadTable("dfs://demoDB","data")
mr(ds=ds, mapFunc=append!{pt}, parallel=false)
select * from pt