HDFS

Hadoop Distributed File System (HDFS) is designed for reliable, distributed reads and writes of large-scale data. The DolphinDB hdfs plugin can read files in Parquet or ORC format from HDFS and load them into DolphinDB in-memory tables. It also supports saving DolphinDB in-memory tables to HDFS in these formats.

Prerequisites

Specify the paths to the dynamic libraries required by the plugin on Linux. Note that the shared library search path must be set before starting the DolphinDB server.

  1. Install the JAVA environment:

    yum install java
    yum install java-1.8.0-openjdk-devel
  2. Locate the libjvm.so file and set the Java version:

    find /usr/ -name "libjvm.so" // Locate the JAVA environment
    export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64 // Set to the actual JAVA path
    export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH // Specify the shared library search path so that DolphinDB can find the JVM library
  3. Download and unzip hadoop-3.2.2:

    tar -zxvf hadoop-3.2.2.tar.gz
    cd hadoop-3.2.2
    export HADOOP_PREFIX=/hdd1/DolphinDBPlugin/hadoop-3.2.2 // Set to the actual path
    export CLASSPATH=$($HADOOP_PREFIX/bin/hadoop classpath --glob):$CLASSPATH
    export LD_LIBRARY_PATH=$HADOOP_PREFIX/lib/native:$LD_LIBRARY_PATH

Installation (with installPlugin)

Required server version: DolphinDB 2.00.10 or higher

OS: Linux x86-64 and Linux JIT

Installation Steps:

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: For plugins not included in the list, you can install them from precompiled binaries or compile them from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.

login("admin", "123456")
listRemotePlugins()

(2) Invoke installPlugin for plugin installation

installPlugin("hdfs")

(3) Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("hdfs")

Method References

connect

Syntax

connect(nameNode, [port], [userName], [kerbTicketCachePath], [keytabPath], [principal], [lifeTime])

Details

Build a connection to HDFS and return a handle. If the connection fails, an exception will be thrown.

Parameters

  • nameNode: A STRING scalar indicating the IP address of the HDFS NameNode, or "localhost" if HDFS is deployed locally. A full cluster address can also be specified, in which case the port number defaults to the value of the Hadoop configuration item fs.defaultFS and the parameter port is not required.
  • port (optional): An integer indicating the port number of HDFS. For a locally deployed HDFS, the port is 9000.
  • userName (optional): A STRING scalar indicating the user name for login.
  • kerbTicketCachePath (optional): A STRING scalar indicating the Kerberos path used to connect to HDFS. It corresponds to the value of the Hadoop configuration item hadoop.security.kerberos.ticket.cache.path. If keytabPath, principal, and lifeTime are not specified, this parameter points to the path of the generated tickets. Otherwise, this parameter points to the path where the newly generated tickets are stored.
  • keytabPath (optional): A STRING scalar indicating the path of the keytab files used to authenticate the obtained Kerberos tickets.
  • principal (optional): A STRING scalar indicating the Kerberos principal to be authenticated.
  • lifeTime (optional): A STRING scalar indicating the lifetime of the generated tickets, where "d" stands for days, "h" for hours, "m" for minutes, and "s" for seconds. For example, "4h5m" represents 4 hours and 5 minutes and "1d2s" represents 1 day and 2 seconds. The default value is "1d".

Examples

// Connect to a regular HDFS
conn = hdfs::connect("default",9000);

// Connect to a Kerberos-authenticated HDFS
keytabPath = "/path_to_keytabs/node.keytab"
cachePath = "/path_to_krb5Cache/cache"
principal = "user/example.com@DOLPHINDB.COM"
lifeTime = "1d3h"
connKerb5 = hdfs::connect(`kerb5_url, 9001, , cachePath, keytabPath, principal, lifeTime)

disconnect

Syntax

disconnect(hdfsFS)

Details

Disconnect from the HDFS.

Parameters

  • hdfsFS: The handle returned by method connect.

exists

Syntax

exists(hdfsFS, path)

Details

Determine whether a specified path exists. If it does not exist, an error will be reported.

Parameters

  • hdfsFS: The handle returned by method connect.
  • path: A STRING scalar indicating a path in HDFS.
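
Examples

The following is a minimal sketch; the handle fs and the paths are placeholders for values from an earlier connect call. Because a missing path raises an error rather than returning false, the call is wrapped in try-catch here.

// fs = hdfs::connect("default", 9000)
try {
    hdfs::exists(fs, "/user/name/data")   // no error if the path exists
    print("path exists")
} catch(ex) {
    print(ex)                             // a missing path raises an error
}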

coHDFS

Syntax

coHDFS(hdfsFS1, src, hdfsFS2, dst)

Details

Copy a file from one HDFS to another. If the operation fails, an error will be reported.

Parameters

  • hdfsFS1: The handle of the source HDFS returned by method connect.
  • src: A STRING scalar indicating the source file path.
  • hdfsFS2: The handle of the target HDFS returned by method connect.
  • dst: A STRING scalar indicating the target file path.
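
Examples

The snippet below sketches copying a file across two clusters; the NameNode addresses and file paths are hypothetical and should be replaced with actual values.

// Connect to the source and target clusters (hypothetical addresses)
srcFS = hdfs::connect("namenode1.example.com", 9000)
dstFS = hdfs::connect("namenode2.example.com", 9000)

// Copy a file from the source cluster to the target cluster
hdfs::coHDFS(srcFS, "/data/testFile.orc", dstFS, "/backup/testFile.orc")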

move

Syntax

move(hdfsFS1,src,hdfsFS2,dst)

Details

Move a file from one HDFS to another. If the operation fails, an error will be reported.

Parameters

  • hdfsFS1: The handle of the source HDFS returned by method connect.
  • src: A STRING scalar indicating the source file path.
  • hdfsFS2: The handle of the target HDFS returned by method connect.
  • dst: A STRING scalar indicating the target file path.

delete

Syntax

delete(hdfsFS, path, recursive)

Details

Delete a directory or file. If the operation fails, an error will be reported.

Parameters

  • hdfsFS: The handle returned by method connect.
  • path: A STRING scalar indicating the path of the file or directory to be deleted.
  • recursive: An integer indicating whether to delete directories recursively. If path is a directory and recursive is non-zero, all files in the directory will be deleted recursively; if path is a directory and recursive is 0, an error will be reported.
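
Examples

A brief sketch of the recursive flag; the handle fs and the paths are placeholders.

// Delete a single file; recursive can be 0
hdfs::delete(fs, "/tmp/testFile.txt.bk", 0)

// Delete a directory and all files in it; recursive must be non-zero
hdfs::delete(fs, "/user/name/input", 1)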

rename

Syntax

rename(hdfsFS, oldPath, newPath)

Details

Rename or move the specified file. If the operation fails, an error will be reported.

Parameters

  • hdfsFS: The handle returned by method connect.
  • oldPath: A STRING scalar indicating the path of the file to be renamed.
  • newPath: A STRING scalar indicating the path of the file after renaming.
    • If an existing directory is specified, the source file will be moved to it.
    • If an existing file is specified, or the specified parent directory is missing, an error will be reported.
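
Examples

The sketch below illustrates the two behaviors described above; the handle fs and the paths are placeholders.

// Rename a file in place
hdfs::rename(fs, "/tmp/testFile.txt", "/tmp/testFile.txt.renamed")

// Move a file into an existing directory
hdfs::rename(fs, "/tmp/testFile.txt.renamed", "/user/name/input")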

createDirectory

Syntax

createDirectory(hdfsFS, path)

Details

Create a new folder. If the operation fails, an error will be reported.

Parameters

  • hdfsFS: The handle returned by method connect.
  • path: A STRING scalar indicating the path of the new folder.

chmod

Syntax

chmod(hdfsFS, path, mode)

Details

Modify the permissions of the specified file or directory. If the operation fails, an error will be reported.

Parameters

  • hdfsFS: The handle returned by method connect.
  • path: A STRING scalar indicating the path of the file or directory.
  • mode: An integer indicating the permissions to apply.
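
Examples

A minimal sketch using mode values in the same style as the usage example at the end of this document (e.g. 600, 755); the handle fs and the paths are placeholders.

// Restrict a file to owner access only
hdfs::chmod(fs, "/user/name/secret.txt", 600)

// Open up a directory for wider access
hdfs::chmod(fs, "/user/name/public", 755)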

getListDirectory

Syntax

getListDirectory(hdfsFS, path)

Details

Return a handle containing all information about the target directory.

Parameters

  • hdfsFS: The handle returned by method connect.
  • path: A STRING scalar indicating the path of the target directory.

listDirectory

Syntax

listDirectory(fileInfo)

Details

List all file information in the target directory.

Parameters

  • fileInfo: The handle returned by method getListDirectory.

freeFileInfo

Syntax

freeFileInfo(fileInfo)

Details

Release the space occupied by directory information.

Parameters

  • fileInfo: The handle returned by method getListDirectory.
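
Examples

getListDirectory, listDirectory, and freeFileInfo are typically used together: obtain the handle, list its contents, then release it. A minimal sketch with a placeholder directory:

fileInfo = hdfs::getListDirectory(fs, "/user/name/input/")
hdfs::listDirectory(fileInfo)    // print information about every file in the directory
hdfs::freeFileInfo(fileInfo)     // release the handle when it is no longer needed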

readFile

Syntax

readFile(hdfsFS, path, handler)

Details

Read data from the HDFS server. Return a DolphinDB in-memory table that stores the data processed with the handler function.

Parameters

  • hdfsFS: The handle returned by method connect.

  • path: A STRING scalar indicating the path of the file to be read.

  • handler: A binary function used to deserialize the file from HDFS into a DolphinDB table. The first parameter is the buffer address of the file’s byte stream and the second parameter is the file length. After reading files from HDFS, the readFile method stores the content in the buffer and caches the length of the content. Then, the handler function reads the content from the buffer based on the length, deserializes it, and saves it into a DolphinDB in-memory table.

    Note: Currently, handler only supports orc::loadORCHdfs from the orc plugin and parquet::loadParquetHdfs from the parquet plugin. Custom development is required to deserialize files in other formats from HDFS.

Examples

// Install and load the orc plugin
installPlugin("orc");
loadPlugin("orc");

// Use the orc::loadORCHdfs method to read the ORC files from HDFS
re=hdfs::readFile(conn,'/tmp/testFile.orc',orc::loadORCHdfs)

writeFile

Syntax

writeFile(hdfsFS, path, tb, handler)

Details

Save a DolphinDB in-memory table to HDFS in a specified format.

Parameters

  • hdfsFS: The handle returned by method connect.
  • path: A STRING scalar indicating the path of the file to be written.
  • tb: The in-memory table to be stored.
  • handler: A unary function used to serialize the DolphinDB in-memory table into a byte stream and save it to HDFS. The parameter of the function is a DolphinDB in-memory table. The handler function returns a vector where the first element is the address of the serialized buffer and the second is the length of the content in the buffer. It is the inverse operation of the handler function in method readFile. The writeFile method calls handler to serialize tb and obtain the buffer address and length, then writes the content in the buffer to HDFS.

Examples

// Install and load the parquet plugin
installPlugin("parquet")
loadPlugin("parquet")

// Use the parquet::saveParquetHdfs method to write the DolphinDB in-memory table to the specified HDFS path in Parquet format
hdfs::writeFile(conn,'/tmp/testFile.parquet',re,parquet::saveParquetHdfs)

Usage Examples

// Load the hdfs plugin
loadPlugin("hdfs")

// Connect to the HDFS server
fs=hdfs::connect("default",9000);

// Check if the specified path exists
hdfs::exists(fs,"/user/name");
hdfs::exists(fs,"/user/name1");

// Copy files for backup
hdfs::coHDFS(fs,"/tmp/testFile.txt",fs,"/tmp/testFile.txt.bk");
hdfs::coHDFS(fs,"/tmp/testFile1.txt",fs,"/tmp/testFile.txt.bk");

// Move files
hdfs::move(fs,"/tmp/testFile.txt.bk",fs,"/user/name/input/testFile.txt");
hdfs::move(fs,"/user/name/input/testFile.txt",fs,"/user/name1/testFile.txt");

// Rename files
hdfs::rename(fs,"/user/name1/testFile.txt","/user/name1/testFile.txt.rename");

// Create an empty directory
hdfs::createDirectory(fs,"/user/name");

// Modify the permission to 600
hdfs::chmod(fs,"/user/name",600);

// Delete the created directory
hdfs::delete(fs,"/user/name",1);

// Get a handle containing all information about the target directory
fileInfo=hdfs::getListDirectory(fs,"/user/name/input/");

// List all file information in the target directory
hdfs::listDirectory(fileInfo);

// Release the space occupied by directory information
hdfs::freeFileInfo(fileInfo);

// Load the ORC files from HDFS into a DolphinDB in-memory table
loadPlugin("orc")
re=hdfs::readFile(fs,'/tmp/testFile.orc',orc::loadORCHdfs)

// Write a DolphinDB in-memory table to the specified HDFS path in Parquet format
loadPlugin("parquet")
hdfs::writeFile(fs,'/tmp/testFile.parquet',re,parquet::saveParquetHdfs)

// Disconnect from HDFS
hdfs::disconnect(fs);