package io.debezium.embedded;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Instantiator;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.VariableLatch;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/debezium/embedded/EmbeddedEngine.class */
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
    // NOTE(review): the static Field constants below are declared without initializers here;
    // they are presumably assigned in a static initializer that is not part of this excerpt.

    /** Configuration field whose string value {@code run()} reads as the engine name. */
    public static final Field ENGINE_NAME;
    /** Configuration field holding the connector class name that {@code run()} loads and instantiates. */
    public static final Field CONNECTOR_CLASS;
    /** Configuration field holding the class name of the {@link OffsetBackingStore} that {@code run()} loads. */
    public static final Field OFFSET_STORAGE;
    /** Offset-storage field registered under the "file" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_FILE_FILENAME;
    /** Offset-storage field registered under the "kafka" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_KAFKA_TOPIC;
    /** Offset-storage field registered under the "kafka" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_KAFKA_PARTITIONS;
    /** Offset-storage field registered under the "kafka" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR;
    // NOTE(review): presumably the interval between offset flushes, in milliseconds — usage not visible here.
    public static final Field OFFSET_FLUSH_INTERVAL_MS;
    /** Configuration field read by {@code run()} as a millisecond value and turned into a {@link Duration}. */
    public static final Field OFFSET_COMMIT_TIMEOUT_MS;
    /** Configuration field naming the commit-policy class instantiated by {@code run()} when no policy was supplied. */
    public static final Field OFFSET_COMMIT_POLICY;
    // NOTE(review): PREDICATES and TRANSFORMS are not referenced in the visible code; presumably consumed by Transformations.
    public static final Field PREDICATES;
    public static final Field TRANSFORMS;
    /** Threshold used by {@code run()}: a configured retry count below this value makes a retriable exception propagate. */
    private static final int DEFAULT_ERROR_MAX_RETRIES = -1;
    // NOTE(review): the three ERRORS_* fields presumably drive the retry loop in run(); their reads are not visible here.
    private static final Field ERRORS_MAX_RETRIES;
    private static final Field ERRORS_RETRY_DELAY_INITIAL_MS;
    private static final Field ERRORS_RETRY_DELAY_MAX_MS;
    /** Field set validated against the configuration at the start of {@code run()}. */
    public static final Field.Set CONNECTOR_FIELDS;
    /** Field set used by the constructor to extract the map handed to {@code EmbeddedConfig}. */
    protected static final Field.Set ALL_FIELDS;
    // NOTE(review): default wait and system-property name for shutdown handling; the code using them is outside this excerpt.
    private static final Duration WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT;
    private static final String WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP = "debezium.embedded.shutdown.pause.before.interrupt.ms";
    private static final Logger LOGGER;
    /** Engine configuration supplied through the builder. */
    private final Configuration config;
    /** Time source; {@code run()} reads it to stamp the last commit time. */
    private final Clock clock;
    /** Class loader used by {@code run()} to load the connector and offset-store classes. */
    private final ClassLoader classLoader;
    /** Consumer supplied through the builder. */
    private final DebeziumEngine.ChangeConsumer<SourceRecord> handler;
    /** Callback invoked on the exit paths of {@code run()} with the values recorded in {@link #completionResult}. */
    private final DebeziumEngine.CompletionCallback completionCallback;
    /** Optional lifecycle callback; may be null ({@code run()} wraps it in an {@link Optional}). */
    private final DebeziumEngine.ConnectorCallback connectorCallback;
    /** JSON converters configured in the constructor (key with {@code isKey=true}, value with {@code isKey=false}). */
    private final Converter keyConverter;
    private final Converter valueConverter;
    /** Worker configuration built in the constructor from {@link #ALL_FIELDS} plus the converter class names. */
    private final WorkerConfig workerConfig;
    /** Holds the outcome recorded by {@code fail(..)} / {@code succeed(..)}. */
    private final CompletionResult completionResult;
    /** Commit policy; may be null until {@code run()} instantiates one from the configuration. */
    private OffsetCommitPolicy offsetCommitPolicy;
    /** Source task created by {@code run()}. */
    private SourceTask task;
    private final Transformations transformations;
    // Decompiler-exposed flag guarding the assert-style null checks in the constructor.
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Thread currently executing {@code run()}, or null when the engine is not running. */
    private final AtomicReference<Thread> runningThread = new AtomicReference<>();
    /** Counted up when {@code run()} begins and counted down on its exit paths. */
    private final VariableLatch latch = new VariableLatch(0);
    /** Reset to zero by {@code run()} after the task has been started. */
    private long recordsSinceLastCommit = 0;
    /** Set from {@link #clock} by {@code run()} before polling begins. */
    private long timeOfLastCommitMillis = 0;

    /**
     * Deprecated builder contract for {@link EmbeddedEngine}; it narrows the return types of
     * {@link DebeziumEngine.Builder} so that calls can be chained on this interface.
     *
     * <p>Method names of the form {@code moNname} are decompiler renames of covariant overrides
     * (see the "renamed from" notes next to each one); their original names are given there.
     */
    @Deprecated
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$Builder.class */
    public interface Builder extends DebeziumEngine.Builder<SourceRecord> {
        /** Sets the configuration the engine will be built with. */
        Builder using(Configuration configuration);

        /** Sets the Debezium clock the engine will use. */
        Builder using(Clock clock);

        /** Registers a per-record consumer as the recipient of source records. */
        Builder notifying(Consumer<SourceRecord> consumer);

        /** Registers a batch-oriented change consumer as the recipient of source records. */
        Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> changeConsumer);

        /** Sets the class loader the engine will use (originally named {@code using}). */
        @Override // 
        /* renamed from: using, reason: merged with bridge method [inline-methods] */
        Builder mo3using(ClassLoader classLoader);

        /** Sets the completion callback, using the deprecated nested callback type. */
        Builder using(CompletionCallback completionCallback);

        /** Sets the connector lifecycle callback, using the deprecated nested callback type. */
        Builder using(ConnectorCallback connectorCallback);

        /** Sets the offset commit policy (originally named {@code using}). */
        @Override // 
        /* renamed from: using, reason: merged with bridge method [inline-methods] */
        Builder mo2using(OffsetCommitPolicy offsetCommitPolicy);

        /** Builds the engine (originally named {@code build}). */
        @Override // 
        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        EmbeddedEngine mo1build();

        // Bridge: forwards the erased ChangeConsumer overload to the typed notifying(..) above.
        /* renamed from: notifying, reason: collision with other method in class */
        /* bridge */ /* synthetic */ default DebeziumEngine.Builder mo4notifying(DebeziumEngine.ChangeConsumer changeConsumer) {
            return notifying((DebeziumEngine.ChangeConsumer<SourceRecord>) changeConsumer);
        }

        // Bridge: forwards the erased Consumer overload to the typed notifying(..) above.
        /* renamed from: notifying, reason: collision with other method in class */
        /* bridge */ /* synthetic */ default DebeziumEngine.Builder mo5notifying(Consumer consumer) {
            return notifying((Consumer<SourceRecord>) consumer);
        }
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$BuilderImpl.class */
    /**
     * Default {@link Builder} implementation. Collects the collaborators of an
     * {@link EmbeddedEngine} and creates the engine in {@link #mo1build()}.
     *
     * <p>The builder methods may be called in any order. In particular, a
     * {@link DebeziumEngine.ChangeConsumer} may be registered before the configuration is
     * supplied: the tombstone-support adjustment that depends on both is applied at build
     * time rather than inside {@code notifying(..)}.
     */
    public static final class BuilderImpl implements Builder {
        private Configuration config;
        private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
        /**
         * True when the current handler was registered through
         * {@link #notifying(DebeziumEngine.ChangeConsumer)} and must therefore be asked about
         * tombstone support when the engine is built.
         */
        private boolean checkTombstoneSupport;
        private ClassLoader classLoader;
        private Clock clock;
        private DebeziumEngine.CompletionCallback completionCallback;
        private DebeziumEngine.ConnectorCallback connectorCallback;
        private OffsetCommitPolicy offsetCommitPolicy = null;

        /** Sets the configuration the engine will be built with. */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        public Builder using(Configuration configuration) {
            this.config = configuration;
            return this;
        }

        /** Sets the configuration from plain properties. */
        /* renamed from: using, reason: merged with bridge method [inline-methods] */
        public Builder m9using(Properties properties) {
            this.config = Configuration.from(properties);
            return this;
        }

        /** Sets the class loader; when left unset, {@link #mo1build()} falls back to {@code Instantiator.getClassLoader()}. */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        /* renamed from: using */
        public Builder mo3using(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }

        /** Sets the clock; when left unset, {@link #mo1build()} falls back to {@code Clock.system()}. */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        public Builder using(Clock clock) {
            this.clock = clock;
            return this;
        }

        /** Sets the completion callback (may be null). */
        /* renamed from: using, reason: merged with bridge method [inline-methods] */
        public Builder m7using(DebeziumEngine.CompletionCallback completionCallback) {
            this.completionCallback = completionCallback;
            return this;
        }

        /** Sets the connector lifecycle callback (may be null). */
        /* renamed from: using, reason: merged with bridge method [inline-methods] */
        public Builder m6using(DebeziumEngine.ConnectorCallback connectorCallback) {
            this.connectorCallback = connectorCallback;
            return this;
        }

        /** Sets the offset commit policy (may be null). */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        /* renamed from: using */
        public Builder mo2using(OffsetCommitPolicy offsetCommitPolicy) {
            this.offsetCommitPolicy = offsetCommitPolicy;
            return this;
        }

        /**
         * Registers a per-record consumer, wrapped by
         * {@code EmbeddedEngine.buildDefaultChangeConsumer}. No tombstone adjustment is
         * requested for this kind of handler, matching the previous behaviour of this overload.
         */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        public Builder notifying(Consumer<SourceRecord> consumer) {
            this.handler = EmbeddedEngine.buildDefaultChangeConsumer(consumer);
            this.checkTombstoneSupport = false;
            return this;
        }

        /**
         * Registers a batch-oriented change consumer.
         *
         * <p>The consumer's tombstone support is evaluated in {@link #mo1build()} instead of
         * here. Evaluating it here required the configuration to be set already (otherwise a
         * {@link NullPointerException} was thrown) and the adjustment was silently lost when
         * the configuration was replaced afterwards.
         */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> changeConsumer) {
            this.handler = changeConsumer;
            this.checkTombstoneSupport = true;
            return this;
        }

        /** Sets the clock from a {@link java.time.Clock} by adapting its {@code millis()} reading. */
        /* renamed from: using, reason: merged with bridge method [inline-methods] */
        public Builder m8using(final java.time.Clock clock) {
            return using(new Clock() { // from class: io.debezium.embedded.EmbeddedEngine.BuilderImpl.1
                public long currentTimeInMillis() {
                    return clock.millis();
                }
            });
        }

        /**
         * Creates the engine from the collected settings.
         *
         * @return a new engine instance
         * @throws NullPointerException if no configuration or no handler has been supplied
         */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        /* renamed from: build */
        public EmbeddedEngine mo1build() {
            if (this.classLoader == null) {
                this.classLoader = Instantiator.getClassLoader();
            }
            if (this.clock == null) {
                this.clock = Clock.system();
            }
            Objects.requireNonNull(this.config, "A connector configuration must be specified.");
            Objects.requireNonNull(this.handler, "A connector consumer or changeHandler must be specified.");
            // Work on a local copy so that building twice does not depend on builder mutation.
            Configuration effectiveConfig = this.config;
            if (this.checkTombstoneSupport
                    && !effectiveConfig.hasKey(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name())
                    && !this.handler.supportsTombstoneEvents()) {
                // The user did not choose a value and the consumer cannot handle tombstones: switch them off.
                EmbeddedEngine.LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", CommonConnectorConfig.TOMBSTONES_ON_DELETE.name());
                effectiveConfig = effectiveConfig.edit().with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build();
            }
            return new EmbeddedEngine(effectiveConfig, this.classLoader, this.clock, this.handler, this.completionCallback, this.connectorCallback, this.offsetCommitPolicy);
        }

        /** Sets the completion callback, using the deprecated nested callback type. */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        public Builder using(CompletionCallback completionCallback) {
            return m7using((DebeziumEngine.CompletionCallback) completionCallback);
        }

        /** Sets the connector lifecycle callback, using the deprecated nested callback type. */
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        public Builder using(ConnectorCallback connectorCallback) {
            return m6using((DebeziumEngine.ConnectorCallback) connectorCallback);
        }

        // Bridge: forwards the erased ChangeConsumer overload to the typed notifying(..).
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        /* renamed from: notifying */
        public /* bridge */ /* synthetic */ DebeziumEngine.Builder mo4notifying(DebeziumEngine.ChangeConsumer changeConsumer) {
            return notifying((DebeziumEngine.ChangeConsumer<SourceRecord>) changeConsumer);
        }

        // Bridge: forwards the erased Consumer overload to the typed notifying(..).
        @Override // io.debezium.embedded.EmbeddedEngine.Builder
        /* renamed from: notifying */
        public /* bridge */ /* synthetic */ DebeziumEngine.Builder mo5notifying(Consumer consumer) {
            return notifying((Consumer<SourceRecord>) consumer);
        }
    }

    /**
     * Deprecated specialisation of {@link DebeziumEngine.ChangeConsumer} for {@link SourceRecord};
     * it declares no members of its own.
     */
    @Deprecated
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$ChangeConsumer.class */
    public interface ChangeConsumer extends DebeziumEngine.ChangeConsumer<SourceRecord> {
    }

    /**
     * Deprecated sub-interface of {@link DebeziumEngine.CompletionCallback}; it declares no
     * members of its own.
     */
    @Deprecated
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$CompletionCallback.class */
    public interface CompletionCallback extends DebeziumEngine.CompletionCallback {
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$CompletionResult.class */
    /**
     * A {@link CompletionCallback} that remembers the outcome it was notified of and lets
     * callers wait for that notification. An optional delegate callback is informed after the
     * outcome has been stored.
     */
    public static class CompletionResult implements CompletionCallback {
        /** Callback to forward each notification to; null when there is none. */
        private final CompletionCallback delegate;
        /** Released by the first call to {@link #handle(boolean, String, Throwable)}. */
        private final CountDownLatch completed = new CountDownLatch(1);
        private boolean success;
        private String message;
        private Throwable error;

        /** Creates a result holder without a delegate. */
        public CompletionResult() {
            this(null);
        }

        /**
         * Creates a result holder that forwards notifications.
         *
         * @param completionCallback the delegate to notify, or null for none
         */
        public CompletionResult(CompletionCallback completionCallback) {
            this.delegate = completionCallback;
        }

        /**
         * Stores the outcome, releases any waiting threads and then informs the delegate.
         *
         * @param z   whether the engine completed successfully
         * @param str the completion message
         * @param th  the failure cause, or null
         */
        public void handle(boolean z, String str, Throwable th) {
            // State is written before the latch is released so that awaiting threads observe it.
            success = z;
            message = str;
            error = th;
            completed.countDown();
            if (delegate == null) {
                return;
            }
            delegate.handle(z, str, th);
        }

        /** Blocks until an outcome has been recorded. */
        public void await() throws InterruptedException {
            completed.await();
        }

        /**
         * Blocks until an outcome has been recorded or the timeout elapses.
         *
         * @return true if an outcome was recorded in time, false on timeout
         */
        public boolean await(long j, TimeUnit timeUnit) throws InterruptedException {
            final boolean finishedInTime = completed.await(j, timeUnit);
            return finishedInTime;
        }

        /** Returns true once an outcome has been recorded. */
        public boolean hasCompleted() {
            final long outstanding = completed.getCount();
            return outstanding == 0;
        }

        /** Returns the recorded success flag. */
        public boolean success() {
            return success;
        }

        /** Returns the recorded message. */
        public String message() {
            return message;
        }

        /** Returns the recorded failure cause, or null. */
        public Throwable error() {
            return error;
        }

        /** Returns true when a non-null failure cause has been recorded. */
        public boolean hasError() {
            return !(error == null);
        }
    }

    /**
     * Deprecated sub-interface of {@link DebeziumEngine.ConnectorCallback}; it declares no
     * members of its own.
     */
    @Deprecated
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$ConnectorCallback.class */
    public interface ConnectorCallback extends DebeziumEngine.ConnectorCallback {
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$EmbeddedConfig.class */
    /**
     * {@link WorkerConfig} whose definition is the base worker definition extended with the
     * engine's offset-storage fields.
     */
    protected static class EmbeddedConfig extends WorkerConfig {
        /** Shared definition, built once when this class is initialised. */
        private static final ConfigDef CONFIG = offsetStorageConfigDef();

        /**
         * @param map the configuration properties to parse against the shared definition
         */
        protected EmbeddedConfig(Map<String, String> map) {
            super(CONFIG, map);
        }

        /**
         * Builds the definition: the base worker definition plus one "file" field and three
         * "kafka" fields, each registered through its own {@code Field.group} call.
         */
        private static ConfigDef offsetStorageConfigDef() {
            final ConfigDef definition = baseConfigDef();
            Field.group(definition, "file", new Field[]{EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME});
            final Field[] kafkaFields = {
                    EmbeddedEngine.OFFSET_STORAGE_KAFKA_TOPIC,
                    EmbeddedEngine.OFFSET_STORAGE_KAFKA_PARTITIONS,
                    EmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR
            };
            // One call per field, in declaration order, exactly as the fields were registered before.
            for (Field kafkaField : kafkaFields) {
                Field.group(definition, "kafka", new Field[]{kafkaField});
            }
            return definition;
        }
    }

    /**
     * Deprecated specialisation of {@link DebeziumEngine.RecordCommitter} for
     * {@link SourceRecord}; it declares no members of its own.
     */
    @ThreadSafe
    @Deprecated
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$RecordCommitter.class */
    public interface RecordCommitter extends DebeziumEngine.RecordCommitter<SourceRecord> {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$SourceRecordOffsets.class */
    /**
     * Mutable key/value holder implementing {@link DebeziumEngine.Offsets}; entries are kept
     * in a {@link HashMap} that is handed out as-is by {@link #getOffsets()}.
     */
    public class SourceRecordOffsets implements DebeziumEngine.Offsets {
        private HashMap<String, Object> offsets;

        /** Creates an empty holder. */
        protected SourceRecordOffsets() {
            this.offsets = new HashMap<>();
        }

        /**
         * Stores a value under the given key, replacing any previous value for that key.
         *
         * @param str the offset key
         * @param obj the offset value
         */
        public void set(String str, Object obj) {
            offsets.put(str, obj);
        }

        /** Returns the backing map itself (not a copy). */
        protected HashMap<String, Object> getOffsets() {
            return offsets;
        }
    }

    /**
     * Adapts a per-record {@link Consumer} to the batch-oriented {@link ChangeConsumer} contract.
     *
     * <p>Each record of a batch is passed to the consumer and then marked as processed. When
     * the consumer signals a stop through {@code StopConnectorException} or
     * {@code StopEngineException}, the current record is still marked as processed and the
     * exception is rethrown, so the batch is not marked as finished. Any other exception
     * propagates without marking the record.
     *
     * @param consumer the per-record consumer to wrap
     * @return a change consumer that feeds {@code consumer} one record at a time
     */
    private static ChangeConsumer buildDefaultChangeConsumer(final Consumer<SourceRecord> consumer) {
        final ChangeConsumer perRecordAdapter = new ChangeConsumer() { // from class: io.debezium.embedded.EmbeddedEngine.1
            public void handleBatch(List<SourceRecord> list, DebeziumEngine.RecordCommitter<SourceRecord> recordCommitter) throws InterruptedException {
                for (final SourceRecord current : list) {
                    deliver(current, recordCommitter);
                }
                recordCommitter.markBatchFinished();
            }

            // Hands one record to the wrapped consumer and marks it processed, also on a stop request.
            private void deliver(SourceRecord current, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
                try {
                    consumer.accept(current);
                    committer.markProcessed(current);
                } catch (StopConnectorException | StopEngineException stopRequest) {
                    committer.markProcessed(current);
                    throw stopRequest;
                }
            }
        };
        return perRecordAdapter;
    }

    /**
     * Creates a new, empty builder for an {@link EmbeddedEngine}.
     *
     * @return a fresh {@link BuilderImpl}
     */
    @Deprecated
    public static Builder create() {
        return new BuilderImpl();
    }

    /**
     * Creates the engine from the collaborators collected by the builder.
     *
     * <p>Besides storing its arguments, the constructor installs a fallback completion
     * callback that logs failures, creates two JSON converters with schemas disabled, builds
     * the transformation chain and derives the worker configuration.
     *
     * @param configuration      the engine configuration
     * @param classLoader        the class loader to use
     * @param clock              the time source
     * @param changeConsumer     the consumer of source records
     * @param completionCallback the completion callback, or null to only log failures
     * @param connectorCallback  the connector lifecycle callback, may be null
     * @param offsetCommitPolicy the commit policy, may be null
     */
    private EmbeddedEngine(Configuration configuration, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> changeConsumer, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy) {
        this.config = configuration;
        this.handler = changeConsumer;
        this.classLoader = classLoader;
        this.clock = clock;
        if (completionCallback == null) {
            // No callback supplied: report unsuccessful completion through the log only.
            this.completionCallback = (succeeded, message, error) -> {
                if (!succeeded) {
                    LOGGER.error(message, error);
                }
            };
        } else {
            this.completionCallback = completionCallback;
        }
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        this.offsetCommitPolicy = offsetCommitPolicy;

        // Assertion-only sanity check of the mandatory collaborators.
        final boolean mandatoryMissing = this.config == null
                || this.handler == null
                || this.classLoader == null
                || this.clock == null;
        if (!$assertionsDisabled && mandatoryMissing) {
            throw new AssertionError();
        }

        final String jsonConverterClassName = JsonConverter.class.getName();
        Map converterSettings = Collections.singletonMap("schemas.enable", "false");
        this.keyConverter = (Converter) Instantiator.getInstance(jsonConverterClassName);
        this.keyConverter.configure(converterSettings, true);
        this.valueConverter = (Converter) Instantiator.getInstance(jsonConverterClassName);
        this.valueConverter.configure(converterSettings, false);

        this.transformations = new Transformations(configuration);

        // The worker configuration is the engine's own fields plus the fixed converter classes.
        Map workerProperties = configuration.asMap(ALL_FIELDS);
        workerProperties.put("key.converter", jsonConverterClassName);
        workerProperties.put("value.converter", jsonConverterClassName);
        this.workerConfig = new EmbeddedConfig(workerProperties);
    }

    /**
     * Reports whether a thread is currently registered as executing this engine.
     *
     * @return true while the running-thread reference is set, false otherwise
     */
    public boolean isRunning() {
        final Thread activeThread = this.runningThread.get();
        return activeThread != null;
    }

    /**
     * Records a failure that has no associated throwable by delegating to
     * {@link #fail(String, Throwable)} with a null cause.
     *
     * @param str the failure message
     */
    private void fail(String str) {
        fail(str, null);
    }

    /**
     * Records a failure in the completion result, unless an error has already been recorded,
     * in which case the earlier error is kept and this one is only logged.
     *
     * @param str the failure message
     * @param th  the failure cause, may be null
     */
    private void fail(String str, Throwable th) {
        final boolean firstError = !this.completionResult.hasError();
        if (firstError) {
            this.completionResult.handle(false, str, th);
            return;
        }
        LOGGER.error(str, th);
    }

    /**
     * Records successful completion in the completion result, with no error attached.
     *
     * @param str the completion message
     */
    private void succeed(String str) {
        this.completionResult.handle(true, str, null);
    }

    /* JADX WARN: Finally extract failed */
    public void run() {
        Configuration configuration;
        Field.Set set;
        Logger logger;
        List taskConfigs;
        Class taskClass;
        if (this.runningThread.compareAndSet(null, Thread.currentThread())) {
            String string = this.config.getString(ENGINE_NAME);
            String string2 = this.config.getString(CONNECTOR_CLASS);
            Optional ofNullable = Optional.ofNullable(this.connectorCallback);
            this.latch.countUp();
            try {
                configuration = this.config;
                set = CONNECTOR_FIELDS;
                logger = LOGGER;
                Objects.requireNonNull(logger);
            } catch (Throwable th) {
                this.latch.countDown();
                this.runningThread.set(null);
                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                throw th;
            }
            if (!configuration.validateAndRecord(set, logger::error)) {
                fail("Failed to start connector with invalid configuration (see logs for actual errors)");
                this.latch.countDown();
                this.runningThread.set(null);
                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                return;
            }
            try {
                SourceConnector sourceConnector = (SourceConnector) this.classLoader.loadClass(string2).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                Map originalsStrings = this.workerConfig.originalsStrings();
                ConfigInfos generateResult = AbstractHerder.generateResult(string2, Collections.emptyMap(), sourceConnector.validate(originalsStrings).configValues(), sourceConnector.config().groups());
                if (generateResult.errorCount() > 0) {
                    fail("Connector configuration is not valid. " + ((String) generateResult.values().stream().flatMap(configInfo -> {
                        return configInfo.configValue().errors().stream();
                    }).collect(Collectors.joining(" "))));
                    this.latch.countDown();
                    this.runningThread.set(null);
                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                    return;
                }
                String string3 = this.config.getString(OFFSET_STORAGE);
                try {
                    OffsetBackingStore offsetBackingStore = (OffsetBackingStore) this.classLoader.loadClass(string3).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                    try {
                        offsetBackingStore.configure(this.workerConfig);
                        offsetBackingStore.start();
                        if (this.offsetCommitPolicy == null) {
                            try {
                                this.offsetCommitPolicy = (OffsetCommitPolicy) Instantiator.getInstanceWithProperties(this.config.getString(OFFSET_COMMIT_POLICY), this.config.asProperties());
                            } catch (Throwable th2) {
                                fail("Unable to instantiate OffsetCommitPolicy class '" + string3 + "'", th2);
                                this.latch.countDown();
                                this.runningThread.set(null);
                                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                return;
                            }
                        }
                        try {
                            final OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl(offsetBackingStore, string, this.keyConverter, this.valueConverter);
                            sourceConnector.initialize(new SourceConnectorContext() { // from class: io.debezium.embedded.EmbeddedEngine.2
                                public void requestTaskReconfiguration() {
                                }

                                public void raiseError(Exception exc) {
                                    EmbeddedEngine.this.fail(exc.getMessage(), exc);
                                }

                                public OffsetStorageReader offsetStorageReader() {
                                    return offsetStorageReaderImpl;
                                }
                            });
                            OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, string, this.keyConverter, this.valueConverter);
                            Duration ofMillis = Duration.ofMillis(this.config.getLong(OFFSET_COMMIT_TIMEOUT_MS));
                            try {
                                sourceConnector.start(originalsStrings);
                                ofNullable.ifPresent((v0) -> {
                                    v0.connectorStarted();
                                });
                                taskConfigs = sourceConnector.taskConfigs(1);
                                taskClass = sourceConnector.taskClass();
                            } finally {
                                try {
                                    try {
                                    } catch (Throwable th3) {
                                        this.latch.countDown();
                                        this.runningThread.set(null);
                                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                        return;
                                    }
                                    this.latch.countDown();
                                    this.runningThread.set(null);
                                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                    return;
                                } catch (Throwable th4) {
                                }
                            }
                            if (taskConfigs.isEmpty()) {
                                fail("Unable to start connector's task class '" + taskClass.getName() + "' with no task configuration");
                                try {
                                    try {
                                        offsetBackingStore.stop();
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th5) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th5);
                                        }
                                    } catch (Throwable th6) {
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th7) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th7);
                                        }
                                        throw th6;
                                    }
                                } catch (Throwable th8) {
                                    fail("Error while trying to stop the offset store", th8);
                                    try {
                                        sourceConnector.stop();
                                        ofNullable.ifPresent((v0) -> {
                                            v0.connectorStopped();
                                        });
                                    } catch (Throwable th9) {
                                        fail("Error while trying to stop connector class '" + string2 + "'", th9);
                                    }
                                }
                                this.latch.countDown();
                                this.runningThread.set(null);
                                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                return;
                            }
                            this.task = null;
                            try {
                                this.task = (SourceTask) taskClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                                try {
                                    this.task.initialize(new SourceTaskContext() { // from class: io.debezium.embedded.EmbeddedEngine.3
                                        public OffsetStorageReader offsetStorageReader() {
                                            return offsetStorageReaderImpl;
                                        }

                                        public Map<String, String> configs() {
                                            return null;
                                        }
                                    });
                                    this.task.start((Map) taskConfigs.get(0));
                                    ofNullable.ifPresent((v0) -> {
                                        v0.taskStarted();
                                    });
                                    this.recordsSinceLastCommit = 0L;
                                    Throwable th10 = null;
                                    Throwable th11 = null;
                                    try {
                                        this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
                                        RecordCommitter buildRecordCommitter = buildRecordCommitter(offsetStorageWriter, this.task, ofMillis);
                                        while (this.runningThread.get() != null) {
                                            List list = null;
                                            try {
                                                LOGGER.debug("Embedded engine is polling task for records on thread {}", this.runningThread.get());
                                                list = this.task.poll();
                                                LOGGER.debug("Embedded engine returned from polling task for records");
                                            } catch (RetriableException e) {
                                                int errorsMaxRetries = getErrorsMaxRetries();
                                                LOGGER.info("Retriable exception thrown, connector will be restarted; errors.max.retries={}", Integer.valueOf(errorsMaxRetries), e);
                                                if (errorsMaxRetries < DEFAULT_ERROR_MAX_RETRIES) {
                                                    throw e;
                                                }
                                                if (errorsMaxRetries != 0) {
                                                    DelayStrategy delayStrategy = delayStrategy(this.config);
                                                    int i = 0;
                                                    boolean z = false;
                                                    while (!z) {
                                                        try {
                                                            i++;
                                                            LOGGER.info("Starting connector, attempt {}", Integer.valueOf(i));
                                                            this.task.stop();
                                                            this.task.start((Map) taskConfigs.get(0));
                                                            z = true;
                                                        } catch (Exception e2) {
                                                            if (i == errorsMaxRetries) {
                                                                LOGGER.error("Can't start the connector, max retries to connect exceeded; stopping connector...", e2);
                                                                throw e2;
                                                            }
                                                            LOGGER.error("Can't start the connector, will retry later...", e2);
                                                        }
                                                        delayStrategy.sleepWhen(!z);
                                                    }
                                                }
                                            } catch (InterruptedException e3) {
                                                LOGGER.debug("Embedded engine interrupted on thread {} while polling the task for records", this.runningThread.get());
                                                if (this.runningThread.get() == Thread.currentThread()) {
                                                    Thread.currentThread().interrupt();
                                                }
                                            }
                                            if (list != null) {
                                                try {
                                                    if (!list.isEmpty()) {
                                                        LOGGER.debug("Received {} records from the task", Integer.valueOf(list.size()));
                                                        Stream stream = list.stream();
                                                        Transformations transformations = this.transformations;
                                                        Objects.requireNonNull(transformations);
                                                        list = (List) stream.map(transformations::transform).filter(sourceRecord -> {
                                                            return sourceRecord != null;
                                                        }).collect(Collectors.toList());
                                                    }
                                                } catch (Throwable th12) {
                                                    th10 = th12;
                                                }
                                            }
                                            if (list == null || list.isEmpty()) {
                                                LOGGER.debug("Received no records from the task");
                                            } else {
                                                LOGGER.debug("Received {} transformed records from the task", Integer.valueOf(list.size()));
                                                try {
                                                    this.handler.handleBatch(list, buildRecordCommitter);
                                                } catch (StopConnectorException e4) {
                                                }
                                            }
                                        }
                                        if (th10 != null) {
                                            fail("Stopping connector after error in the application's handler method: " + th10.getMessage(), th10);
                                        } else if (0 != 0) {
                                            fail("Stopping connector after retry error: " + th11.getMessage(), null);
                                        } else {
                                            succeed("Connector '" + string2 + "' completed normally.");
                                        }
                                        try {
                                            LOGGER.info("Stopping the task and engine");
                                            this.task.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.taskStopped();
                                            });
                                            commitOffsets(offsetStorageWriter, ofMillis, this.task);
                                        } catch (InterruptedException e5) {
                                            LOGGER.debug("Interrupted while committing offsets");
                                            Thread.currentThread().interrupt();
                                        } catch (Throwable th13) {
                                            fail("Error while trying to stop the task and commit the offsets", th13);
                                        }
                                        try {
                                            try {
                                                offsetBackingStore.stop();
                                                try {
                                                    sourceConnector.stop();
                                                    ofNullable.ifPresent((v0) -> {
                                                        v0.connectorStopped();
                                                    });
                                                } catch (Throwable th14) {
                                                    fail("Error while trying to stop connector class '" + string2 + "'", th14);
                                                }
                                            } catch (Throwable th15) {
                                                fail("Error while trying to stop the offset store", th15);
                                                try {
                                                    sourceConnector.stop();
                                                    ofNullable.ifPresent((v0) -> {
                                                        v0.connectorStopped();
                                                    });
                                                } catch (Throwable th16) {
                                                    fail("Error while trying to stop connector class '" + string2 + "'", th16);
                                                }
                                            }
                                            this.latch.countDown();
                                            this.runningThread.set(null);
                                            this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                            return;
                                        } catch (Throwable th17) {
                                            try {
                                                sourceConnector.stop();
                                                ofNullable.ifPresent((v0) -> {
                                                    v0.connectorStopped();
                                                });
                                            } catch (Throwable th18) {
                                                fail("Error while trying to stop connector class '" + string2 + "'", th18);
                                            }
                                            throw th17;
                                        }
                                    } catch (Throwable th19) {
                                        if (0 != 0) {
                                            fail("Stopping connector after error in the application's handler method: " + th10.getMessage(), null);
                                        } else if (0 != 0) {
                                            fail("Stopping connector after retry error: " + th11.getMessage(), null);
                                        } else {
                                            succeed("Connector '" + string2 + "' completed normally.");
                                        }
                                        try {
                                            LOGGER.info("Stopping the task and engine");
                                            this.task.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.taskStopped();
                                            });
                                            commitOffsets(offsetStorageWriter, ofMillis, this.task);
                                        } catch (InterruptedException e6) {
                                            LOGGER.debug("Interrupted while committing offsets");
                                            Thread.currentThread().interrupt();
                                        } catch (Throwable th20) {
                                            fail("Error while trying to stop the task and commit the offsets", th20);
                                        }
                                        throw th19;
                                    }
                                } catch (Throwable th21) {
                                    try {
                                        LOGGER.debug("Stopping the task");
                                        this.task.stop();
                                    } catch (Throwable th22) {
                                        LOGGER.info("Error while trying to stop the task");
                                    }
                                    fail("Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + Configuration.from((Map) taskConfigs.get(0)).withMaskedPasswords(), th21);
                                    try {
                                        try {
                                            offsetBackingStore.stop();
                                            try {
                                                sourceConnector.stop();
                                                ofNullable.ifPresent((v0) -> {
                                                    v0.connectorStopped();
                                                });
                                            } catch (Throwable th23) {
                                                fail("Error while trying to stop connector class '" + string2 + "'", th23);
                                            }
                                        } catch (Throwable th24) {
                                            try {
                                                sourceConnector.stop();
                                                ofNullable.ifPresent((v0) -> {
                                                    v0.connectorStopped();
                                                });
                                            } catch (Throwable th25) {
                                                fail("Error while trying to stop connector class '" + string2 + "'", th25);
                                            }
                                            throw th24;
                                        }
                                    } catch (Throwable th26) {
                                        fail("Error while trying to stop the offset store", th26);
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th27) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th27);
                                        }
                                        this.latch.countDown();
                                        this.runningThread.set(null);
                                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                        return;
                                    }
                                    this.latch.countDown();
                                    this.runningThread.set(null);
                                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                    return;
                                }
                            } catch (IllegalAccessException | InstantiationException e7) {
                                fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", e7);
                                try {
                                    try {
                                        offsetBackingStore.stop();
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th28) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th28);
                                        }
                                    } catch (Throwable th29) {
                                        fail("Error while trying to stop the offset store", th29);
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th30) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th30);
                                        }
                                        this.latch.countDown();
                                        this.runningThread.set(null);
                                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                        return;
                                    }
                                    this.latch.countDown();
                                    this.runningThread.set(null);
                                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                    return;
                                } catch (Throwable th31) {
                                    try {
                                        sourceConnector.stop();
                                        ofNullable.ifPresent((v0) -> {
                                            v0.connectorStopped();
                                        });
                                    } catch (Throwable th32) {
                                        fail("Error while trying to stop connector class '" + string2 + "'", th32);
                                    }
                                    throw th31;
                                }
                            }
                        } catch (Throwable th33) {
                            try {
                                try {
                                    offsetBackingStore.stop();
                                    try {
                                        sourceConnector.stop();
                                        ofNullable.ifPresent((v0) -> {
                                            v0.connectorStopped();
                                        });
                                    } catch (Throwable th34) {
                                        fail("Error while trying to stop connector class '" + string2 + "'", th34);
                                    }
                                } catch (Throwable th35) {
                                    fail("Error while trying to stop the offset store", th35);
                                    try {
                                        sourceConnector.stop();
                                        ofNullable.ifPresent((v0) -> {
                                            v0.connectorStopped();
                                        });
                                    } catch (Throwable th36) {
                                        fail("Error while trying to stop connector class '" + string2 + "'", th36);
                                    }
                                    throw th33;
                                }
                                throw th33;
                            } catch (Throwable th37) {
                                try {
                                    sourceConnector.stop();
                                    ofNullable.ifPresent((v0) -> {
                                        v0.connectorStopped();
                                    });
                                } catch (Throwable th38) {
                                    fail("Error while trying to stop connector class '" + string2 + "'", th38);
                                }
                                throw th37;
                            }
                        }
                    } catch (Throwable th39) {
                        fail("Unable to configure and start the '" + string3 + "' offset backing store", th39);
                        offsetBackingStore.stop();
                        this.latch.countDown();
                        this.runningThread.set(null);
                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                        return;
                    }
                } catch (Throwable th40) {
                    fail("Unable to instantiate OffsetBackingStore class '" + string3 + "'", th40);
                    this.latch.countDown();
                    this.runningThread.set(null);
                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                    return;
                }
                this.latch.countDown();
                this.runningThread.set(null);
                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                throw th;
            } catch (Throwable th41) {
                fail("Unable to instantiate connector class '" + string2 + "'", th41);
                this.latch.countDown();
                this.runningThread.set(null);
                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
            }
        }
    }

    /**
     * Looks up the value configured for the {@code ERRORS_MAX_RETRIES} field in this engine's configuration.
     *
     * @return the integer stored under {@code ERRORS_MAX_RETRIES}
     */
    private int getErrorsMaxRetries() {
        final int maxRetries = this.config.getInteger(ERRORS_MAX_RETRIES);
        return maxRetries;
    }

    /**
     * Builds the {@link RecordCommitter} that is bound to the given offset writer, task and commit timeout.
     *
     * <p>All state-changing methods of the returned committer are {@code synchronized} on the committer itself.
     *
     * @param offsetStorageWriter writer that receives the partition/offset pair of every processed record
     * @param sourceTask task that is notified through {@code commitRecord} for every processed record
     * @param duration timeout forwarded to {@link #maybeFlush} when a batch is finished
     * @return a new committer instance
     */
    protected RecordCommitter buildRecordCommitter(final OffsetStorageWriter offsetStorageWriter, final SourceTask sourceTask, final Duration duration) {
        return new RecordCommitter() {

            /** Notifies the task, bumps the engine's pending-record counter and stages the record's offset. */
            public synchronized void markProcessed(SourceRecord sourceRecord) throws InterruptedException {
                sourceTask.commitRecord(sourceRecord);
                EmbeddedEngine.this.recordsSinceLastCommit += 1;
                offsetStorageWriter.offset(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
            }

            /** Lets the engine's commit policy decide whether the staged offsets are flushed now. */
            public synchronized void markBatchFinished() throws InterruptedException {
                final OffsetCommitPolicy policy = EmbeddedEngine.this.offsetCommitPolicy;
                EmbeddedEngine.this.maybeFlush(offsetStorageWriter, policy, duration, sourceTask);
            }

            /**
             * Marks a copy of the record as processed; the copy is identical to {@code sourceRecord} except that
             * its source offset is taken from {@code offsets}, which must be a {@code SourceRecordOffsets}.
             */
            public synchronized void markProcessed(SourceRecord sourceRecord, DebeziumEngine.Offsets offsets) throws InterruptedException {
                final SourceRecord withReplacedOffset = new SourceRecord(
                        sourceRecord.sourcePartition(),
                        ((SourceRecordOffsets) offsets).getOffsets(),
                        sourceRecord.topic(),
                        sourceRecord.kafkaPartition(),
                        sourceRecord.keySchema(),
                        sourceRecord.key(),
                        sourceRecord.valueSchema(),
                        sourceRecord.value(),
                        sourceRecord.timestamp(),
                        sourceRecord.headers());
                markProcessed(withReplacedOffset);
            }

            /** Hands out a fresh, empty {@code SourceRecordOffsets} container. */
            public DebeziumEngine.Offsets buildOffsets() {
                final SourceRecordOffsets emptyOffsets = new SourceRecordOffsets();
                return emptyOffsets;
            }
        };
    }

    /**
     * Commits the staged offsets if the supplied policy asks for it.
     *
     * <p>The policy is given the number of records counted since the last commit and the time elapsed
     * (according to this engine's clock) since {@code timeOfLastCommitMillis}.
     *
     * @param offsetStorageWriter writer holding the offsets to flush
     * @param offsetCommitPolicy policy deciding whether a commit is performed now
     * @param duration timeout passed on to {@link #commitOffsets}
     * @param sourceTask task passed on to {@link #commitOffsets}
     * @throws InterruptedException if {@link #commitOffsets} propagates an interruption
     */
    protected void maybeFlush(OffsetStorageWriter offsetStorageWriter, OffsetCommitPolicy offsetCommitPolicy, Duration duration, SourceTask sourceTask) throws InterruptedException {
        final long pendingRecords = this.recordsSinceLastCommit;
        final long elapsedMillis = this.clock.currentTimeInMillis() - this.timeOfLastCommitMillis;
        final boolean commitDue = offsetCommitPolicy.performCommit(pendingRecords, Duration.ofMillis(elapsedMillis));
        if (!commitDue) {
            return;
        }
        commitOffsets(offsetStorageWriter, duration, sourceTask);
    }

    /**
     * Flushes the offsets staged in the writer and, once the flush has completed, tells the task to commit.
     *
     * <p>Nothing happens when {@code beginFlush()} returns {@code false} or {@code doFlush} returns
     * {@code null}. On success the pending-record counter and the last-commit timestamp are reset. On
     * interruption, failure or timeout the flush is cancelled on the writer.
     *
     * @param offsetStorageWriter writer whose staged offsets are flushed
     * @param duration maximum time to wait for the flush, measured from the start of this call
     * @param sourceTask task whose {@code commit()} is invoked after a successful flush
     * @throws InterruptedException if the wait is interrupted while the current thread is still the
     *         engine's registered running thread; on any other thread the interruption is only logged
     */
    protected void commitOffsets(OffsetStorageWriter offsetStorageWriter, Duration duration, SourceTask sourceTask) throws InterruptedException {
        // Fix the deadline before starting so that time spent in beginFlush/doFlush counts against the timeout.
        final long deadlineMillis = this.clock.currentTimeInMillis() + duration.toMillis();

        if (!offsetStorageWriter.beginFlush()) {
            return;
        }
        final Future<?> pendingFlush = offsetStorageWriter.doFlush(this::completedFlush);
        if (pendingFlush == null) {
            return;
        }

        try {
            final long remainingMillis = Math.max(deadlineMillis - this.clock.currentTimeInMillis(), 0L);
            pendingFlush.get(remainingMillis, TimeUnit.MILLISECONDS);
            // Reaching this point means the flush finished in time, so the task can be notified.
            sourceTask.commit();
            this.recordsSinceLastCommit = 0L;
            this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
        } catch (InterruptedException interrupted) {
            LOGGER.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetStorageWriter.cancelFlush();
            final boolean onEngineThread = this.runningThread.get() == Thread.currentThread();
            if (onEngineThread) {
                Thread.currentThread().interrupt();
                throw interrupted;
            }
        } catch (ExecutionException flushFailure) {
            LOGGER.error("Flush of {} offsets threw an unexpected exception: ", this, flushFailure);
            offsetStorageWriter.cancelFlush();
        } catch (TimeoutException timedOut) {
            LOGGER.error("Timed out waiting to flush {} offsets to storage", this);
            offsetStorageWriter.cancelFlush();
        }
    }

    /**
     * Callback passed to the offset writer's {@code doFlush}; it only logs the outcome of the flush.
     *
     * @param th the failure reported for the flush, or {@code null} when none was reported
     * @param r7 unused result placeholder
     */
    protected void completedFlush(Throwable th, Void r7) {
        if (th == null) {
            LOGGER.trace("Finished flushing {} offsets to storage", this);
            return;
        }
        LOGGER.error("Failed to flush {} offsets to storage: ", this, th);
    }

    /**
     * Requests the engine to stop and interrupts its thread.
     *
     * <p>The registered running thread is cleared first. The caller then waits on the engine's latch for
     * the time given by the system property {@code WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP} (in
     * milliseconds, defaulting to {@code WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT}) before the engine
     * thread is interrupted.
     *
     * <p>If the calling thread is interrupted while waiting, the wait ends early, the engine thread is
     * still interrupted, and the caller's interrupt status is restored so the interruption is not lost.
     * A system property value that is not a valid {@code long} is ignored in favour of the default, so a
     * bad setting cannot prevent the engine thread from being interrupted.
     *
     * @return {@code true} if a running thread was registered and has been interrupted, {@code false} if
     *         no running thread was registered
     */
    public boolean stop() {
        LOGGER.info("Stopping the embedded engine");
        final Thread engineThread = this.runningThread.getAndSet(null);
        if (engineThread == null) {
            return false;
        }

        // Resolve the wait time up front; a malformed override must not abort the shutdown, because the
        // running thread has already been de-registered and a second stop() call would be a no-op.
        final long defaultWaitMillis = WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT.toMillis();
        long waitMillis;
        try {
            waitMillis = Long.parseLong(System.getProperty(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.toString(defaultWaitMillis)));
        } catch (NumberFormatException malformed) {
            LOGGER.warn("Ignoring malformed value of system property '{}', using default of {} ms",
                    WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.valueOf(defaultWaitMillis), malformed);
            waitMillis = defaultWaitMillis;
        }
        final Duration timeout = Duration.ofMillis(waitMillis);

        try {
            LOGGER.info("Waiting for {} for connector to stop", timeout);
            this.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Stop waiting but keep the caller's interrupt status instead of silently discarding it.
            Thread.currentThread().interrupt();
        }

        LOGGER.debug("Interrupting the embedded engine's thread {} (already interrupted: {})", engineThread, Boolean.valueOf(engineThread.isInterrupted()));
        // Interrupt the engine thread in case it is blocked.
        engineThread.interrupt();
        return true;
    }

    /**
     * Closes the engine by delegating to {@link #stop()}; the boolean result of {@code stop()} is discarded.
     *
     * @throws IOException declared by the signature; the visible body does not throw it itself
     */
    public void close() throws IOException {
        this.stop();
    }

    /**
     * Waits on the engine's latch for at most the given time.
     *
     * @param j maximum time to wait, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the value returned by the latch's timed {@code await}
     * @throws InterruptedException if the latch's {@code await} throws it
     */
    public boolean await(long j, TimeUnit timeUnit) throws InterruptedException {
        final boolean latchResult = this.latch.await(j, timeUnit);
        return latchResult;
    }

    /**
     * Describes this engine using the value configured for {@code ENGINE_NAME}.
     *
     * @return a string of the form {@code EmbeddedEngine{id=<name>}}
     */
    public String toString() {
        final String engineName = this.config.getString(ENGINE_NAME);
        return "EmbeddedEngine{id=" + engineName + "}";
    }

    /**
     * Passes the engine's current task reference to the given consumer.
     *
     * @param consumer action invoked with the value of the {@code task} field
     */
    public void runWithTask(Consumer<SourceTask> consumer) {
        final Consumer<SourceTask> action = consumer;
        action.accept(this.task);
    }

    /**
     * Creates the exponential delay strategy used between restart attempts.
     *
     * @param configuration configuration providing {@code ERRORS_RETRY_DELAY_INITIAL_MS} and
     *        {@code ERRORS_RETRY_DELAY_MAX_MS}, both read as millisecond values
     * @return the strategy returned by {@code DelayStrategy.exponential} for those two durations
     */
    private DelayStrategy delayStrategy(Configuration configuration) {
        final Duration initialDelay = Duration.ofMillis(configuration.getInteger(ERRORS_RETRY_DELAY_INITIAL_MS));
        final Duration maximumDelay = Duration.ofMillis(configuration.getInteger(ERRORS_RETRY_DELAY_MAX_MS));
        return DelayStrategy.exponential(initialDelay, maximumDelay);
    }

    // Static initialisation of the engine's configuration field definitions and shared constants.
    // The statements are order-dependent: the field sets at the end are built from the fields defined above them.
    static {
        // Records whether assertions are disabled for this class (negation of desiredAssertionStatus()).
        $assertionsDisabled = !EmbeddedEngine.class.desiredAssertionStatus();

        // --- Required identity fields of the engine/connector ---
        ENGINE_NAME = Field.create("name").withDescription("Unique name for this connector instance.").required();
        CONNECTOR_CLASS = Field.create("connector.class").withDescription("The Java class for the connector").required();

        // --- Offset storage: implementation class (file-based store by default) and its file/Kafka settings ---
        OFFSET_STORAGE = Field.create("offset.storage").withDescription("The Java class that implements the `OffsetBackingStore` interface, used to periodically store offsets so that, upon restart, the connector can resume where it last left off.").withDefault(FileOffsetBackingStore.class.getName());
        OFFSET_STORAGE_FILE_FILENAME = Field.create("offset.storage.file.filename").withDescription("The file where offsets are to be stored. Required when 'offset.storage' is set to the " + FileOffsetBackingStore.class.getName() + " class.").withDefault("");
        OFFSET_STORAGE_KAFKA_TOPIC = Field.create("offset.storage.topic").withDescription("The name of the Kafka topic where offsets are to be stored. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.").withDefault("");
        OFFSET_STORAGE_KAFKA_PARTITIONS = Field.create("offset.storage.partitions").withType(ConfigDef.Type.INT).withDescription("The number of partitions used when creating the offset storage topic. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.");
        OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR = Field.create("offset.storage.replication.factor").withType(ConfigDef.Type.SHORT).withDescription("Replication factor used when creating the offset storage topic. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.");

        // --- Offset commit timing and policy (flush interval defaults to 60000 ms, flush timeout to 5000 ms) ---
        OFFSET_FLUSH_INTERVAL_MS = Field.create("offset.flush.interval.ms").withDescription("Interval at which to try committing offsets, given in milliseconds. Defaults to 1 minute (60,000 ms).").withDefault(60000L).withValidation(new Field.Validator[]{Field::isNonNegativeInteger});
        OFFSET_COMMIT_TIMEOUT_MS = Field.create("offset.flush.timeout.ms").withDescription("Time to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt, given in milliseconds. Defaults to 5 seconds (5000 ms).").withDefault(5000L).withValidation(new Field.Validator[]{Field::isPositiveInteger});
        OFFSET_COMMIT_POLICY = Field.create("offset.commit.policy").withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface " + OffsetCommitPolicy.class.getName() + ". The default is a periodic commit policy based upon time intervals.").withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()).withValidation(new Field.Validator[]{Field::isClassName});

        // --- Optional predicates and single message transformations, configured by prefix ---
        PREDICATES = Field.create("predicates").withDisplayName("List of prefixes defining predicates.").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.LOW).withDescription("Optional list of predicates that can be assigned to transformations. The predicates are defined using '<predicate.prefix>.type' config option and configured using options '<predicate.prefix>.<option>'");
        TRANSFORMS = Field.create("transforms").withDisplayName("List of prefixes defining transformations.").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.LOW).withDescription("Optional list of single message transformations applied on the messages. The transforms are defined using '<transform.prefix>.type' config option and configured using options '<transform.prefix>.<option>'");

        // --- Retry behaviour on errors; the default retry limit is DEFAULT_ERROR_MAX_RETRIES (declared outside this block) ---
        ERRORS_MAX_RETRIES = Field.create("errors.max.retries").withDisplayName("The maximum number of retries").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(DEFAULT_ERROR_MAX_RETRIES).withValidation(new Field.Validator[]{Field::isInteger}).withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
        ERRORS_RETRY_DELAY_INITIAL_MS = Field.create("errors.retry.delay.initial.ms").withDisplayName("Initial delay for retries").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(300).withValidation(new Field.Validator[]{Field::isPositiveInteger}).withDescription("Initial delay (in ms) for retries when encountering connection errors. This value will be doubled upon every retry but won't exceed 'errors.retry.delay.max.ms'.");
        ERRORS_RETRY_DELAY_MAX_MS = Field.create("errors.retry.delay.max.ms").withDisplayName("Max delay between retries").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(10000).withValidation(new Field.Validator[]{Field::isPositiveInteger}).withDescription("Max delay (in ms) between retries when encountering connection errors.");

        // --- Field sets: CONNECTOR_FIELDS holds the two required fields; ALL_FIELDS extends it ---
        CONNECTOR_FIELDS = Field.setOf(new Field[]{ENGINE_NAME, CONNECTOR_CLASS});
        // NOTE(review): ALL_FIELDS omits the Kafka offset-storage fields, OFFSET_COMMIT_POLICY, PREDICATES and
        // TRANSFORMS defined above; confirm that this is intentional.
        ALL_FIELDS = CONNECTOR_FIELDS.with(new Field[]{OFFSET_STORAGE, OFFSET_STORAGE_FILE_FILENAME, OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS, ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS});

        // Default time stop() waits for the engine to finish before interrupting its thread.
        WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT = Duration.ofMinutes(5L);
        LOGGER = LoggerFactory.getLogger(EmbeddedEngine.class);
    }
}
