package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.snapshot.SnapshotterWrapper;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.LoggingContext;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresConnectorTask.class */
/**
 * Source task of the Postgres connector.
 *
 * <p>{@link #start(Configuration)} reads the type registry and database charset over a
 * short-lived connection, builds the task context, looks up the replication slot state and the
 * previously stored offset, picks a {@link RecordsProducer} and starts it so that it feeds the
 * internal {@link ChangeEventQueue}. {@link #poll()} drains that queue, {@link #commit()}
 * forwards the last completely processed LSN to the producer and {@link #stop()} stops the
 * producer.
 */
public class PostgresConnectorTask extends BaseSourceTask {
    /** Name handed to the task context whenever the logging context is configured. */
    private static final String CONTEXT_NAME = "postgres-connector-task";

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** {@code true} between a successful {@link #start(Configuration)} and {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    private PostgresTaskContext taskContext;
    private RecordsProducer producer;

    /**
     * LSN reported by the last event of the most recent non-empty batch returned by
     * {@link #poll()}; may be {@code null}. Read by {@link #commit()}.
     */
    private volatile Long lastCompletelyProcessedLsn;

    private ChangeEventQueue<ChangeEvent> changeEventQueue;

    /**
     * Starts the task. Does nothing if the task is already running.
     *
     * @param configuration the connector configuration
     * @throws ConnectException if the configured snapshotter could not be loaded, or if the
     *         snapshotter requests neither a snapshot nor streaming
     */
    public void start(Configuration configuration) {
        if (this.running.get()) {
            return;
        }
        final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(configuration);

        // Bootstrap connection: only needed to read the type registry and the charset.
        final TypeRegistry typeRegistry;
        final Charset databaseCharset;
        try (PostgresConnection connection = new PostgresConnection(connectorConfig.jdbcConfig())) {
            typeRegistry = connection.getTypeRegistry();
            databaseCharset = connection.getDatabaseCharset();
        }

        final Snapshotter snapshotter = connectorConfig.getSnapshotter();
        if (snapshotter == null) {
            this.logger.error("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
            throw new ConnectException("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
        }

        final TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
        final PostgresSchema schema = new PostgresSchema(connectorConfig, typeRegistry, databaseCharset, topicSelector);
        this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);

        final SourceInfo sourceInfo = new SourceInfo(connectorConfig);
        final Map<String, Object> existingOffset = this.context.offsetStorageReader().offset(sourceInfo.partition());

        final LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            final SlotState slotState = loadSlotState(connectorConfig);

            final SnapshotterWrapper snapshotterWrapper;
            if (existingOffset == null) {
                this.logger.info("No previous offset found");
                // A null offset state tells the wrapper that there is no stored offset.
                snapshotterWrapper = new SnapshotterWrapper(snapshotter, connectorConfig, null, slotState);
            } else {
                this.logger.info("Found previous offset {}", sourceInfo);
                sourceInfo.load(existingOffset);
                snapshotterWrapper = new SnapshotterWrapper(snapshotter, connectorConfig, sourceInfo.asOffsetState(), slotState);
            }

            createRecordProducer(this.taskContext, sourceInfo, snapshotterWrapper);

            final ChangeEventQueue<ChangeEvent> queue = new ChangeEventQueue.Builder<ChangeEvent>()
                    .pollInterval(connectorConfig.getPollInterval())
                    .maxBatchSize(connectorConfig.getMaxBatchSize())
                    .maxQueueSize(connectorConfig.getMaxQueueSize())
                    .loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME))
                    .build();
            this.changeEventQueue = queue;

            final BlockingConsumer<ChangeEvent> eventConsumer = queue::enqueue;
            this.producer.start(eventConsumer, queue::producerFailure);
            this.running.compareAndSet(false, true);
        } finally {
            // Restore the caller's logging context on every exit path, including failures.
            previousContext.restore();
        }
    }

    /**
     * Logs the server information (at INFO level) and reads the state of the configured
     * replication slot over a connection obtained from the task context.
     *
     * @param connectorConfig supplies the slot name and the plugin name
     * @return the slot state as returned by the connection, or {@code null} if an
     *         {@link SQLException} occurred; that failure is logged and not propagated
     */
    private SlotState loadSlotState(PostgresConnectorConfig connectorConfig) {
        try (PostgresConnection connection = this.taskContext.createConnection()) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info(connection.serverInfo().toString());
            }
            return connection.getReplicationSlotState(connectorConfig.slotName(), connectorConfig.plugin().getPostgresPluginName());
        } catch (SQLException e) {
            // Best effort: startup continues without slot state, but the cause is kept in the log.
            this.logger.warn("unable to load info of replication slot, debezium will try to create the slot", e);
            return null;
        }
    }

    /**
     * Chooses the records producer according to what the snapshotter asks for and stores it in
     * {@link #producer}.
     *
     * @param postgresTaskContext the task context handed to the producer
     * @param sourceInfo the source info handed to the producer
     * @param snapshotterWrapper wraps the snapshotter that decides between snapshot and streaming
     * @throws ConnectException if the snapshotter requests neither a snapshot nor streaming
     */
    private void createRecordProducer(PostgresTaskContext postgresTaskContext, SourceInfo sourceInfo, SnapshotterWrapper snapshotterWrapper) {
        final Snapshotter snapshotter = snapshotterWrapper.getSnapshotter();
        if (snapshotter.shouldSnapshot()) {
            // The same producer type is used with and without subsequent streaming; only the log differs.
            if (snapshotter.shouldStream()) {
                this.logger.info("Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished...");
            } else {
                this.logger.info("Taking only a snapshot of the DB without streaming any changes afterwards...");
            }
            this.producer = new RecordsSnapshotProducer(postgresTaskContext, sourceInfo, snapshotterWrapper);
            return;
        }
        if (!snapshotter.shouldStream()) {
            throw new ConnectException("Snapshotter neither is snapshotting or streaming, invalid!");
        }
        this.logger.info("Not attempting to take a snapshot, immediately starting to stream logical changes...");
        this.producer = new RecordsStreamProducer(postgresTaskContext, sourceInfo);
    }

    /**
     * Passes the last completely processed LSN to the producer. Does nothing while the task is
     * not running or as long as no such LSN has been recorded.
     *
     * @throws InterruptedException declared by this method; presumably raised by the producer's
     *         commit - confirm against {@link RecordsProducer}
     */
    public void commit() throws InterruptedException {
        // Read the volatile field once so a concurrent write cannot null it between check and use.
        final Long lsn = this.lastCompletelyProcessedLsn;
        if (this.running.get() && lsn != null) {
            this.producer.commit(lsn.longValue());
        }
    }

    /**
     * Polls the change event queue for the next batch and remembers the LSN carried by the last
     * event of a non-empty batch.
     *
     * @return the source records of the polled events, in queue order
     * @throws InterruptedException if polling the queue is interrupted
     */
    public List<SourceRecord> poll() throws InterruptedException {
        final List<ChangeEvent> events = this.changeEventQueue.poll();
        if (!events.isEmpty()) {
            this.lastCompletelyProcessedLsn = events.get(events.size() - 1).getLastCompletelyProcessedLsn();
        }
        return events.stream().map(ChangeEvent::getRecord).collect(Collectors.toList());
    }

    /** Stops the producer if the task is running; later calls are no-ops until the next start. */
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            this.producer.stop();
        }
    }

    /** @return the version reported by {@link Module#version()} */
    public String version() {
        return Module.version();
    }

    /** @return all configuration fields declared by {@link PostgresConnectorConfig} */
    protected Iterable<Field> getAllConfigurationFields() {
        return PostgresConnectorConfig.ALL_FIELDS;
    }
}
