Importing Parquet Files

Apache Parquet is a columnar storage format for efficient data storage and retrieval. The DolphinDB Parquet plugin supports importing Parquet files into DolphinDB and exporting DolphinDB data to Parquet files. This page explains how to use the plugin to import Parquet datasets into DolphinDB.

Installing Parquet Plugin

Call installPlugin to install the plugin:

installPlugin("parquet")

Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("parquet")

Parquet Data Import Methods

The DolphinDB parquet plugin provides the following methods to facilitate Parquet file import:

  • loadParquet: Imports Parquet files as in-memory tables.
  • loadParquetEx: Imports Parquet files directly into either a distributed database or in-memory database.
  • parquetDS: Divides a Parquet file into multiple smaller data sources. It can be used with function mr to load data.

The following table outlines common parameters of these methods.

Parameter | Description
filename | A string indicating the path of the input Parquet file.
schema | A table with two columns: name (STRING), the column name, and type (STRING), the data type.
column | A vector of integers indicating the indices of the columns to be imported. All columns are read if it is not specified.
rowGroupStart | A non-negative integer indicating the index of the row group from which the import starts. The file is read from the beginning if it is not specified.
rowGroupNum | An integer indicating the number of row groups to be read. The file is read to the end if it is not specified.
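
As a rough illustration of how the optional parameters combine, the sketch below reads two columns from a single row group with loadParquet. It assumes the sample file used later on this page and that the file contains at least one row group; the variable name firstGroup is only illustrative. Omitted arguments are left empty so that the later ones keep their positions.

```
dataFile = "./parquetFile/demo1.parquet"   // sample file path used on this page
// arguments in order: filename, schema (omitted), column, rowGroupStart, rowGroupNum
firstGroup = parquet::loadParquet(dataFile, , [0,1], 0, 1)
```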

Getting Started

Loading to Memory and Importing to Database

Like loadText, loadParquet imports data as in-memory tables.

The following example demonstrates how to import demo1.parquet.

dataFile = "./parquetFile/demo1.parquet"
tmpTB = parquet::loadParquet(dataFile)

Check the results:

select * from tmpTB;

Output:

Call the schema function to view the table schema (column names, data types, etc.):

tmpTB.schema().colDefs;

Output:

Once the Parquet file is loaded into memory, the table can be inserted directly into the database using tableInsert or append!. Ensure that the target database and table have already been created before proceeding.

Script for creating the database and table:

create database "dfs://demoDB" partitioned by HASH([SYMBOL,25])
create table "dfs://demoDB"."data"(
    time TIMESTAMP,
    customerId SYMBOL,
    temp INT,
    amp DOUBLE
)
partitioned by customerId

Then write data into database with the following script:

dfsTable = loadTable("dfs://demoDB", "data")
tableInsert(dfsTable, tmpTB)
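
The same write can be sketched with append!, which is named above as an alternative to tableInsert. This assumes tmpTB and the table "data" in "dfs://demoDB" from the previous steps. Run only one of the two write statements, since running both would insert the rows twice.

```
dfsTable = loadTable("dfs://demoDB", "data")
// alternative to tableInsert(dfsTable, tmpTB); use one or the other, not both
dfsTable.append!(tmpTB)
// confirm the number of rows now stored in the table
select count(*) from dfsTable
```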

Importing to Database Directly

Besides loading data into an in-memory table and then writing it to the database, the plugin provides the loadParquetEx function, which combines loading, cleaning, and storage into one streamlined process. Ensure that the target database and table have already been created before proceeding.

The following example demonstrates how to import demo1.parquet to table “data” in database “demoDB”. To start with an empty table, use the truncate function to remove existing rows.

truncate(dbUrl="dfs://demoDB", tableName="data")
parquet::loadParquetEx(database("dfs://demoDB"), "data", ["customerId"], dataFile)
pt = loadTable("dfs://demoDB", "data")
select * from pt

Output:

Handling Column Names and Data Types

Extracting Parquet File Schema

Users can use the extractParquetSchema function to preview a Parquet file's schema before data loading, including details like column names, data types, and the number of columns. It helps users identify potential issues such as mismatched data types and modify the schema accordingly for proper data loading.

For example, use extractParquetSchema to get the schema of the sample file demo1.parquet:

dataFile = "./parquetFile/demo1.parquet"
parquet::extractParquetSchema(dataFile)

Output:

Specifying Column Names and Types

If the column names or data types automatically inferred by the system are not as expected, we can specify them either by modifying the schema table returned by extractParquetSchema or by creating the schema table directly.
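
For the second option, a schema table can be built with the table function. The sketch below is illustrative only: it assumes demo1.parquet has the four columns used in the table definitions on this page, and the type names listed are assumptions that should be checked against the output of extractParquetSchema before use.

```
dataFile = "./parquetFile/demo1.parquet"
// one row per column of the file, in file order
// (column list and types are assumed from this page's examples)
schemaTB = table(`time`customerId`temp`amp as name, `TIMESTAMP`STRING`INT`DOUBLE as type)
tmpTB = parquet::loadParquet(dataFile, schemaTB)
```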

Specifying Column Names

For example, use the following script to modify schema and rename column “customerId” as “ID”:

dataFile = "./parquetFile/demo1.parquet"
schemaTB = parquet::extractParquetSchema(dataFile)
update schemaTB set name = "ID" where name = "customerId"

Use function loadParquet to import the Parquet file and specify schema parameter as the modified schemaTB.

tmpTB = parquet::loadParquet(dataFile,schemaTB)

Call function schema to view the table schema. Column “customerId” has been renamed as “ID”.

tmpTB.schema().colDefs;

Output:

Specifying Data Type

The column temp in demo1.parquet is stored as type INT. However, the expected data type for column temp is DOUBLE. To convert the data type of column temp:

dataFile = "./parquetFile/demo1.parquet"
schemaTB = parquet::extractParquetSchema(dataFile)
update schemaTB set type = "DOUBLE" where name = "temp"

Use function loadParquet to import the Parquet file and specify schema parameter as schemaTB.

tmpTB = parquet::loadParquet(dataFile,schemaTB)

Call the schema function to view the table schema:

tmpTB.schema().colDefs;

The data type of column temp has been converted to DOUBLE.

Import with Specific Columns

The Parquet plugin allows importing specific columns directly. Unlike loadText, where the schema parameter defines the columns to import, loadParquet utilizes the column parameter to indicate the indices of the columns to be imported.

For example, to load only the necessary columns - time, customerId, and amp - while excluding the temp column, specify their respective column indices in column:

parquet::loadParquet(dataFile, , [0,1,3]) 
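
Hard-coded indices are easy to get wrong when a file's column order changes. One possible approach, sketched here, is to look the indices up by name in the table returned by extractParquetSchema. The variable names wanted and idx are illustrative, and the column names are those of the sample file.

```
dataFile = "./parquetFile/demo1.parquet"
schemaTB = parquet::extractParquetSchema(dataFile)
wanted = `time`customerId`amp
// find returns the 0-based position of each wanted name in the schema's name column
// (a name that is not present yields -1, so check idx before loading)
idx = find(schemaTB[`name], wanted)
tmpTB = parquet::loadParquet(dataFile, , idx)
```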

Data Cleaning and Preprocessing

DolphinDB provides various built-in functions for common data cleaning tasks, including handling missing values, duplicate values, and outliers, and performing data normalization and standardization. This section focuses on several common scenarios.

The Parquet plugin provides two functions for importing data: loadParquet and loadParquetEx (see Getting Started). While both support basic data cleaning, loadParquetEx has a distinctive transform parameter that enables data cleaning and preprocessing during import. This parameter accepts a unary function that processes the loaded table and returns a cleaned table, which is then written directly to the distributed database without intermediate memory storage. This section focuses on practical applications of the transform parameter in loadParquetEx.

Data Type Conversion of Temporal Types

Suppose the time column in a Parquet file is stored as TIMESTAMP but needs to be stored as DATETIME in the DolphinDB database. One way to handle this is to pass a function that wraps replaceColumn! to the transform parameter of loadParquetEx, so that the type is converted during import.

First, create a distributed database and a table.

create database "dfs://demoDB1" partitioned by HASH([SYMBOL,25])
create table "dfs://demoDB1"."data"(
    time DATETIME,
    customerId SYMBOL,
    temp INT,
    amp DOUBLE
)
partitioned by customerId

Define a user-defined function t2d to encapsulate the data processing logic, converting the time column to DATETIME type.

def t2d(mutable t){
    return t.replaceColumn!(`time,datetime(t.time))
}

Call function loadParquetEx and assign function t2d to parameter transform. The system executes function t2d with the imported data, and then saves the result to the database.

dataFile = "./parquetFile/demo1.parquet"
db = database("dfs://demoDB1")
parquet::loadParquetEx(db,`data,`customerId,dataFile,,,,,t2d);

Check the results:

select top 5 * from loadTable("dfs://demoDB1","data")

Column 'time' is stored as DATETIME type:

Handling Null Values

Efficient handling of missing values is essential when working with real-world datasets. DolphinDB offers built-in functions such as bfill, ffill, interpolate, and nullFill for handling incomplete data in tables. Because the transform parameter only accepts a function with one parameter, a built-in function with multiple parameters must first be converted into a one-parameter function through partial application. For example, to fill null values with 0, the following expression can be passed as the transform parameter:

nullFill!{,0}
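
Combined with the call pattern from the previous subsection, the import might look as follows. This is a sketch that assumes the sample file and the database "dfs://demoDB" with table "data" created earlier on this page, and that filling with 0 is appropriate for every column that can contain nulls.

```
dataFile = "./parquetFile/demo1.parquet"
db = database("dfs://demoDB")
// nullFill!{,0} fixes the fill value and leaves the table argument open,
// giving the one-parameter function that transform expects
parquet::loadParquetEx(db, `data, `customerId, dataFile, , , , , nullFill!{,0})
```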

Importing Multiple Parquet Files or Datasets in Parallel

We can use the submitJob function to import multiple Parquet files or datasets in parallel.

Define the data import function:

def writeData(dbName, tbName, file){  // write data to database
    t = parquet::loadParquet(file) 
    bfill!(t)   // fill missing values 
    tableInsert(loadTable(dbName, tbName), t)  
}

Use the submitJob function to assign a thread for each dataset, enabling background batch data import:

dataDir = "./parquetFile/"
filePaths = dataDir+files(dataDir).filename
dbName = "dfs://demoDB_multi" 
tbName = "data" 
for(file in filePaths){
    jobName = "loadParquet"
    submitJob(jobName, jobName, writeData, dbName, tbName, file) 
}
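
Because submitJob returns immediately, it can be useful to check on the jobs afterwards. A minimal way to do this, assuming the jobs were submitted from the current node, is the built-in getRecentJobs function. The exact set of columns it returns may vary by server version.

```
// list the most recently submitted batch jobs, one row per file submitted above
getRecentJobs(size(filePaths))
```

In the returned table, an empty end time typically means the job is still running, and a non-empty error message indicates that the import of that file failed.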

Importing a Single Large Parquet File

To avoid out-of-memory (OOM) errors when importing large files, use the parquetDS function to split the file into smaller chunks for import. Unlike textChunkDS, parquetDS automatically splits the file based on its row groups.

First, call parquetDS to split the sample data. Since the sample data contains only a single row group, parquetDS will create only one data source.

dataFilePath = "./parquetFile/demo1.parquet"
ds = parquet::parquetDS(dataFilePath)
ds.size()

Then, use the mr function to write the data into the database:

pt = loadTable("dfs://demoDB","data")
mr(ds=ds, mapFunc=append!{pt}, parallel=false)
select * from pt