/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.sink.DbStructure;
import io.confluent.connect.jdbc.sink.JdbcDbWriter;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.TableAlterOrCreateException;
import io.confluent.connect.jdbc.util.LogUtil;
import io.confluent.connect.jdbc.util.Version;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSinkTask
extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(JdbcSinkTask.class);
    ErrantRecordReporter reporter;
    DatabaseDialect dialect;
    JdbcSinkConfig config;
    JdbcDbWriter writer;
    int remainingRetries;
    boolean shouldTrimSensitiveLogs;

    public void start(Map<String, String> props) {
        log.info("Starting JDBC Sink task");
        this.config = new JdbcSinkConfig(props);
        this.initWriter();
        this.remainingRetries = this.config.maxRetries;
        this.shouldTrimSensitiveLogs = this.config.trimSensitiveLogsEnabled;
        try {
            this.reporter = this.context.errantRecordReporter();
        }
        catch (NoClassDefFoundError | NoSuchMethodError e) {
            this.reporter = null;
        }
    }

    void initWriter() {
        log.info("Initializing JDBC writer");
        this.dialect = this.config.dialectName != null && !this.config.dialectName.trim().isEmpty() ? DatabaseDialects.create(this.config.dialectName, this.config) : DatabaseDialects.findBestFor(this.config.connectionUrl, this.config);
        DbStructure dbStructure = new DbStructure(this.dialect);
        log.info("Initializing writer using SQL dialect: {}", (Object)this.dialect.getClass().getSimpleName());
        this.writer = new JdbcDbWriter(this.config, this.dialect, dbStructure);
        log.info("JDBC writer initialized");
    }

    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        SinkRecord first = records.iterator().next();
        int recordsCount = records.size();
        log.debug("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the database...", new Object[]{recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset()});
        try {
            this.writer.write(records);
        }
        catch (TableAlterOrCreateException tace) {
            if (this.reporter != null) {
                this.unrollAndRetry(records);
            }
            log.error(tace.toString());
            throw tace;
        }
        catch (SQLException sqle) {
            SQLException trimmedException = this.shouldTrimSensitiveLogs ? LogUtil.trimSensitiveData(sqle) : sqle;
            log.warn("Write of {} records failed, remainingRetries={}", new Object[]{records.size(), this.remainingRetries, trimmedException});
            int totalExceptions = 0;
            for (Throwable e : sqle) {
                ++totalExceptions;
            }
            SQLException sqlAllMessagesException = this.getAllMessagesException(sqle);
            if (this.remainingRetries > 0) {
                this.writer.closeQuietly();
                this.initWriter();
                --this.remainingRetries;
                this.context.timeout((long)this.config.retryBackoffMs);
                log.debug(sqlAllMessagesException.toString());
                throw new RetriableException((Throwable)sqlAllMessagesException);
            }
            if (this.reporter != null) {
                this.unrollAndRetry(records);
            }
            log.error("Failing task after exhausting retries; encountered {} exceptions on last write attempt. For complete details on each exception, please enable DEBUG logging.", (Object)totalExceptions);
            int exceptionCount = 1;
            for (Throwable e : trimmedException) {
                log.debug("Exception {}:", (Object)exceptionCount++, (Object)e);
            }
            throw new ConnectException((Throwable)sqlAllMessagesException);
        }
        this.remainingRetries = this.config.maxRetries;
    }

    private void unrollAndRetry(Collection<SinkRecord> records) {
        this.writer.closeQuietly();
        this.initWriter();
        for (SinkRecord record : records) {
            try {
                this.writer.write(Collections.singletonList(record));
            }
            catch (TableAlterOrCreateException tace) {
                log.debug(tace.toString());
                this.reporter.report(record, (Throwable)((Object)tace));
                this.writer.closeQuietly();
            }
            catch (SQLException sqle) {
                SQLException sqlAllMessagesException = this.getAllMessagesException(sqle);
                log.debug(sqlAllMessagesException.toString());
                this.reporter.report(record, (Throwable)sqlAllMessagesException);
                this.writer.closeQuietly();
            }
        }
    }

    private SQLException getAllMessagesException(SQLException sqle) {
        String sqleAllMessages = "Exception chain:" + System.lineSeparator();
        SQLException trimmedException = this.shouldTrimSensitiveLogs ? LogUtil.trimSensitiveData(sqle) : sqle;
        for (Throwable e : trimmedException) {
            sqleAllMessages = sqleAllMessages + e + System.lineSeparator();
        }
        SQLException sqlAllMessagesException = new SQLException(sqleAllMessages);
        sqlAllMessagesException.setNextException(trimmedException);
        return sqlAllMessagesException;
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        log.info("Stopping task");
        try {
            this.writer.closeQuietly();
        }
        finally {
            try {
                if (this.dialect != null) {
                    this.dialect.close();
                }
            }
            catch (Throwable t) {
                log.warn("Error while closing the {} dialect: ", (Object)this.dialect.name(), (Object)t);
            }
            finally {
                this.dialect = null;
            }
        }
    }

    public String version() {
        return Version.getVersion();
    }
}

