/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.sink.DbStructure;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.RecordValidator;
import io.confluent.connect.jdbc.sink.TableAlterOrCreateException;
import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata;
import io.confluent.connect.jdbc.sink.metadata.SchemaPair;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.TableId;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferedRecords {
    private static final Logger log = LoggerFactory.getLogger(BufferedRecords.class);
    private final TableId tableId;
    private final JdbcSinkConfig config;
    private final DatabaseDialect dbDialect;
    private final DbStructure dbStructure;
    private final Connection connection;
    private List<SinkRecord> records = new ArrayList<SinkRecord>();
    private Schema keySchema;
    private Schema valueSchema;
    private RecordValidator recordValidator;
    private FieldsMetadata fieldsMetadata;
    private PreparedStatement updatePreparedStatement;
    private PreparedStatement deletePreparedStatement;
    private DatabaseDialect.StatementBinder updateStatementBinder;
    private DatabaseDialect.StatementBinder deleteStatementBinder;
    private boolean deletesInBatch = false;

    public BufferedRecords(JdbcSinkConfig config, TableId tableId, DatabaseDialect dbDialect, DbStructure dbStructure, Connection connection) {
        this.tableId = tableId;
        this.config = config;
        this.dbDialect = dbDialect;
        this.dbStructure = dbStructure;
        this.connection = connection;
        this.recordValidator = RecordValidator.create(config);
    }

    public List<SinkRecord> add(SinkRecord record) throws SQLException, TableAlterOrCreateException {
        this.recordValidator.validate(record);
        ArrayList<SinkRecord> flushed = new ArrayList<SinkRecord>();
        boolean schemaChanged = false;
        if (!Objects.equals(this.keySchema, record.keySchema())) {
            this.keySchema = record.keySchema();
            schemaChanged = true;
        }
        if (Objects.isNull(record.valueSchema())) {
            if (this.config.deleteEnabled) {
                this.deletesInBatch = true;
            }
        } else if (Objects.equals(this.valueSchema, record.valueSchema())) {
            if (this.config.deleteEnabled && this.deletesInBatch) {
                flushed.addAll(this.flush());
            }
        } else {
            this.valueSchema = record.valueSchema();
            schemaChanged = true;
        }
        if (schemaChanged || this.updateStatementBinder == null) {
            flushed.addAll(this.flush());
            SchemaPair schemaPair = new SchemaPair(record.keySchema(), record.valueSchema());
            this.fieldsMetadata = FieldsMetadata.extract(this.tableId.tableName(), this.config.pkMode, this.config.pkFields, this.config.fieldsWhitelist, schemaPair);
            this.dbStructure.createOrAmendIfNecessary(this.config, this.connection, this.tableId, this.fieldsMetadata);
            String insertSql = this.getInsertSql();
            String deleteSql = this.getDeleteSql();
            log.debug("{} sql: {} deleteSql: {} meta: {}", new Object[]{this.config.insertMode, insertSql, deleteSql, this.fieldsMetadata});
            this.close();
            this.updatePreparedStatement = this.dbDialect.createPreparedStatement(this.connection, insertSql);
            this.updateStatementBinder = this.dbDialect.statementBinder(this.updatePreparedStatement, this.config.pkMode, schemaPair, this.fieldsMetadata, this.dbStructure.tableDefinition(this.connection, this.tableId), this.config.insertMode);
            if (this.config.deleteEnabled && Objects.nonNull(deleteSql)) {
                this.deletePreparedStatement = this.dbDialect.createPreparedStatement(this.connection, deleteSql);
                this.deleteStatementBinder = this.dbDialect.statementBinder(this.deletePreparedStatement, this.config.pkMode, schemaPair, this.fieldsMetadata, this.dbStructure.tableDefinition(this.connection, this.tableId), this.config.insertMode);
            }
        }
        if (Objects.isNull(record.value()) && this.config.deleteEnabled) {
            this.deletesInBatch = true;
        }
        this.records.add(record);
        if (this.records.size() >= this.config.batchSize) {
            flushed.addAll(this.flush());
        }
        return flushed;
    }

    public List<SinkRecord> flush() throws SQLException {
        if (this.records.isEmpty()) {
            log.debug("Records is empty");
            return new ArrayList<SinkRecord>();
        }
        log.debug("Flushing {} buffered records", (Object)this.records.size());
        for (SinkRecord record : this.records) {
            if (Objects.isNull(record.value()) && Objects.nonNull(this.deleteStatementBinder)) {
                this.deleteStatementBinder.bindRecord(record);
                continue;
            }
            this.updateStatementBinder.bindRecord(record);
        }
        this.executeUpdates();
        this.executeDeletes();
        List<SinkRecord> flushedRecords = this.records;
        this.records = new ArrayList<SinkRecord>();
        this.deletesInBatch = false;
        return flushedRecords;
    }

    private void executeUpdates() throws SQLException {
        String exeBatchTime = this.config.originals().get("ddbsync.exeBatchTime") == null ? null : this.config.originals().get("ddbsync.exeBatchTime").toString();
        long startTime = 0L;
        if ("true".equalsIgnoreCase(exeBatchTime)) {
            startTime = System.currentTimeMillis();
        }
        int[] batchStatus = this.updatePreparedStatement.executeBatch();
        if ("true".equalsIgnoreCase(exeBatchTime) && batchStatus.length > 0) {
            long endTime = System.currentTimeMillis();
            double elapsedTime = (double)(endTime - startTime) / 1000.0;
            log.info("executeBatchTimeTotal [{}] seconds :table [{}] do operation [insert] ,executeBatch [{}] rows ", new Object[]{elapsedTime, this.tableId.catalogName().concat(".").concat(this.tableId.tableName()), batchStatus.length});
        }
        for (int updateCount : batchStatus) {
            if (updateCount != -3) continue;
            throw new BatchUpdateException("Execution failed for part of the batch update", batchStatus);
        }
    }

    private void executeDeletes() throws SQLException {
        if (Objects.nonNull(this.deletePreparedStatement)) {
            String exeBatchTime = this.config.originals().get("ddbsync.exeBatchTime") == null ? null : this.config.originals().get("ddbsync.exeBatchTime").toString();
            long startTime = 0L;
            if ("true".equalsIgnoreCase(exeBatchTime)) {
                startTime = System.currentTimeMillis();
            }
            int[] batchStatus = this.deletePreparedStatement.executeBatch();
            if ("true".equalsIgnoreCase(exeBatchTime) && batchStatus.length > 0) {
                long endTime = System.currentTimeMillis();
                double elapsedTime = (double)(endTime - startTime) / 1000.0;
                log.info("executeDeleteTimeTotal [{}] seconds  :table [{}] do operation [delete] ,executeBatch [{}] rows ", new Object[]{elapsedTime, this.tableId.catalogName().concat(".").concat(this.tableId.tableName()), batchStatus.length});
            }
            for (int updateCount : batchStatus) {
                if (updateCount != -3) continue;
                throw new BatchUpdateException("Execution failed for part of the batch delete", batchStatus);
            }
        }
    }

    public void close() throws SQLException {
        log.debug("Closing BufferedRecords with updatePreparedStatement: {} deletePreparedStatement: {}", (Object)this.updatePreparedStatement, (Object)this.deletePreparedStatement);
        if (Objects.nonNull(this.updatePreparedStatement)) {
            this.updatePreparedStatement.close();
            this.updatePreparedStatement = null;
        }
        if (Objects.nonNull(this.deletePreparedStatement)) {
            this.deletePreparedStatement.close();
            this.deletePreparedStatement = null;
        }
    }

    private String getInsertSql() throws SQLException {
        switch (this.config.insertMode) {
            case INSERT: {
                return this.dbDialect.buildInsertStatement(this.tableId, this.asColumns(this.fieldsMetadata.keyFieldNames), this.asColumns(this.fieldsMetadata.nonKeyFieldNames), this.dbStructure.tableDefinition(this.connection, this.tableId));
            }
            case UPSERT: {
                if (this.fieldsMetadata.keyFieldNames.isEmpty()) {
                    throw new ConnectException(String.format("Write to table '%s' in UPSERT mode requires key field names to be known, check the primary key configuration", this.tableId));
                }
                try {
                    return this.dbDialect.buildUpsertQueryStatement(this.tableId, this.asColumns(this.fieldsMetadata.keyFieldNames), this.asColumns(this.fieldsMetadata.nonKeyFieldNames), this.dbStructure.tableDefinition(this.connection, this.tableId));
                }
                catch (UnsupportedOperationException e) {
                    throw new ConnectException(String.format("Write to table '%s' in UPSERT mode is not supported with the %s dialect.", this.tableId, this.dbDialect.name()));
                }
            }
            case UPDATE: {
                return this.dbDialect.buildUpdateStatement(this.tableId, this.asColumns(this.fieldsMetadata.keyFieldNames), this.asColumns(this.fieldsMetadata.nonKeyFieldNames), this.dbStructure.tableDefinition(this.connection, this.tableId));
            }
        }
        throw new ConnectException("Invalid insert mode");
    }

    private String getDeleteSql() {
        String sql = null;
        if (this.config.deleteEnabled) {
            switch (this.config.pkMode) {
                case RECORD_KEY: {
                    if (this.fieldsMetadata.keyFieldNames.isEmpty()) {
                        throw new ConnectException("Require primary keys to support delete");
                    }
                    try {
                        sql = this.dbDialect.buildDeleteStatement(this.tableId, this.asColumns(this.fieldsMetadata.keyFieldNames));
                        break;
                    }
                    catch (UnsupportedOperationException e) {
                        throw new ConnectException(String.format("Deletes to table '%s' are not supported with the %s dialect.", this.tableId, this.dbDialect.name()));
                    }
                }
                default: {
                    throw new ConnectException("Deletes are only supported for pk.mode record_key");
                }
            }
        }
        return sql;
    }

    private Collection<ColumnId> asColumns(Collection<String> names) {
        return names.stream().map(name -> new ColumnId(this.tableId, (String)name)).collect(Collectors.toList());
    }
}

