/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.dialect;

import com.xxdb.data.BasicDateTime;
import com.xxdb.data.BasicNanoTimestamp;
import com.xxdb.data.BasicTimestamp;
import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider;
import io.confluent.connect.jdbc.dialect.DolphinDBDatabaseDialectImp.DDBEngineType;
import io.confluent.connect.jdbc.dialect.DolphinDBDatabaseDialectImp.DDBPreparedStatementBinder;
import io.confluent.connect.jdbc.dialect.DolphinDBDatabaseDialectImp.DDBSynConfigModel;
import io.confluent.connect.jdbc.dialect.DolphinDBDatabaseDialectImp.DolphinDBDataTypeEnum;
import io.confluent.connect.jdbc.dialect.DolphinDBDatabaseDialectImp.DolphinDBUtils;
import io.confluent.connect.jdbc.dialect.GenericDatabaseDialect;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata;
import io.confluent.connect.jdbc.sink.metadata.SchemaPair;
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.ExpressionBuilder;
import io.confluent.connect.jdbc.util.IdentifierRules;
import io.confluent.connect.jdbc.util.TableDefinition;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.TableType;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DolphinDBDatabaseDialect
extends GenericDatabaseDialect {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(DolphinDBDatabaseDialect.class);
    // Not referenced in the visible part of this class.
    // NOTE(review): looks like a leftover from the dialect this class was adapted from - confirm before removing.
    volatile int maxIdentifierLength = 0;
    // Type names; the same three literals are collected into CAST_TYPES below.
    static final String JSON_TYPE_NAME = "json";
    static final String JSONB_TYPE_NAME = "jsonb";
    static final String UUID_TYPE_NAME = "uuid";
    // Fallback location of the sync-config table, used by parseDdbSyncConfig() when
    // the [ddbsync.config.table] property is not in the expected "database,table" form.
    static final String DEFAULT_SYNCONFIG_DATABASE_NAME = "dfs://ddb_sync_config";
    static final String DEFAULT_SYNCONFIG_TABLE_NAME = "sync_config";
    // Column names read from each sync-config row in loadDDBSynConfig().
    static final String SYNCONFIG_TABLE_CONNECTOR_NAME = "connector_name";
    static final String SYNCONFIG_TABLE_TOPIC_NAME = "topic_name";
    static final String SYNCONFIG_TABLE_TARGET_DB_NAME = "target_db";
    static final String SYNCONFIG_TABLE_TARGET_TAB_NAME = "target_tab";
    static final String SYNCONFIG_TABLE_ADD_SORTCOL_FLAG = "add_sortcol_flag";
    static final String SYNCONFIG_TABLE_ENGINE_TYPE = "engine_type";
    static final String SYNCONFIG_TABLE_PRIMARY_KEY = "primary_key";
    // Extra column appended to INSERT/UPSERT statements when the sort-column flag is enabled.
    static final String DUMMY_SORT_KEY = "dummySortKey__";
    // Value handed to DolphinDBUtils.checkTabAddSortFlag(...) and to the statement binder.
    // (The identifier is misspelled - "FALG" - but is kept because other code references it.)
    static final String TAB_SORT_KEY_ADD_FALG = "1";
    // Not referenced in the visible part of this class.
    static final String LOAD_TAB_STR = "loadTabStr";
    // Sync configuration rows, keyed by topic name; filled by loadDDBSynConfig().
    private final Map<String, DDBSynConfigModel> synConfigMap = new HashMap<String, DDBSynConfigModel>();
    // Immutable set of the json / jsonb / uuid type names. Not referenced in the visible part of this class.
    private static final Set<String> CAST_TYPES = Collections.unmodifiableSet(Utils.mkSet((Object[])new String[]{"json", "jsonb", "uuid"}));

    /**
     * Creates the dialect and immediately loads the topic-to-table sync
     * configuration from DolphinDB.
     *
     * @param config connector configuration, passed on to the generic dialect
     * @throws RuntimeException wrapping any {@link SQLException} or
     *         {@link ClassNotFoundException} raised while loading the sync configuration
     */
    public DolphinDBDatabaseDialect(AbstractConfig config) {
        super(config, new IdentifierRules(".", " ", " "));
        try {
            this.initDDBMappingConfig();
        } catch (SQLException | ClassNotFoundException loadFailure) {
            throw new RuntimeException(loadFailure);
        }
    }

    /**
     * Opens a connection through the generic dialect and, when the OLAP engine is
     * among the supported engine types, runs the script that registers the
     * {@code upsertPro} function view on that connection.
     *
     * @return the opened connection
     * @throws SQLException if the generic dialect fails to open the connection
     * @throws RuntimeException if the DolphinDB JDBC driver class cannot be loaded, or if
     *         the registration script fails (the original exception is kept as the cause)
     */
    @Override
    public Connection getConnection() throws SQLException {
        try {
            Class.forName("com.dolphindb.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Load DolphinDB jdbc Driver error" + e, e);
        }
        Connection conn = super.getConnection();
        Object rawEngineTypes = this.config.originals().get("ddbsync.config.engineTypes");
        String configuredEngineTypes = rawEngineTypes == null ? null : rawEngineTypes.toString();
        if (this.getSupportedEngineTypes(configuredEngineTypes).contains("OLAP")) {
            try {
                DolphinDBUtils.execute(conn, this.getDDBInitDosSql());
            } catch (SQLException e) {
                // The caller never receives this connection, so release it here.
                try {
                    conn.close();
                } catch (SQLException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw new RuntimeException("Failed to register function upsertPro,The DolphinDB version does not support the OLAP engine", e);
            }
        }
        return conn;
    }

    /**
     * Builds the DolphinDB script that defines {@code upsertPro} and adds it as a
     * function view unless exactly one view of that name already exists.
     *
     * @return the script text, lines joined with the platform line separator and
     *         no trailing separator
     */
    protected String getDDBInitDosSql() {
        return String.join(System.lineSeparator(),
            "def upsertPro(mutable obj, newData, ignoreNull=false, keyColNames = NULL, sortColumns=NULL, deduplicateNewData = false){",
            "storeEngine = schema(obj).engineType",
            "if(storeEngine == 'OLAP'){",
            "deduplicateData= < select * from newData where isDuplicated(_$$keyColNames, LAST) == 0 >.eval()",
            " upsertNumber = count(deduplicateData)",
            "finalNumber = tableUpsert(obj, deduplicateData, ignoreNull ,keyColNames)",
            "originalNumber = count(newData)",
            "return originalNumber-(upsertNumber-sum(finalNumber))",
            "}else if (storeEngine == 'TSDB'){",
            " return tableInsert(obj, newData)",
            "}}",
            "upsertProFuncNum_=exec count(*) from getFunctionViews() where name='upsertPro'",
            "if(upsertProFuncNum_!=1){",
            "addFunctionView(upsertPro)",
            "}");
    }

    /**
     * Copies every {@code connection.*} original that is not a declared config key
     * into {@code properties} (with the prefix stripped), then adds a
     * {@code tableAlias} property listing {@code alias:fullDfsTableName} pairs for
     * all loaded sync-config entries.
     *
     * @param properties connection properties to extend; modified in place
     * @return the same {@code properties} instance
     */
    @Override
    protected Properties addConnectionProperties(Properties properties) {
        Set<String> declaredKeys = this.config.values().keySet();
        for (Map.Entry<String, Object> extra : this.config.originalsWithPrefix("connection.").entrySet()) {
            if (declaredKeys.contains("connection." + extra.getKey())) {
                continue;
            }
            properties.put(extra.getKey(), extra.getValue());
        }

        StringBuilder aliases = new StringBuilder();
        for (DDBSynConfigModel model : this.synConfigMap.values()) {
            // Every pair contains at least ":", so a non-empty buffer means a previous pair exists.
            if (aliases.length() > 0) {
                aliases.append(",");
            }
            aliases.append(model.getTabAlias().concat(":").concat(model.getFullNameOfDfsTab()));
        }
        if (aliases.length() > 0) {
            properties.put("tableAlias", aliases.toString());
        }
        return properties;
    }

    /**
     * Opens a short-lived connection with the configured credentials and uses it to
     * validate and load the sync configuration: duplicate topic mappings are
     * rejected, the mappings are loaded, and OLAP mappings are checked for a
     * primary key.
     *
     * @throws ClassNotFoundException if the DolphinDB JDBC driver class is not on the classpath
     * @throws SQLException if connecting or querying fails
     */
    private void initDDBMappingConfig() throws ClassNotFoundException, SQLException {
        Properties connectionProps = new Properties();
        String user = this.config.getString("connection.user");
        if (user != null) {
            connectionProps.setProperty("user", user);
        }
        Password password = this.config.getPassword("connection.password");
        if (password != null) {
            connectionProps.setProperty("password", password.value());
        }
        connectionProps = this.addConnectionProperties(connectionProps);

        // Note: this sets the login timeout (40 s) for the whole DriverManager.
        DriverManager.setLoginTimeout(40);
        Class.forName("com.dolphindb.jdbc.Driver");

        Object rawEngineTypes = this.config.originals().get("ddbsync.config.engineTypes");
        String configuredEngineTypes = rawEngineTypes == null ? null : rawEngineTypes.toString();
        log.info("configured engineType is {}", configuredEngineTypes);
        HashSet<String> supportedEngineTypes = this.getSupportedEngineTypes(configuredEngineTypes);
        log.info("supported engineType is {}", String.join(",", supportedEngineTypes));

        String[] syncConfigLocation = this.parseDdbSyncConfig();
        try (Connection connection = DriverManager.getConnection(this.jdbcUrl, connectionProps)) {
            this.checkTopicMultiConfig(connection, syncConfigLocation);
            this.loadDDBSynConfig(connection, syncConfigLocation);
            this.checkDDBSynConfig();
        }
    }

    /**
     * Resolves the engine types this connector may write to. {@code "TSDB"} is
     * always included; any other {@link DDBEngineType} is included when its name
     * appears (case-insensitively, surrounding whitespace ignored) in the
     * comma-separated {@code configuredEngineTypes}.
     *
     * @param configuredEngineTypes comma-separated engine names, or {@code null}
     * @return a new mutable set of supported engine type names
     */
    private HashSet<String> getSupportedEngineTypes(String configuredEngineTypes) {
        HashSet<String> result = new HashSet<String>();
        result.add("TSDB");
        if (configuredEngineTypes == null) {
            return result;
        }
        // Upper-case with Locale.ROOT so matching does not depend on the JVM default
        // locale (e.g. Turkish maps 'i' to a dotted capital I).
        Set<String> requested = Arrays.stream(configuredEngineTypes.split(","))
            .map(String::trim)
            .map(name -> name.toUpperCase(java.util.Locale.ROOT))
            .collect(Collectors.toSet());
        for (DDBEngineType type : DDBEngineType.values()) {
            if (requested.contains(type.name())) {
                result.add(type.name());
            }
        }
        return result;
    }

    /**
     * Verifies that the sync-config table holds at most one mapping per topic for
     * this connector.
     *
     * @param connection connection used to run the check
     * @param ddb_sync_config two-element array: sync-config database, sync-config table
     * @throws SQLException if the query fails
     * @throws RuntimeException if any topic has more than one mapping
     */
    public void checkTopicMultiConfig(Connection connection, String[] ddb_sync_config) throws SQLException {
        String connectorName = ((JdbcSinkConfig)this.config).connectorName;
        String duplicateQuery = "select count(*) from loadTable(\"" + ddb_sync_config[0] + "\", `" + ddb_sync_config[1] + ") "
            + "where " + SYNCONFIG_TABLE_CONNECTOR_NAME + "='" + connectorName + "' "
            + "group by topic_name having count(*)>1";
        List<Map<String, Object>> duplicates = DolphinDBUtils.queryForList(connection, duplicateQuery);
        if (duplicates.isEmpty()) {
            return;
        }
        log.error("dolphindb sync tab config error,There are multiple mapping for a given topic.");
        for (Map<String, Object> row : duplicates) {
            log.error("connnector_name [{}] -- topic [{}] have [{}] mappings", connectorName, row.get(SYNCONFIG_TABLE_TOPIC_NAME), row.get("count"));
        }
        throw new RuntimeException("dolphindb sync tab config error,There are multiple mapping for a given topic.");
    }

    /**
     * Loads the sync-config rows for this connector and its configured topics into
     * {@code synConfigMap} (keyed by topic name). The generated script also fills
     * each row's {@code engine_type} from the schema of its target database.
     *
     * @param connection connection used to run the script
     * @param ddb_sync_config two-element array: sync-config database, sync-config table
     * @throws SQLException if the script fails
     * @throws ConnectException if the {@code topics} property is missing or contains no topic name
     * @throws RuntimeException if a loaded row targets an engine type that is not supported
     */
    public void loadDDBSynConfig(Connection connection, String[] ddb_sync_config) throws SQLException {
        log.info("start to load the sync table infomation");
        String connectorName = ((JdbcSinkConfig)this.config).connectorName;
        String lineSeparator = System.lineSeparator();

        // Build the back-quoted symbol list from the topics, tolerating spaces around
        // commas ("a, b" must become `a,`b rather than `a,` b).
        Object rawTopics = this.config.originals().get("topics");
        String topicSymbols = rawTopics == null ? "" : Arrays.stream(rawTopics.toString().split(","))
            .map(String::trim)
            .filter(topic -> !topic.isEmpty())
            .map(topic -> "`" + topic)
            .collect(Collectors.joining(","));
        if (topicSymbols.isEmpty()) {
            throw new ConnectException("The [topics] property is required to load the DolphinDB sync configuration, but it is missing or empty");
        }

        // Random suffix keeps the temporary script variables distinct between runs.
        String suffix = UUID.randomUUID().toString().replace("-", "");
        String tmpVectorName = "tmp_vector_" + suffix;
        String tmpTabName = "tmp_tab_" + suffix;
        String baseQuery = "select * from loadTable('" + ddb_sync_config[0] + "','" + ddb_sync_config[1] + "') "
            + "where " + SYNCONFIG_TABLE_CONNECTOR_NAME + "='" + connectorName + "' ";

        StringBuilder script = new StringBuilder();
        script.append(tmpVectorName).append("=[").append(topicSymbols).append("] ").append(lineSeparator);
        script.append(tmpTabName).append("=").append(baseQuery).append("and topic_name in ").append(tmpVectorName).append(lineSeparator);
        script.append(tmpTabName).append("['").append(SYNCONFIG_TABLE_ENGINE_TYPE).append("']=''").append(lineSeparator);
        script.append("for (db_name in ").append(tmpTabName).append("['target_db']){").append(lineSeparator);
        script.append("update ").append(tmpTabName).append(" set engine_type = database(db_name).schema().engineType where target_db=db_name").append(lineSeparator);
        script.append("}").append(lineSeparator);
        script.append("select * from ").append(tmpTabName);

        // The supported engine types do not depend on the row, so resolve them once.
        Object rawEngineTypes = this.config.originals().get("ddbsync.config.engineTypes");
        HashSet<String> supportedEngineTypes = this.getSupportedEngineTypes(rawEngineTypes == null ? null : rawEngineTypes.toString());

        List<Map<String, Object>> mappingInfoList = DolphinDBUtils.queryForList(connection, script.toString());
        for (Map<String, Object> row : mappingInfoList) {
            String topicName = syncConfigValue(row, SYNCONFIG_TABLE_TOPIC_NAME);
            DDBSynConfigModel model = new DDBSynConfigModel();
            model.setConnectorName(syncConfigValue(row, SYNCONFIG_TABLE_CONNECTOR_NAME));
            model.setTopicName(topicName);
            model.setTargetDB(syncConfigValue(row, SYNCONFIG_TABLE_TARGET_DB_NAME));
            model.setTargetTab(syncConfigValue(row, SYNCONFIG_TABLE_TARGET_TAB_NAME));
            model.setAddSortColFlag(syncConfigValue(row, SYNCONFIG_TABLE_ADD_SORTCOL_FLAG));
            model.setEngineType(syncConfigValue(row, SYNCONFIG_TABLE_ENGINE_TYPE));
            model.setPrimayKey(syncConfigValue(row, SYNCONFIG_TABLE_PRIMARY_KEY));
            this.synConfigMap.put(topicName, model);
            if (!supportedEngineTypes.contains(model.getEngineType())) {
                log.error("The current synchronization table contains unsupported engines " + model.getEngineType());
                throw new RuntimeException("synchronization table contains unsupported engines");
            }
        }
        log.info("finish loading the sync table infomation");
    }

    /** Returns the string form of {@code row.get(column)}, or {@code null} when the value is absent. */
    private static String syncConfigValue(Map<String, Object> row, String column) {
        Object value = row.get(column);
        return value == null ? null : value.toString();
    }

    /**
     * Checks that every loaded mapping whose engine type is OLAP has a primary key
     * configured. All offending mappings are logged before failing.
     *
     * @throws SQLException declared for callers; not thrown by the visible code
     * @throws RuntimeException if at least one OLAP mapping has no primary key
     */
    public void checkDDBSynConfig() throws SQLException {
        String connectorName = ((JdbcSinkConfig)this.config).connectorName;
        boolean allValid = true;
        for (Map.Entry<String, DDBSynConfigModel> entry : this.synConfigMap.entrySet()) {
            DDBSynConfigModel model = entry.getValue();
            boolean olapWithoutKey = "OLAP".equalsIgnoreCase(model.getEngineType()) && model.getPrimayKey() == null;
            if (olapWithoutKey) {
                log.error("connnector_name [{}] -- topic [{}] config database [{}] engineType is OLAP,so must be configured with primay key", connectorName, entry.getKey(), model.getTargetDB());
                allValid = false;
            }
        }
        if (allValid) {
            return;
        }
        throw new RuntimeException("some OLAP engineType Databases are configured without primay key ,please check.");
    }

    /**
     * Parses {@code fqn} with the generic dialect, treats the parsed parts as a
     * topic name, and maps it to the DolphinDB target configured for that topic.
     *
     * @param fqn fully qualified name to parse
     * @return a table id whose catalog is the target database and whose table is the target table
     * @throws RuntimeException if no sync configuration exists for the topic
     */
    @Override
    public TableId parseTableIdentifier(String fqn) {
        String topicName = this.mergeTopicName(super.parseTableIdentifier(fqn));
        DDBSynConfigModel mapping = this.synConfigMap.get(topicName);
        if (mapping != null) {
            return new TableId(mapping.getTargetDB(), null, mapping.getTargetTab());
        }
        throw new RuntimeException(String.format("topic: [%s] does not have a corresponding table configured\uff0cpleas check the config,or the sink connector name.", topicName));
    }

    /**
     * Asks DolphinDB via {@code existsTable} whether the target table exists.
     * A missing table is reported as an error rather than as {@code false}.
     *
     * @param connection connection used to run the check
     * @param tableId table to look up (catalog = database, table = table name)
     * @return {@code true} when the table exists
     * @throws SQLException if the query fails
     * @throws ConnectException if the table does not exist
     */
    @Override
    public boolean tableExists(Connection connection, TableId tableId) throws SQLException {
        String existsQuery = "existsTable('" + tableId.catalogName() + "','" + tableId.tableName() + "')";
        List<?> answer = DolphinDBUtils.queryForVector(connection, existsQuery);
        if (answer.get(0).toString().equalsIgnoreCase("true")) {
            return true;
        }
        String errorMsg = "The distribute table  [" + tableId.catalogName() + "]" + tableId.tableName() + " does not exist ";
        log.error(errorMsg);
        throw new ConnectException(errorMsg);
    }

    /**
     * Describes the table from its column metadata.
     *
     * @param connection connection used to read the metadata
     * @param tableId table to describe
     * @return the table definition, or {@code null} when no columns were found
     * @throws SQLException if reading the metadata fails
     */
    @Override
    public TableDefinition describeTable(Connection connection, TableId tableId) throws SQLException {
        Collection<ColumnDefinition> columns = this.describeColumns(connection, tableId).values();
        return columns.isEmpty() ? null : new TableDefinition(tableId, columns, TableType.TABLE);
    }

    /**
     * Resolves {@code tablePattern} to a table id and delegates to the
     * catalog/schema/table overload, using the dialect's default catalog and
     * schema patterns for parts the id does not provide.
     *
     * @param connection connection used to read the metadata
     * @param tablePattern table name or pattern to resolve
     * @param columnPattern column name pattern
     * @return column definitions keyed by column id
     * @throws SQLException if reading the metadata fails
     */
    @Override
    public Map<ColumnId, ColumnDefinition> describeColumns(Connection connection, String tablePattern, String columnPattern) throws SQLException {
        TableId resolved = this.parseTableIdentifier(tablePattern);
        String catalog = resolved.catalogName();
        if (catalog == null) {
            catalog = this.catalogPattern;
        }
        String schema = resolved.schemaName();
        if (schema == null) {
            schema = this.schemaPattern;
        }
        return this.describeColumns(connection, catalog, schema, resolved.tableName(), columnPattern);
    }

    /**
     * Reads the column definitions of a DolphinDB table from
     * {@code loadTable(...).schema().colDefs}.
     *
     * <p>The query runs on the supplied {@code connection}; no additional
     * connection is opened.
     *
     * @param connection connection used for both the key lookup and the schema query
     * @param tableId table to describe (catalog = database, table = table name)
     * @return column definitions keyed by column id; empty when the schema query returns no rows
     * @throws SQLException if a query fails
     */
    public Map<ColumnId, ColumnDefinition> describeColumns(Connection connection, TableId tableId) throws SQLException {
        log.debug("Querying {} dialect column metadata for catalog:{} schema:{} table:{}", this, tableId.catalogName(), tableId.schemaName(), tableId.tableName());
        Set<ColumnId> pkColumns = this.getPkColumns(connection, tableId);
        String schemaQuery = "select * from " + this.mergeLoadStr(tableId) + ".schema().colDefs";
        List<Map<String, Object>> colList = DolphinDBUtils.queryForList(connection, schemaQuery);

        HashMap<ColumnId, ColumnDefinition> results = new HashMap<ColumnId, ColumnDefinition>();
        int colPosition = 1;
        for (Map<String, Object> col : colList) {
            ColumnId columnId = new ColumnId(tableId, col.get("name").toString(), null);
            String typeName = col.get("typeString").toString();
            // NOTE(review): the 1-based column position is passed in the display-size
            // slot, as the previous code did - confirm this is intended.
            ColumnDefinition defn = new ColumnDefinition(
                columnId,
                0,
                typeName,
                null,
                ColumnDefinition.Nullability.NULL,
                ColumnDefinition.Mutability.UNKNOWN,
                0,
                0,
                false,
                colPosition,
                false,
                false,
                false,
                false,
                pkColumns.contains(columnId));
            results.put(columnId, defn);
            ++colPosition;
        }
        return results;
    }

    /**
     * Reads the location of the sync-config table from the
     * {@code ddbsync.config.table} property, expected as {@code "database,table"}.
     *
     * <p>When the property is absent, has no comma, has more than two parts, or has
     * a blank part, a warning is logged and the default location is used.
     *
     * @return a two-element array: sync-config database, sync-config table (both trimmed)
     */
    protected String[] parseDdbSyncConfig() {
        Object raw = this.config.originals().get("ddbsync.config.table");
        if (raw != null) {
            String[] parts = raw.toString().split(",");
            if (parts.length == 2) {
                String database = parts[0].trim();
                String table = parts[1].trim();
                if (!database.isEmpty() && !table.isEmpty()) {
                    return new String[]{database, table};
                }
            }
        }
        log.warn("The parameter [ddbsync.config.table] format error, no comma delimiter found or more than one delimiter found. Default configuration will be used. Will load the data sync config from [{},{}]", DEFAULT_SYNCONFIG_DATABASE_NAME, DEFAULT_SYNCONFIG_TABLE_NAME);
        return new String[]{DEFAULT_SYNCONFIG_DATABASE_NAME, DEFAULT_SYNCONFIG_TABLE_NAME};
    }

    /**
     * Joins the catalog, schema and table parts of {@code tableId} with dots to form
     * the topic name used as the sync-config key.
     *
     * @param tableId parsed table id
     * @return {@code catalog.schema.table}; a {@code null} part is rendered as {@code "null"}
     */
    protected String mergeTopicName(TableId tableId) {
        return new StringBuilder()
            .append(tableId.catalogName())
            .append('.')
            .append(tableId.schemaName())
            .append('.')
            .append(tableId.tableName())
            .toString();
    }

    /**
     * Joins the three name parts with dots to form a topic name.
     *
     * @param catalogPattern first part
     * @param schemaPattern second part
     * @param tablePattern third part
     * @return {@code catalog.schema.table}; a {@code null} part is rendered as {@code "null"}
     */
    protected String mergeTopicName(String catalogPattern, String schemaPattern, String tablePattern) {
        return String.join(".", catalogPattern, schemaPattern, tablePattern);
    }

    /**
     * Builds the DolphinDB {@code loadTable('database','table')} expression for a table.
     *
     * @param tableId table id (catalog = database, table = table name)
     * @return the {@code loadTable(...)} expression text
     */
    protected String mergeLoadStr(TableId tableId) {
        return "loadTable('" + tableId.catalogName() + "','" + tableId.tableName() + "')";
    }

    /**
     * Builds the table name used in generated statements: the database name with
     * every {@code dfs://} occurrence removed, an underscore, then the table name.
     *
     * @param tableId table id (catalog = database, table = table name)
     * @return {@code <database without dfs://>_<table>}
     */
    protected String mergeLoadStrMemTab(TableId tableId) {
        String databaseName = tableId.catalogName().replace("dfs://", "");
        return databaseName + "_" + tableId.tableName();
    }

    /**
     * Reads the sort columns of a table from {@code loadTable(...).schema().sortColumns}.
     *
     * @param connection connection used to run the query
     * @param tableId table to inspect
     * @return the sort columns as column ids
     * @throws SQLException if the query fails
     */
    protected Set<ColumnId> sortColumns(Connection connection, TableId tableId) throws SQLException {
        String sortColumnsQuery = this.mergeLoadStr(tableId) + ".schema().sortColumns";
        List<?> names = DolphinDBUtils.queryForVector(connection, sortColumnsQuery);
        HashSet<ColumnId> result = new HashSet<ColumnId>();
        for (Object name : names) {
            result.add(new ColumnId(tableId, name.toString()));
        }
        return result;
    }

    /**
     * Determines the key columns of a table from its sync configuration.
     *
     * <ul>
     *   <li>{@code OLAP}: the comma-separated primary key from the sync configuration
     *       (names are trimmed, blank entries skipped).</li>
     *   <li>{@code TSDB}: the table's sort columns, read from its schema.</li>
     *   <li>{@code PK}: no key columns are derived here, so an empty set is returned.
     *       NOTE(review): confirm whether the PK engine should read its keys from the schema.</li>
     * </ul>
     *
     * @param connection connection used for the TSDB schema query
     * @param tableId table whose key columns are wanted
     * @return the key columns; possibly empty, never {@code null}
     * @throws SQLException if the schema query fails
     * @throws RuntimeException if no sync configuration matches the table, or its
     *         engine type is missing or unknown
     */
    protected Set<ColumnId> getPkColumns(Connection connection, TableId tableId) throws SQLException {
        DDBSynConfigModel tableConfig = null;
        for (DDBSynConfigModel candidate : this.synConfigMap.values()) {
            if (tableId.catalogName().equalsIgnoreCase(candidate.getTargetDB()) && tableId.tableName().equalsIgnoreCase(candidate.getTargetTab())) {
                tableConfig = candidate;
                break;
            }
        }
        if (tableConfig == null) {
            throw new RuntimeException("no sync configuration found for table [" + tableId.catalogName() + "]" + tableId.tableName() + ",please check");
        }
        String engineType = tableConfig.getEngineType();
        if (engineType == null) {
            throw new RuntimeException("get database engineType failed,please check");
        }

        List<?> pkColList;
        switch (engineType) {
            case "OLAP": {
                pkColList = Arrays.asList(tableConfig.getPrimayKey().split(","));
                break;
            }
            case "TSDB": {
                pkColList = DolphinDBUtils.queryForVector(connection, this.mergeLoadStr(tableId) + ".schema().sortColumns");
                break;
            }
            case "PK": {
                pkColList = Collections.emptyList();
                break;
            }
            default: {
                throw new RuntimeException("get database engineType failed,please check");
            }
        }

        HashSet<ColumnId> keyColumns = new HashSet<ColumnId>();
        for (Object col : pkColList) {
            String colName = col.toString().trim();
            if (!colName.isEmpty()) {
                keyColumns.add(new ColumnId(tableId, colName));
            }
        }
        return keyColumns;
    }

    /**
     * Maps a sink record field to a SQL type name. Kafka Connect logical types
     * (Decimal, Date, Time, Timestamp) are matched first by schema name; otherwise
     * the primitive schema type decides, and anything unmapped is left to the
     * generic dialect.
     *
     * @param field field to map
     * @return the SQL type name; for arrays, the element type followed by {@code []}
     */
    @Override
    protected String getSqlType(SinkRecordField field) {
        String logicalName = field.schemaName();
        if (logicalName != null) {
            if ("org.apache.kafka.connect.data.Decimal".equals(logicalName)) {
                return "DECIMAL";
            }
            if ("org.apache.kafka.connect.data.Date".equals(logicalName)) {
                return "DATE";
            }
            if ("org.apache.kafka.connect.data.Time".equals(logicalName)) {
                return "TIME";
            }
            if ("org.apache.kafka.connect.data.Timestamp".equals(logicalName)) {
                return "TIMESTAMP";
            }
        }
        switch (field.schemaType()) {
            case INT8:
            case INT16:
                return "SMALLINT";
            case INT32:
                return "INT";
            case INT64:
                return "BIGINT";
            case FLOAT32:
                return "REAL";
            case FLOAT64:
                return "DOUBLE PRECISION";
            case BOOLEAN:
                return "BOOLEAN";
            case STRING:
                return "TEXT";
            case BYTES:
                return "BYTEA";
            case ARRAY:
                // Resolve the element type through a synthetic field carrying the value schema.
                SinkRecordField elementField = new SinkRecordField(field.schema().valueSchema(), field.name(), field.isPrimaryKey());
                return this.getSqlType(elementField) + "[]";
            default:
                return super.getSqlType(field);
        }
    }

    /**
     * Builds an {@code INSERT INTO <table> (cols) VALUES (?...)} statement, adding
     * the dummy sort-key column and one more placeholder when both the
     * {@code ddbsync.addSortColFlag} property and the table's own flag ask for it.
     * Back-quotes are stripped from the result.
     *
     * @param tableid target table
     * @param keyColumns key columns, listed first
     * @param nonKeyColumns remaining columns
     * @param definition table definition used to render value placeholders
     * @return the statement text
     */
    @Override
    public String buildInsertStatement(TableId tableid, Collection<ColumnId> keyColumns, Collection<ColumnId> nonKeyColumns, TableDefinition definition) {
        Object rawFlag = this.config.originals().get("ddbsync.addSortColFlag");
        String addSortColFlag = rawFlag == null ? null : rawFlag.toString();
        Boolean tableWantsSortKey = DolphinDBUtils.checkTabAddSortFlag(this.synConfigMap, TAB_SORT_KEY_ADD_FALG, tableid.catalogName(), tableid.tableName());
        boolean appendSortKey = "true".equalsIgnoreCase(addSortColFlag) && tableWantsSortKey.booleanValue();

        ExpressionBuilder sql = this.expressionBuilder();
        sql.append("INSERT INTO ").append(this.mergeLoadStrMemTab(tableid)).append(" (");
        sql.appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns, nonKeyColumns);
        if (appendSortKey) {
            sql.append(",").append(DUMMY_SORT_KEY);
        }
        sql.append(") VALUES (");
        sql.appendList().delimitedBy(",").transformedBy(this.columnValueVariables(definition)).of(keyColumns, nonKeyColumns);
        if (appendSortKey) {
            sql.append(",?");
        }
        sql.append(")");
        return sql.toString().replace("`", "");
    }

    /**
     * Builds a {@code DELETE FROM <table>} statement with one {@code col = ?}
     * condition per key column, joined by {@code AND}. Back-quotes are stripped
     * from the result.
     *
     * @param tableid target table
     * @param keyColumns key columns; when empty no {@code WHERE} clause is emitted
     * @return the statement text
     */
    @Override
    public String buildDeleteStatement(TableId tableid, Collection<ColumnId> keyColumns) {
        ExpressionBuilder sql = this.expressionBuilder();
        sql.append("DELETE FROM ").append(this.mergeLoadStrMemTab(tableid));
        boolean hasKeys = !keyColumns.isEmpty();
        if (hasKeys) {
            sql.append(" WHERE ");
            sql.appendList().delimitedBy(" AND ").transformedBy(ExpressionBuilder.columnNamesWith(" = ?")).of(keyColumns);
        }
        return sql.toString().replace("`", "");
    }

    /**
     * Builds an {@code UPDATE <table> SET ... [WHERE ...]} statement. Unlike the
     * insert, delete and upsert builders, the table id is appended as-is and
     * back-quotes are not stripped.
     *
     * @param table target table
     * @param keyColumns key columns for the {@code WHERE} clause; omitted when empty
     * @param nonKeyColumns columns to set
     * @param definition table definition used to render value placeholders
     * @return the statement text
     */
    @Override
    public String buildUpdateStatement(TableId table, Collection<ColumnId> keyColumns, Collection<ColumnId> nonKeyColumns, TableDefinition definition) {
        ExpressionBuilder sql = this.expressionBuilder();
        sql.append("UPDATE ");
        sql.append(table);
        sql.append(" SET ");
        sql.appendList().delimitedBy(", ").transformedBy(this.columnNamesWithValueVariables(definition)).of(nonKeyColumns);
        boolean hasKeys = !keyColumns.isEmpty();
        if (hasKeys) {
            sql.append(" WHERE ");
            sql.appendList().delimitedBy(" AND ").transformedBy(ExpressionBuilder.columnNamesWith(" = ?")).of(keyColumns);
        }
        return sql.toString();
    }

    /**
     * Builds an {@code UPSERT INTO <table> (cols) VALUES (?...) ON duplicate key (keys)}
     * statement, adding the dummy sort-key column and one more placeholder when
     * both the {@code ddbsync.addSortColFlag} property and the table's own flag
     * ask for it. Back-quotes are stripped from the result.
     *
     * @param tableid target table
     * @param keyColumns key columns, listed first and repeated in the duplicate-key clause
     * @param nonKeyColumns remaining columns
     * @param definition table definition used to render value placeholders
     * @return the statement text
     */
    @Override
    public String buildUpsertQueryStatement(TableId tableid, Collection<ColumnId> keyColumns, Collection<ColumnId> nonKeyColumns, TableDefinition definition) {
        Object rawFlag = this.config.originals().get("ddbsync.addSortColFlag");
        String addSortColFlag = rawFlag == null ? null : rawFlag.toString();
        Boolean tableWantsSortKey = DolphinDBUtils.checkTabAddSortFlag(this.synConfigMap, TAB_SORT_KEY_ADD_FALG, tableid.catalogName(), tableid.tableName());
        boolean appendSortKey = "true".equalsIgnoreCase(addSortColFlag) && tableWantsSortKey.booleanValue();

        ExpressionBuilder sql = this.expressionBuilder();
        sql.append("UPSERT INTO ").append(this.mergeLoadStrMemTab(tableid)).append(" (");
        sql.appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns, nonKeyColumns);
        if (appendSortKey) {
            sql.append(",").append(DUMMY_SORT_KEY);
        }
        sql.append(") VALUES (");
        sql.appendList().delimitedBy(",").transformedBy(this.columnValueVariables(definition)).of(keyColumns, nonKeyColumns);
        if (appendSortKey) {
            sql.append(",?");
        }
        sql.append(") ON duplicate key (");
        sql.appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns);
        sql.append(")");
        return sql.toString().replace("`", "");
    }

    /**
     * Executes the given DDL statements in order on a single {@link Statement},
     * which is closed afterwards. Execution stops at the first failure.
     *
     * @param connection connection to execute on
     * @param statements DDL statements to run
     * @throws SQLException if creating the statement or any execution fails
     */
    @Override
    public void applyDdlStatements(Connection connection, List<String> statements) throws SQLException {
        try (Statement executor = connection.createStatement()) {
            Iterator<String> pending = statements.iterator();
            while (pending.hasNext()) {
                executor.executeUpdate(pending.next());
            }
        }
    }

    /**
     * Creates the DolphinDB-specific statement binder, handing it the loaded sync
     * configuration and the sort-column settings.
     *
     * @return a new {@link DDBPreparedStatementBinder}
     */
    @Override
    public DatabaseDialect.StatementBinder statementBinder(PreparedStatement statement, JdbcSinkConfig.PrimaryKeyMode pkMode, SchemaPair schemaPair, FieldsMetadata fieldsMetadata, TableDefinition tableDefinition, JdbcSinkConfig.InsertMode insertMode) {
        Object rawFlag = this.config.originals().get("ddbsync.addSortColFlag");
        String addSortColFlag = rawFlag == null ? null : rawFlag.toString();
        return new DDBPreparedStatementBinder(
            this,
            statement,
            pkMode,
            schemaPair,
            fieldsMetadata,
            tableDefinition,
            insertMode,
            this.synConfigMap,
            TAB_SORT_KEY_ADD_FALG,
            addSortColFlag);
    }

    /**
     * Binds one value to a statement parameter. A {@code null} value is bound as
     * SQL null; otherwise logical-type binding is tried first, then primitive binding.
     *
     * @param statement statement to bind on
     * @param index 1-based parameter index
     * @param schema schema of the value
     * @param value value to bind, may be {@code null}
     * @param colDef definition of the target column
     * @throws SQLException if the driver rejects the binding
     * @throws ConnectException if neither binding strategy handles the schema type
     */
    @Override
    public void bindField(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        if (value == null) {
            statement.setObject(index, null);
            return;
        }
        if (this.maybeBindLogical(statement, index, schema, value, colDef)) {
            return;
        }
        if (this.maybeBindPrimitive(statement, index, schema, value, colDef)) {
            return;
        }
        throw new ConnectException("Unsupported source data type: " + schema.type());
    }

    /**
     * Renders a literal value into {@code builder}. Booleans without a logical
     * schema name are written as {@code TRUE} / {@code FALSE}; everything else is
     * left to the generic dialect.
     */
    @Override
    protected void formatColumnValue(ExpressionBuilder builder, String schemaName, Map<String, String> schemaParameters, Schema.Type type, Object value) {
        boolean plainBoolean = schemaName == null && Schema.Type.BOOLEAN.equals(type);
        if (!plainBoolean) {
            super.formatColumnValue(builder, schemaName, schemaParameters, type, value);
            return;
        }
        builder.append(((Boolean)value).booleanValue() ? "TRUE" : "FALSE");
    }

    /**
     * Tries to bind a value whose schema carries a logical name. The inherited
     * four-argument binder is tried first; if it declines, Debezium temporal
     * types are routed to the matching helper.
     *
     * @return {@code true} if the value was bound, {@code false} if the schema has
     *         no name or the name is not handled
     * @throws SQLException if the driver rejects the binding
     */
    protected boolean maybeBindLogical(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        String logicalName = schema.name();
        if (logicalName == null) {
            return false;
        }
        if (this.maybeBindLogical(statement, index, schema, value)) {
            return true;
        }
        switch (logicalName) {
            case "io.debezium.time.ZonedTime":
                return this.maybeBindZonedTimePrimitive(statement, index, schema, value, colDef);
            case "io.debezium.time.ZonedTimestamp":
                return this.maybeBindZonedTimeStampPrimitive(statement, index, schema, value, colDef);
            case "io.debezium.time.Time":
            case "io.debezium.time.MicroTime":
                return this.maybeBindTimePrimitive(statement, index, schema, value, colDef);
            case "io.debezium.time.Timestamp":
            case "io.debezium.time.MicroTimestamp":
            case "io.debezium.time.NanoTimestamp":
            case "io.debezium.time.Date":
                return this.maybeBindTimestampPrimitive(statement, index, schema, value, colDef);
            default:
                return false;
        }
    }

    /**
     * Binds an {@code io.debezium.time.ZonedTime} value (name compared ignoring case).
     *
     * <p>For SECOND, TIME and NANOTIME target columns the string value is parsed as an
     * {@link OffsetTime} and its local time part is bound; the offset is discarded, not
     * applied. Any other target column type is delegated to the superclass primitive
     * binder.
     *
     * @return {@code false} if the schema name is not ZonedTime; otherwise the outcome
     *         of the bind
     */
    private boolean maybeBindZonedTimePrimitive(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        if (!schema.name().equalsIgnoreCase("io.debezium.time.ZonedTime")) {
            return false;
        }
        switch (DolphinDBDataTypeEnum.fromString(colDef.typeName())) {
            case SECOND:
            case TIME:
            case NANOTIME: {
                LocalTime timeOfDay = OffsetTime.parse((String)value).toLocalTime();
                statement.setObject(index, timeOfDay);
                return true;
            }
            default: {
                return super.maybeBindPrimitive(statement, index, schema, value);
            }
        }
    }

    /**
     * Binds an {@code io.debezium.time.ZonedTimestamp} value (name compared ignoring case).
     *
     * <p>The string value is parsed as a {@link ZonedDateTime}. A DATE column receives
     * its local date; DATETIME, TIMESTAMP and NANOTIMESTAMP columns receive its local
     * date-time. In both cases the zone is dropped rather than converted. Any other
     * target column type is delegated to the superclass primitive binder.
     *
     * @return {@code false} if the schema name is not ZonedTimestamp; otherwise the
     *         outcome of the bind
     */
    private boolean maybeBindZonedTimeStampPrimitive(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        if (!schema.name().equalsIgnoreCase("io.debezium.time.ZonedTimestamp")) {
            return false;
        }
        switch (DolphinDBDataTypeEnum.fromString(colDef.typeName())) {
            case DATE: {
                LocalDate datePart = ZonedDateTime.parse((String)value).toLocalDate();
                statement.setDate(index, Date.valueOf(datePart));
                return true;
            }
            case DATETIME:
            case TIMESTAMP:
            case NANOTIMESTAMP: {
                LocalDateTime wallClock = ZonedDateTime.parse((String)value).toLocalDateTime();
                statement.setObject(index, wallClock);
                return true;
            }
            default: {
                return super.maybeBindPrimitive(statement, index, schema, value);
            }
        }
    }

    /**
     * Binds an {@code io.debezium.time.MicroTime} or {@code io.debezium.time.Time}
     * value (names compared ignoring case) as a {@link LocalTime}.
     *
     * <p>The numeric value is rescaled by shifting decimal digits with
     * {@code shiftTime}: MicroTime drops 6 digits to get seconds and appends 3 zeros to
     * get nanoseconds; Time drops 3 digits and appends 6 zeros respectively. These
     * shifts treat MicroTime as microseconds and Time as milliseconds of the day.
     * SECOND columns use the seconds form; TIME and NANOTIME columns use the
     * nanoseconds form. Other column types are delegated to the superclass primitive
     * binder.
     *
     * @return {@code false} if the schema name is neither MicroTime nor Time;
     *         otherwise the outcome of the bind
     */
    private boolean maybeBindTimePrimitive(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        String logicalName = schema.name();
        int shiftToSeconds;
        int shiftToNanos;
        if (logicalName.equalsIgnoreCase("io.debezium.time.MicroTime")) {
            shiftToSeconds = -6;
            shiftToNanos = 3;
        } else if (logicalName.equalsIgnoreCase("io.debezium.time.Time")) {
            shiftToSeconds = -3;
            shiftToNanos = 6;
        } else {
            return false;
        }
        switch (DolphinDBDataTypeEnum.fromString(colDef.typeName())) {
            case SECOND: {
                long secondOfDay = Long.parseLong(this.shiftTime(value, shiftToSeconds));
                statement.setObject(index, LocalTime.ofSecondOfDay(secondOfDay));
                return true;
            }
            case TIME:
            case NANOTIME: {
                long nanoOfDay = Long.parseLong(this.shiftTime(value, shiftToNanos));
                statement.setObject(index, LocalTime.ofNanoOfDay(nanoOfDay));
                return true;
            }
            default: {
                return super.maybeBindPrimitive(statement, index, schema, value);
            }
        }
    }

    /**
     * Column-aware primitive binder.
     *
     * <p>Without a column definition the call is forwarded to the four-argument
     * superclass binder. With one:
     * <ul>
     *   <li>STRING values are routed to {@code maybeBindStringPrimitive}, which may
     *       convert them to the column's numeric type;</li>
     *   <li>BYTES values destined for a {@code java.sql.Types.BLOB} column are bound
     *       with {@code setBlob};</li>
     *   <li>everything else falls through to the superclass binder.</li>
     * </ul>
     *
     * <p>For a {@link ByteBuffer} only the bytes between its position and limit are
     * bound. They are copied through a duplicate, so the caller's buffer position is
     * left untouched and read-only or direct buffers, which have no accessible backing
     * array, are supported as well.
     *
     * @return {@code true} if the value was bound
     * @throws SQLException if the JDBC driver rejects the bind
     */
    protected boolean maybeBindPrimitive(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        if (colDef == null) {
            return super.maybeBindPrimitive(statement, index, schema, value);
        }
        switch (schema.type()) {
            case STRING: {
                return this.maybeBindStringPrimitive(statement, index, schema, value, colDef);
            }
            case BYTES: {
                if (colDef.type() != java.sql.Types.BLOB) break;
                byte[] payload;
                if (value instanceof ByteBuffer) {
                    // array() would expose the whole backing array (ignoring position, limit
                    // and arrayOffset) and throws for read-only/direct buffers.
                    ByteBuffer view = ((ByteBuffer)value).duplicate();
                    payload = new byte[view.remaining()];
                    view.get(payload);
                } else if (value instanceof byte[]) {
                    payload = (byte[])value;
                } else {
                    return super.maybeBindPrimitive(statement, index, schema, value);
                }
                statement.setBlob(index, new ByteArrayInputStream(payload));
                return true;
            }
            default: {
                break;
            }
        }
        return super.maybeBindPrimitive(statement, index, schema, value);
    }

    /**
     * Binds Debezium {@code Timestamp}, {@code MicroTimestamp}, {@code NanoTimestamp}
     * and {@code Date} values (names compared ignoring case) according to the target
     * DolphinDB column type.
     *
     * <p>For the three timestamp types the numeric value is rescaled by shifting
     * decimal digits with {@code shiftTime}. With {@code d} being the number of
     * sub-second digits the source carries (3, 6 or 9), the applied shifts are:
     * <pre>
     *   DATE          -> 3 - d   (value handed to java.sql.Date)
     *   DATETIME      ->   - d   (value narrowed to int for BasicDateTime)
     *   TIMESTAMP     -> 3 - d   (value handed to BasicTimestamp)
     *   NANOTIMESTAMP -> 9 - d   (value handed to BasicNanoTimestamp)
     * </pre>
     * When the shift is zero the value's own string form is parsed directly.
     *
     * <p>A {@code Date} value is parsed as an epoch-day count and bound as the date
     * itself (DATE) or as midnight UTC of that date (DATETIME, TIMESTAMP,
     * NANOTIMESTAMP).
     *
     * <p>Column types not listed above are delegated to the superclass primitive binder.
     *
     * @return {@code false} if the schema name is none of the four handled names;
     *         otherwise the outcome of the bind
     */
    private boolean maybeBindTimestampPrimitive(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        String logicalName = schema.name();

        if (logicalName.equalsIgnoreCase("io.debezium.time.Date")) {
            LocalDate day = LocalDate.ofEpochDay(Integer.parseInt(value.toString()));
            switch (DolphinDBDataTypeEnum.fromString(colDef.typeName())) {
                case DATE: {
                    statement.setDate(index, Date.valueOf(day));
                    return true;
                }
                case DATETIME: {
                    LocalDateTime midnight = day.atStartOfDay(ZoneOffset.UTC).toLocalDateTime();
                    statement.setObject(index, new BasicDateTime(midnight));
                    return true;
                }
                case TIMESTAMP: {
                    long midnightMillis = day.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
                    statement.setObject(index, new BasicTimestamp(midnightMillis));
                    return true;
                }
                case NANOTIMESTAMP: {
                    long midnightNanos = day.atStartOfDay(ZoneOffset.UTC).toInstant().getEpochSecond() * 1000000000L;
                    statement.setObject(index, new BasicNanoTimestamp(midnightNanos));
                    return true;
                }
                default: {
                    break;
                }
            }
            return super.maybeBindPrimitive(statement, index, schema, value);
        }

        // Number of sub-second decimal digits carried by the source value.
        int sourceDigits;
        if (logicalName.equalsIgnoreCase("io.debezium.time.Timestamp")) {
            sourceDigits = 3;
        } else if (logicalName.equalsIgnoreCase("io.debezium.time.MicroTimestamp")) {
            sourceDigits = 6;
        } else if (logicalName.equalsIgnoreCase("io.debezium.time.NanoTimestamp")) {
            sourceDigits = 9;
        } else {
            return false;
        }
        int shiftToMillis = 3 - sourceDigits;
        int shiftToNanos = 9 - sourceDigits;

        switch (DolphinDBDataTypeEnum.fromString(colDef.typeName())) {
            case DATE: {
                String millisText = shiftToMillis == 0 ? value.toString() : this.shiftTime(value, shiftToMillis);
                statement.setDate(index, new Date(Long.parseLong(millisText)));
                return true;
            }
            case DATETIME: {
                String secondsText = this.shiftTime(value, -sourceDigits);
                statement.setObject(index, new BasicDateTime((int)Long.parseLong(secondsText)));
                return true;
            }
            case TIMESTAMP: {
                String millisText = shiftToMillis == 0 ? value.toString() : this.shiftTime(value, shiftToMillis);
                statement.setObject(index, new BasicTimestamp(Long.parseLong(millisText)));
                return true;
            }
            case NANOTIMESTAMP: {
                String nanosText = shiftToNanos == 0 ? value.toString() : this.shiftTime(value, shiftToNanos);
                statement.setObject(index, new BasicNanoTimestamp(Long.parseLong(nanosText)));
                return true;
            }
            default: {
                break;
            }
        }
        return super.maybeBindPrimitive(statement, index, schema, value);
    }

    /**
     * Rescales the decimal string form of a number by a power of ten without parsing it.
     *
     * <p>A positive {@code position} appends that many {@code '0'} characters (multiply
     * by 10^position). A negative {@code position} drops that many trailing digits
     * (divide by 10^-position, truncating toward zero). A zero {@code position} returns
     * the string form unchanged.
     *
     * <p>When a negative shift removes every digit, e.g. 500000 shifted by -6, the
     * result is {@code "0"} rather than an empty or sign-only string, so callers can
     * always feed the result to {@code Long.parseLong}. A leading {@code '-'} is
     * preserved and is not counted as a digit.
     *
     * @param timeObj  the value to rescale; its {@code toString()} is used
     * @param position number of decimal digits to append (positive) or drop (negative)
     * @return the rescaled digit string, or {@code null} if {@code timeObj} is {@code null}
     */
    private String shiftTime(Object timeObj, int position) {
        if (timeObj == null) {
            return null;
        }
        String text = timeObj.toString();
        if (position == 0) {
            return text;
        }
        if (position > 0) {
            StringBuilder scaled = new StringBuilder(text.length() + position).append(text);
            for (int i = 0; i < position; ++i) {
                scaled.append('0');
            }
            return scaled.toString();
        }
        boolean negative = text.startsWith("-");
        String digits = negative ? text.substring(1) : text;
        if (digits.isEmpty()) {
            // Nothing numeric to truncate; hand the input back so the caller's parse reports it.
            return text;
        }
        int keep = digits.length() + position;
        if (keep <= 0) {
            // Magnitude is smaller than one unit of the target resolution.
            return "0";
        }
        String truncated = digits.substring(0, keep);
        return negative ? "-" + truncated : truncated;
    }

    /**
     * Normalises the string form of {@code value} to exactly {@code length} characters
     * and wraps the parsed number in a {@link Timestamp}.
     *
     * <p>Longer strings are cut to their first {@code length} characters; shorter ones
     * are right-padded with {@code '0'} (any space already in the string is also turned
     * into {@code '0'}). The result is parsed as a {@code long} and passed to
     * {@code new Timestamp(long)}.
     *
     * @throws NumberFormatException if the normalised text is not a valid {@code long}
     */
    private Timestamp convertDdbTimestamp(Object value, int length) {
        String digits = value.toString();
        int actualLength = digits.length();
        if (actualLength > length) {
            digits = digits.substring(0, length);
        } else if (actualLength < length) {
            String padded = String.format("%-" + length + "s", digits);
            digits = padded.replace(' ', '0');
        }
        long epochValue = Long.parseLong(digits);
        return new Timestamp(epochValue);
    }

    /**
     * Binds a STRING value to a column whose DolphinDB type is numeric.
     *
     * <p>INT, SHORT, LONG, FLOAT and DOUBLE columns receive the string parsed to the
     * matching Java primitive. DECIMAL32, DECIMAL64 and DECIMAL128 columns receive the
     * value unchanged through {@code setObject}. Every other column type is delegated
     * to the superclass primitive binder.
     *
     * @return {@code true} if the value was bound
     * @throws NumberFormatException if the text cannot be parsed as the target type
     */
    private boolean maybeBindStringPrimitive(PreparedStatement statement, int index, Schema schema, Object value, ColumnDefinition colDef) throws SQLException {
        switch (DolphinDBDataTypeEnum.fromString(colDef.typeName())) {
            case INT:
                statement.setInt(index, Integer.parseInt((String)value));
                return true;
            case SHORT:
                statement.setShort(index, Short.parseShort((String)value));
                return true;
            case LONG:
                statement.setLong(index, Long.parseLong((String)value));
                return true;
            case FLOAT:
                statement.setFloat(index, Float.parseFloat((String)value));
                return true;
            case DOUBLE:
                statement.setDouble(index, Double.parseDouble((String)value));
                return true;
            case DECIMAL32:
            case DECIMAL64:
            case DECIMAL128:
                statement.setObject(index, value);
                return true;
            default:
                return super.maybeBindPrimitive(statement, index, schema, value);
        }
    }

    /**
     * Primitive binder that adds ARRAY support on top of the superclass behaviour.
     *
     * <p>An ARRAY value may be any {@link Collection} or an object array. Its elements
     * are copied into a typed Java array chosen from the element schema type and bound
     * with {@code setObject(index, array, java.sql.Types.ARRAY)}:
     * <ul>
     *   <li>INT8 elements are widened to {@code Short};</li>
     *   <li>INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN and STRING elements keep
     *       their boxed type;</li>
     *   <li>any other element type is left to the superclass binder.</li>
     * </ul>
     * Null elements are passed through unchanged for every element type.
     *
     * @return {@code true} if the value was bound
     * @throws DataException if an ARRAY value is neither a collection nor an object
     *                       array (this includes primitive arrays such as {@code int[]})
     * @throws SQLException  if the JDBC driver rejects the bind
     */
    @Override
    protected boolean maybeBindPrimitive(PreparedStatement statement, int index, Schema schema, Object value) throws SQLException {
        switch (schema.type()) {
            case ARRAY: {
                Collection<?> elements;
                if (value instanceof Collection) {
                    // Accept every Collection, not only List, to match the type test.
                    elements = (Collection<?>)value;
                } else if (value instanceof Object[]) {
                    elements = Arrays.asList((Object[])value);
                } else {
                    throw new DataException(String.format("Type '%s' is not supported for Array.", value.getClass().getName()));
                }
                Object[] typedArray = null;
                switch (schema.valueSchema().type()) {
                    case INT8: {
                        // Bytes are widened to Short here, so toArray alone cannot do the copy.
                        typedArray = elements.stream().map(o -> o == null ? null : Short.valueOf(((Byte)o).shortValue())).toArray(Short[]::new);
                        break;
                    }
                    case INT16: {
                        typedArray = elements.toArray(new Short[0]);
                        break;
                    }
                    case INT32: {
                        typedArray = elements.toArray(new Integer[0]);
                        break;
                    }
                    case INT64: {
                        typedArray = elements.toArray(new Long[0]);
                        break;
                    }
                    case FLOAT32: {
                        typedArray = elements.toArray(new Float[0]);
                        break;
                    }
                    case FLOAT64: {
                        typedArray = elements.toArray(new Double[0]);
                        break;
                    }
                    case BOOLEAN: {
                        typedArray = elements.toArray(new Boolean[0]);
                        break;
                    }
                    case STRING: {
                        typedArray = elements.toArray(new String[0]);
                        break;
                    }
                    default: {
                        break;
                    }
                }
                if (typedArray == null) break;
                statement.setObject(index, (Object)typedArray, java.sql.Types.ARRAY);
                return true;
            }
            default: {
                break;
            }
        }
        return super.maybeBindPrimitive(statement, index, schema, value);
    }

    /**
     * Returns a transform that renders a column as {@code <name> = ?} followed by the
     * type-cast suffix produced by {@code valueTypeCast} for that column (which may be
     * empty).
     *
     * @param defn table definition used to look up each column's type; passed through
     *             to {@code valueTypeCast}
     */
    protected ExpressionBuilder.Transform<ColumnId> columnNamesWithValueVariables(TableDefinition defn) {
        return (expression, column) -> {
            expression.appendColumnName(column.name());
            expression.append(" = ?");
            expression.append(this.valueTypeCast(defn, column));
        };
    }

    /**
     * Returns a transform that renders a column as a {@code ?} placeholder followed by
     * the type-cast suffix produced by {@code valueTypeCast} for that column (which may
     * be empty).
     *
     * @param defn table definition used to look up each column's type; passed through
     *             to {@code valueTypeCast}
     */
    protected ExpressionBuilder.Transform<ColumnId> columnValueVariables(TableDefinition defn) {
        return (expression, column) -> {
            expression.append("?");
            expression.append(this.valueTypeCast(defn, column));
        };
    }

    /**
     * Returns the cast suffix to append after a bind placeholder for the given column.
     *
     * <p>The column's type name is looked up in the table definition and lower-cased
     * with {@code Locale.ROOT}, so the result does not depend on the JVM's default
     * locale. If the lower-cased name is contained in {@code CAST_TYPES} the suffix is
     * {@code "::" + name}; otherwise no cast is needed.
     *
     * @param tableDefn table definition to consult; may be {@code null}
     * @param columnId  column whose type decides the cast
     * @return {@code "::<type>"} when a cast applies, otherwise the empty string (also
     *         when the table definition, column definition or type name is missing)
     */
    protected String valueTypeCast(TableDefinition tableDefn, ColumnId columnId) {
        if (tableDefn == null) {
            return "";
        }
        ColumnDefinition defn = tableDefn.definitionForColumn(columnId.name());
        if (defn == null) {
            return "";
        }
        String typeName = defn.typeName();
        if (typeName == null) {
            return "";
        }
        // Locale.ROOT avoids locale-specific case mappings (e.g. Turkish dotless i).
        String normalized = typeName.toLowerCase(java.util.Locale.ROOT);
        if (CAST_TYPES.contains(normalized)) {
            return "::" + normalized;
        }
        return "";
    }

    /**
     * Dialect provider that creates {@link DolphinDBDatabaseDialect} instances.
     *
     * <p>It registers itself with the superclass under the dialect class's simple name
     * and the string {@code "dolphindb"}.
     * NOTE(review): presumably the second argument is the JDBC URL subprotocol that
     * selects this dialect; confirm against {@code SubprotocolBasedProvider}.
     */
    public static class Provider
    extends DatabaseDialectProvider.SubprotocolBasedProvider {
        /** Creates the provider with its fixed name and {@code "dolphindb"} identifier. */
        public Provider() {
            super(DolphinDBDatabaseDialect.class.getSimpleName(), "dolphindb");
        }

        /**
         * Creates a new dialect instance for the given connector configuration.
         *
         * @param config configuration handed to the dialect constructor
         * @return a new {@code DolphinDBDatabaseDialect}
         */
        @Override
        public DatabaseDialect create(AbstractConfig config) {
            return new DolphinDBDatabaseDialect(config);
        }
    }
}

