/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded.async;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Instantiator;
import io.debezium.embedded.ConverterBuilder;
import io.debezium.embedded.DebeziumEngineCommon;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.EmbeddedEngineSignaler;
import io.debezium.embedded.EmbeddedWorkerConfig;
import io.debezium.embedded.KafkaConnectUtil;
import io.debezium.embedded.Transformations;
import io.debezium.embedded.async.AsyncEngineConfig;
import io.debezium.embedded.async.ParallelSmtAndConvertAsyncConsumerProcessor;
import io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor;
import io.debezium.embedded.async.ParallelSmtAndConvertConsumerProcessor;
import io.debezium.embedded.async.ParallelSmtAsyncConsumerProcessor;
import io.debezium.embedded.async.ParallelSmtBatchProcessor;
import io.debezium.embedded.async.ParallelSmtConsumerProcessor;
import io.debezium.embedded.async.RecordProcessor;
import io.debezium.embedded.async.RetryingCallable;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.source.DebeziumSourceConnectorContext;
import io.debezium.engine.source.EngineSourceConnector;
import io.debezium.engine.source.EngineSourceConnectorContext;
import io.debezium.engine.source.EngineSourceTask;
import io.debezium.engine.source.EngineSourceTaskContext;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AsyncEmbeddedEngine<R>
implements DebeziumEngine<R>,
AsyncEngineConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncEmbeddedEngine.class);
    // Engine configuration, built from the Properties handed to the constructor.
    private final Configuration config;
    // Clock passed to task contexts; the constructor falls back to Clock.system() when none is given.
    private final Clock clock;
    // Class loader used to load the connector class (and custom offset stores); falls back to Instantiator.getClassLoader().
    private final ClassLoader classLoader;
    // Per-record consumer and batch handler; the constructor rejects the case where both are null.
    private final Consumer<R> consumer;
    private final DebeziumEngine.ChangeConsumer<R> handler;
    // Invoked once when the engine finishes (or fails to be constructed); never null, a DefaultCompletionCallback is used as fallback.
    private final DebeziumEngine.CompletionCallback completionCallback;
    // Optional lifecycle callback (connector/task started/stopped).
    private final Optional<DebeziumEngine.ConnectorCallback> connectorCallback;
    // JSON converters for offset keys/values, configured in the constructor with "schemas.enable" = "false".
    private final Converter offsetKeyConverter;
    private final Converter offsetValueConverter;
    // Worker configuration derived from the engine configuration (EmbeddedWorkerConfig).
    private final WorkerConfig workerConfig;
    // Offset commit policy handed to every task context.
    private final OffsetCommitPolicy offsetCommitPolicy;
    // Wrapper around the Kafka Connect SourceConnector instantiated from the configured connector class.
    private final EngineSourceConnector connector;
    // Transformations built from the engine configuration; closed on engine shutdown.
    private final Transformations transformations;
    // May be null; closed on engine shutdown when present.
    private final HeaderConverter headerConverter;
    // May be null; when present, the "AndConvert" record processors are selected.
    private final Function<SourceRecord, R> recordConverter;
    // Maps an engine record back to its SourceRecord by casting to EmbeddedEngineChangeEvent.
    private final Function<R, SourceRecord> sourceConverter;
    // Current engine lifecycle state; starts as CREATING and is changed only via compare-and-set.
    private final AtomicReference<State> state = new AtomicReference<State>(State.CREATING);
    // Source tasks created by createSourceTasks().
    private final List<EngineSourceTask> tasks = new ArrayList<EngineSourceTask>();
    // Futures of the polling jobs submitted by runTasksPolling(); cancelled on shutdown if still running.
    private final List<Future<Void>> pollingFutures = new ArrayList<Future<Void>>();
    // Executor running task start, polling and stop jobs; sized by the "tasks.max" setting.
    private final ExecutorService taskService;
    // Executor handed to the record processors for record processing.
    private final ExecutorService recordService;
    // Counted down at the end of close(State); finishShutDown() waits on it.
    private final CountDownLatch shutDownLatch = new CountDownLatch(1);
    // Lazily created by getSignaler().
    private DebeziumEngine.Signaler signaler;

    /**
     * Creates the engine: validates the configuration, sets up the executors and instantiates
     * the offset commit policy, offset converters, transformations and the source connector.
     *
     * @param config engine and connector configuration; must not be null
     * @param consumer per-record consumer; may be null if {@code handler} is given
     * @param handler batch handler; may be null if {@code consumer} is given
     * @param classLoader class loader for the connector class; null selects {@code Instantiator.getClassLoader()}
     * @param clock clock for the tasks; null selects {@code Clock.system()}
     * @param completionCallback callback invoked on completion; null selects a {@code DefaultCompletionCallback}
     * @param connectorCallback optional lifecycle callback; may be null
     * @param offsetCommitPolicy commit policy; null means the policy class named in the configuration is instantiated
     * @param headerConverter optional header converter; may be null
     * @param recordConverter optional converter from {@code SourceRecord} to {@code R}; may be null
     * @throws DebeziumException if neither consumer nor handler is given, the processing order or the
     *         configuration is invalid, or a required class cannot be instantiated
     */
    private AsyncEmbeddedEngine(Properties config, Consumer<R> consumer, DebeziumEngine.ChangeConsumer<R> handler, ClassLoader classLoader, Clock clock, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy, HeaderConverter headerConverter, Function<SourceRecord, R> recordConverter) {
        this.config = Configuration.from((Properties)Objects.requireNonNull(config, "A connector configuration must be specified."));
        this.consumer = consumer;
        this.handler = handler;
        this.classLoader = classLoader == null ? Instantiator.getClassLoader() : classLoader;
        this.clock = clock == null ? Clock.system() : clock;
        this.completionCallback = completionCallback != null ? completionCallback : new DefaultCompletionCallback();
        this.connectorCallback = Optional.ofNullable(connectorCallback);
        this.headerConverter = headerConverter;
        this.recordConverter = recordConverter;
        this.sourceConverter = record -> ((EmbeddedEngineChangeEvent)record).sourceRecord();
        if (this.handler == null && this.consumer == null) {
            throw new DebeziumException("Either java.util.function.Consumer or DebeziumEngine.ChangeConsumer must be specified.");
        }
        // The processing order only matters when records are delivered through the plain consumer.
        if (this.handler == null && RecordProcessingOrder.parse(this.config.getString(AsyncEngineConfig.RECORD_PROCESSING_ORDER)) == null) {
            throw new DebeziumException(String.format("'%s' is not a valid 'record.processing.order' options", this.config.getString(AsyncEngineConfig.RECORD_PROCESSING_ORDER)));
        }
        this.taskService = Executors.newFixedThreadPool(this.config.getInteger("tasks.max", () -> 1));
        String processingThreads = this.config.getString(RECORD_PROCESSING_THREADS);
        if (processingThreads == null || processingThreads.isBlank()) {
            // With an unbounded queue a ThreadPoolExecutor never grows beyond its core size (a core size
            // of 0 yields a single worker), so the core size itself must be the desired parallelism.
            // Core-thread timeout keeps the "idle threads are released after 60 s" behaviour.
            ThreadPoolExecutor defaultRecordPool = new ThreadPoolExecutor(AsyncEngineConfig.AVAILABLE_CORES, AsyncEngineConfig.AVAILABLE_CORES, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            defaultRecordPool.allowCoreThreadTimeOut(true);
            this.recordService = defaultRecordPool;
        } else {
            this.recordService = Executors.newFixedThreadPool(this.computeRecordThreads(processingThreads));
        }
        if (!this.config.validateAndRecord((Iterable)AsyncEngineConfig.CONNECTOR_FIELDS, arg_0 -> ((Logger)LOGGER).error(arg_0))) {
            DebeziumException e = new DebeziumException("Failed to start connector with invalid configuration (see logs for actual errors)", null);
            this.completionCallback.handle(false, "Failed to start connector with invalid configuration (see logs for actual errors)", (Throwable)e);
            throw e;
        }
        this.workerConfig = new EmbeddedWorkerConfig(this.config.asMap(AsyncEngineConfig.ALL_FIELDS));
        try {
            this.offsetCommitPolicy = offsetCommitPolicy == null ? (OffsetCommitPolicy)Instantiator.getInstanceWithProperties((String)this.config.getString(AsyncEngineConfig.OFFSET_COMMIT_POLICY), (Properties)config) : offsetCommitPolicy;
            this.offsetKeyConverter = (Converter)Instantiator.getInstance((String)JsonConverter.class.getName());
            this.offsetValueConverter = (Converter)Instantiator.getInstance((String)JsonConverter.class.getName());
            this.transformations = new Transformations(Configuration.from((Properties)config));
            Class<?> connectorClass = this.classLoader.loadClass(this.config.getString(AsyncEngineConfig.CONNECTOR_CLASS));
            SourceConnector connectConnector = (SourceConnector)connectorClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            this.connector = new EngineSourceConnector(connectConnector);
        }
        catch (Throwable t) {
            // Report the failure through the completion callback before failing the construction.
            this.completionCallback.handle(false, "Failed to instantiate required class", t);
            throw new DebeziumException(t);
        }
        // Offsets are stored as schema-less JSON.
        Map<String, String> internalConverterConfig = Collections.singletonMap("schemas.enable", "false");
        this.offsetKeyConverter.configure(internalConverterConfig, true);
        this.offsetValueConverter.configure(internalConverterConfig, false);
    }

    /**
     * Gives access to the source tasks managed by this engine.
     *
     * @return the engine's internal task list itself (not a copy)
     */
    List<EngineSourceTask> tasks() {
        return tasks;
    }

    /**
     * Runs the engine: initializes and starts the connector, creates and starts the source tasks
     * and then blocks in task polling. Each phase is entered through a checked state transition.
     * Any failure triggers {@link #closeEngineWithException(Throwable)}; in every case
     * {@link #finishShutDown(Throwable)} is called at the end.
     */
    public void run() {
        Throwable exitError = null;
        try {
            LOGGER.debug("Initializing connector and starting it.");
            this.setEngineState(State.CREATING, State.INITIALIZING);
            this.connector.connectConnector().start(this.initializeConnector());
            LOGGER.debug("Calling connector callback after connector has started.");
            this.connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
            LOGGER.debug("Creating source tasks.");
            this.setEngineState(State.INITIALIZING, State.CREATING_TASKS);
            this.createSourceTasks(this.connector, this.tasks);
            LOGGER.debug("Starting source tasks.");
            this.setEngineState(State.CREATING_TASKS, State.STARTING_TASKS);
            this.startSourceTasks(this.tasks);
            LOGGER.debug("Starting tasks polling.");
            this.setEngineState(State.STARTING_TASKS, State.POLLING_TASKS);
            // Blocks until all polling jobs have finished or one of them has failed.
            this.runTasksPolling(this.tasks);
        }
        catch (Throwable t) {
            // Remember the failure for the completion handler and try to shut the engine down.
            exitError = t;
            this.closeEngineWithException(exitError);
        }
        finally {
            // Waits for the shutdown latch, moves the engine to STOPPED and calls the completion handler.
            this.finishShutDown(exitError);
        }
    }

    /**
     * Requests the engine shutdown: moves the engine to the {@code STOPPING} state and stops the
     * connector and its tasks.
     *
     * @throws IllegalStateException if tasks are currently being started, or the engine is already
     *         stopping or stopped
     * @throws IOException declared by the signature; not thrown by the visible code
     */
    public void close() throws IOException {
        LOGGER.debug("Engine shutdown called.");
        final State current = this.getEngineState();
        // Work out whether the current state forbids a shutdown request.
        final String refusal;
        if (current == State.STARTING_TASKS) {
            refusal = "Cannot stop engine while tasks are starting, this may lead to leaked resource. Wait for the tasks to be fully started.";
        } else if (current == State.STOPPING) {
            refusal = "Engine is already being shutting down.";
        } else if (current == State.STOPPED) {
            refusal = "Engine has been already shut down.";
        } else {
            refusal = null;
        }
        if (refusal != null) {
            throw new IllegalStateException(refusal);
        }
        LOGGER.debug("Stopping " + AsyncEmbeddedEngine.class.getName());
        this.setEngineState(current, State.STOPPING);
        this.close(current);
    }

    /**
     * Applies the given action to the Kafka Connect task of every source task of this engine.
     *
     * @param consumer action invoked once per task
     */
    public void runWithTask(Consumer<SourceTask> consumer) {
        this.tasks.forEach(engineTask -> consumer.accept(engineTask.connectTask()));
    }

    /**
     * Stops the connector (and its tasks, depending on the state the engine was in), closes the
     * header converter and the transformations and finally releases the shutdown latch.
     * <p>
     * The latch is released even when stopping the connector or closing a resource throws;
     * otherwise {@code finishShutDown} would wait for it forever. Exceptions from
     * {@code stopConnector} still propagate to the caller.
     *
     * @param stateBeforeStop engine state before it was switched to {@code STOPPING}
     */
    private void close(State stateBeforeStop) {
        try {
            this.stopConnector(this.tasks, stateBeforeStop);
        }
        finally {
            try {
                if (this.headerConverter != null) {
                    try {
                        this.headerConverter.close();
                    }
                    catch (IOException e) {
                        LOGGER.warn("Failed to close header converter: ", (Throwable)e);
                    }
                }
                if (this.transformations != null) {
                    try {
                        this.transformations.close();
                    }
                    catch (IOException e) {
                        LOGGER.warn("Failed to close transformations: ", (Throwable)e);
                    }
                }
            }
            finally {
                // Must always happen: run() blocks on this latch in finishShutDown().
                this.shutDownLatch.countDown();
            }
        }
    }

    /**
     * Logs the failure and, if the current state allows stopping, switches the engine to
     * {@code STOPPING} and closes it. Failures of the close itself are only logged.
     *
     * @param exitError the error which made the engine fail
     */
    private void closeEngineWithException(Throwable exitError) {
        LOGGER.error("Engine has failed with ", exitError);
        final State previous = this.getEngineState();
        if (!State.canBeStopped(previous)) {
            return;
        }
        LOGGER.debug("Stopping " + AsyncEmbeddedEngine.class.getName());
        this.setEngineState(previous, State.STOPPING);
        try {
            this.close(previous);
        }
        catch (Throwable closeFailure) {
            LOGGER.error("Failed to close the engine: ", closeFailure);
        }
    }

    /**
     * Waits until the shutdown latch is released, marks the engine as {@code STOPPED} and calls
     * the completion handler.
     * <p>
     * If the waiting thread is interrupted, the wait is abandoned and the remaining steps still
     * run; the interrupt status is restored afterwards so that the caller can observe it.
     *
     * @param exitError error which terminated the engine, or null on a normal shutdown
     */
    private void finishShutDown(Throwable exitError) {
        boolean interrupted = false;
        try {
            this.shutDownLatch.await();
        }
        catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for shutdown to finish.");
            interrupted = true;
        }
        try {
            LOGGER.info("Engine is stopped.");
            this.setEngineState(State.STOPPING, State.STOPPED);
            LOGGER.debug("Calling completion handler.");
            this.callCompletionHandler(exitError);
        }
        finally {
            // Restore the flag only at the very end so the completion handler is not disturbed by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Validates the connector configuration, creates and starts the offset store together with
     * its reader and writer, and initializes the connector with a context built from them.
     *
     * @return the validated connector configuration
     * @throws Exception if the validation or the offset store creation/start fails
     */
    private Map<String, String> initializeConnector() throws Exception {
        LOGGER.debug("Preparing connector initialization");
        final String namespace = this.config.getString(AsyncEngineConfig.ENGINE_NAME);
        final Map<String, String> validatedConfig = this.validateAndGetConnectorConfig(
                this.connector.connectConnector(),
                this.config.getString(AsyncEngineConfig.CONNECTOR_CLASS));

        LOGGER.debug("Initializing offset store, offset reader and writer");
        final OffsetBackingStore store = this.createAndStartOffsetStore(validatedConfig);
        // Reader and writer share the same store, namespace and converters.
        final OffsetStorageReader reader = new OffsetStorageReaderImpl(store, namespace, this.offsetKeyConverter, this.offsetValueConverter);
        final OffsetStorageWriter writer = new OffsetStorageWriter(store, namespace, this.offsetKeyConverter, this.offsetValueConverter);

        LOGGER.debug("Initializing Connect connector itself");
        this.connector.initialize(new EngineSourceConnectorContext(this, store, reader, writer));
        return validatedConfig;
    }

    /**
     * Asks the connector for its task configurations (at most {@code tasks.max} of them, default 1),
     * instantiates and initializes one source task per configuration and adds it to {@code tasks}.
     *
     * @param connector the engine connector providing the task class and task configurations
     * @param tasks list receiving the created tasks
     * @throws NoSuchMethodException if the task class has no no-argument constructor
     * @throws InvocationTargetException if the task constructor throws
     * @throws InstantiationException if the task class cannot be instantiated
     * @throws IllegalAccessException if the task constructor is not accessible
     */
    private void createSourceTasks(EngineSourceConnector connector, List<EngineSourceTask> tasks) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        final Class<?> taskClass = connector.connectConnector().taskClass();
        final List<Map<String, String>> taskConfigs = connector.connectConnector().taskConfigs(this.config.getInteger("tasks.max", 1));
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Following task configurations will be used for creating tasks:");
            for (int i = 0; i < taskConfigs.size(); ++i) {
                LOGGER.debug("Config #{}: {}", i, taskConfigs.get(i));
            }
        }
        if (taskConfigs.isEmpty()) {
            LOGGER.warn("No task configuration provided.");
        } else {
            LOGGER.debug("Creating {} instance(s) of source task(s)", taskConfigs.size());
        }
        for (Map<String, String> taskConfig : taskConfigs) {
            final SourceTask task = (SourceTask)taskClass.getDeclaredConstructor().newInstance();
            // All tasks share the connector's offset reader and writer.
            final EngineSourceTaskContext taskContext = new EngineSourceTaskContext(taskConfig, connector.context().offsetStorageReader(), connector.context().offsetStorageWriter(), this.offsetCommitPolicy, this.clock, this.transformations);
            task.initialize((SourceTaskContext)taskContext);
            tasks.add(new EngineSourceTask(task, taskContext));
        }
    }

    /**
     * Starts all source tasks in parallel on the task executor and waits for each of them,
     * allowing the configured task management timeout per awaited task. The connector callback
     * is notified for every task which started successfully.
     *
     * @param tasks tasks to be started
     * @throws Exception the first failure observed if at least one task failed to start in time
     */
    private void startSourceTasks(List<EngineSourceTask> tasks) throws Exception {
        LOGGER.debug("Starting source connector tasks.");
        final ExecutorCompletionService<Void> startups = new ExecutorCompletionService<Void>(this.taskService);
        for (EngineSourceTask sourceTask : tasks) {
            startups.submit(() -> {
                sourceTask.connectTask().start(sourceTask.context().config());
                return null;
            });
        }

        final long timeoutMs = this.config.getLong(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS);
        LOGGER.info("Waiting max. for {} ms for individual source tasks to start.", timeoutMs);

        final int total = tasks.size();
        Exception firstFailure = null;
        int failures = 0;
        // Tasks are numbered from 1 in the order in which their start-up completes.
        for (int taskNo = 1; taskNo <= total; taskNo++) {
            try {
                final Future<Void> finished = startups.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (finished == null) {
                    throw new InterruptedException("Time out while waiting for source task to start.");
                }
                finished.get();
                LOGGER.debug("Started task #{} out of {} tasks.", taskNo, total);
            }
            catch (Exception e) {
                LOGGER.debug("Task #{} (out of {} tasks) failed to start. Failed with", taskNo, total, e);
                failures++;
                // Only the first failure is propagated, the others are just counted.
                if (firstFailure == null) {
                    firstFailure = e;
                }
                continue;
            }
            LOGGER.debug("Calling connector callback after task is started.");
            this.connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
        }

        if (firstFailure != null) {
            LOGGER.error("{} task(s) out of {} failed to start.", failures, total);
            throw firstFailure;
        }
        LOGGER.info("All tasks have started successfully.");
    }

    /**
     * Submits one polling job per task to the task executor and blocks until all of them have
     * finished. Interruption or cancellation of a job is only logged; a failure of a job is
     * propagated immediately.
     *
     * @param tasks tasks to be polled
     * @throws ExecutionException if a polling job fails
     */
    private void runTasksPolling(List<EngineSourceTask> tasks) throws ExecutionException {
        LOGGER.debug("Starting tasks polling.");
        ExecutorCompletionService<Void> taskCompletionService = new ExecutorCompletionService<Void>(this.taskService);
        String processorClassName = this.selectRecordProcessor();
        for (EngineSourceTask task : tasks) {
            RecordProcessor processor = this.createRecordProcessor(processorClassName, task);
            processor.initialize(this.recordService, this.transformations);
            // Keep the future so the polling can be cancelled during shutdown.
            this.pollingFutures.add(taskCompletionService.submit(new PollRecords(task, processor, this.state)));
        }
        final int nTasks = tasks.size();
        for (int i = 0; i < nTasks; ++i) {
            try {
                taskCompletionService.take().get();
            }
            catch (InterruptedException | CancellationException e) {
                LOGGER.info("Task interrupted while polling.");
            }
            // One-based numbering, consistent with the task start/stop log messages.
            LOGGER.debug("Task #{} out of {} tasks has stopped polling.", i + 1, nTasks);
        }
    }

    /**
     * Picks the record processor implementation. A batch processor is used when a batch handler
     * is set; otherwise the configured record processing order decides between the ordered and
     * the unordered consumer processor. In each case the "AndConvert" variant is taken when a
     * record converter is present.
     *
     * @return fully qualified class name of the selected processor
     * @throws IllegalStateException if no processor matches
     */
    private String selectRecordProcessor() {
        final boolean converting = this.recordConverter != null;
        final Class<?> selected;
        if (this.handler != null) {
            selected = converting ? ParallelSmtAndConvertBatchProcessor.class : ParallelSmtBatchProcessor.class;
        } else {
            final RecordProcessingOrder order = RecordProcessingOrder.parse(this.config.getString(AsyncEngineConfig.RECORD_PROCESSING_ORDER));
            if (order == RecordProcessingOrder.ORDERED) {
                selected = converting ? ParallelSmtAndConvertConsumerProcessor.class : ParallelSmtConsumerProcessor.class;
            } else if (order == RecordProcessingOrder.UNORDERED) {
                selected = converting ? ParallelSmtAndConvertAsyncConsumerProcessor.class : ParallelSmtAsyncConsumerProcessor.class;
            } else {
                throw new IllegalStateException("Unable to select RecordProcessor, this should never happen.");
            }
        }
        final String processorName = selected.getName();
        LOGGER.info("Using {} processor", processorName);
        return processorName;
    }

    /**
     * Creates the record processor of the given class for the given task.
     *
     * @param processorClassName fully qualified class name of the processor to create
     * @param task task whose records will be processed and committed
     * @return a new processor instance
     * @throws IllegalStateException if the class name is not one of the known processors
     */
    private RecordProcessor createRecordProcessor(String processorClassName, EngineSourceTask task) {
        final java.util.function.Predicate<Class<?>> requested = type -> type.getName().equals(processorClassName);

        // Batch processors hand records to the batch handler.
        if (requested.test(ParallelSmtBatchProcessor.class)) {
            return new ParallelSmtBatchProcessor(new SourceRecordCommitter(task), this.handler);
        } else if (requested.test(ParallelSmtAndConvertBatchProcessor.class)) {
            return new ParallelSmtAndConvertBatchProcessor<R>(new ConvertingRecordCommitter(task), this.handler, this.recordConverter);
        }
        // Consumer processors hand records to the per-record consumer.
        else if (requested.test(ParallelSmtConsumerProcessor.class)) {
            return new ParallelSmtConsumerProcessor(new SourceRecordCommitter(task), this.consumer);
        } else if (requested.test(ParallelSmtAndConvertConsumerProcessor.class)) {
            return new ParallelSmtAndConvertConsumerProcessor<R>(new SourceRecordCommitter(task), this.consumer, this.recordConverter);
        } else if (requested.test(ParallelSmtAsyncConsumerProcessor.class)) {
            return new ParallelSmtAsyncConsumerProcessor(new SourceRecordCommitter(task), this.consumer);
        } else if (requested.test(ParallelSmtAndConvertAsyncConsumerProcessor.class)) {
            return new ParallelSmtAndConvertAsyncConsumerProcessor<R>(new SourceRecordCommitter(task), this.consumer, this.recordConverter);
        }
        throw new IllegalStateException("Unable to create RecordProcessor instance, this should never happen.");
    }

    /**
     * Shuts the record processing executor down: stops accepting new work, waits up to the
     * configured shutdown timeout for running work to finish and then forces the shutdown.
     * <p>
     * A timeout is signalled by {@code awaitTermination} returning {@code false}, not by an
     * exception, so the two outcomes (timeout and interruption) are reported separately.
     */
    private void stopRecordService() {
        LOGGER.debug("Stopping records service.");
        long shutdownTimeout = this.config.getLong(AsyncEngineConfig.RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS);
        try {
            this.recordService.shutdown();
            if (!this.recordService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
                LOGGER.info("Timed out while waiting for record service shutdown. Shutting it down immediately.");
            }
        }
        catch (InterruptedException e) {
            // The interrupt flag is deliberately not restored here: the remaining shutdown steps run
            // on this thread and use blocking calls which would otherwise abort immediately.
            LOGGER.info("Interrupted while waiting for record service shutdown. Shutting it down immediately.");
        }
        finally {
            this.recordService.shutdownNow();
        }
    }

    /**
     * Cancels (with interruption) every polling job which has not finished yet.
     */
    private void stopPollingIfNeeded() {
        this.pollingFutures.stream()
                .filter(polling -> !polling.isDone())
                .forEach(polling -> polling.cancel(true));
    }

    /**
     * Commits the offsets of all tasks and stops them, in parallel on the task executor, then
     * shuts the task executor down. Each awaited task may take up to the configured task
     * management timeout; the connector callback is notified for every task which stopped.
     * Failures are only logged, and the executor is always shut down forcibly at the end.
     *
     * @param tasks tasks to be stopped
     */
    private void stopSourceTasks(List<EngineSourceTask> tasks) {
        try {
            LOGGER.debug("Stopping source connector tasks.");
            ExecutorCompletionService<Void> taskCompletionService = new ExecutorCompletionService<Void>(this.taskService);
            for (EngineSourceTask task : tasks) {
                // The offset commit timeout is read from the task's own configuration.
                long commitTimeout = Configuration.from(task.context().config()).getLong(EmbeddedEngineConfig.OFFSET_COMMIT_TIMEOUT_MS);
                taskCompletionService.submit(() -> {
                    LOGGER.debug("Committing task's offset.");
                    AsyncEmbeddedEngine.commitOffsets(task.context().offsetStorageWriter(), task.context().clock(), commitTimeout, task.connectTask());
                    LOGGER.debug("Stopping Connect task.");
                    task.connectTask().stop();
                    return null;
                });
            }
            long taskStopTimeout = this.config.getLong(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS);
            LOGGER.debug("Waiting max. for {} ms for individual source tasks to stop.", (Object)taskStopTimeout);
            int nTasks = tasks.size();
            // Measured once before the loop, so the logged duration is the time since waiting began.
            long startTime = System.nanoTime();
            for (int i = 0; i < nTasks; ++i) {
                Future taskFuture = taskCompletionService.poll(taskStopTimeout, TimeUnit.MILLISECONDS);
                if (taskFuture == null) {
                    // NOTE(review): a timeout is raised as InterruptedException and therefore logged
                    // below as an interruption.
                    throw new InterruptedException("Time out while waiting for source task to stop.");
                }
                // The future is already completed here; get() only surfaces a possible failure.
                taskFuture.get(0L, TimeUnit.MILLISECONDS);
                LOGGER.info("Stopped task #{} out of {} tasks (it took {} ms to stop the task).", new Object[]{i + 1, nTasks, (System.nanoTime() - startTime) / 1000000L});
                LOGGER.debug("Calling connector callback after task is stopped.");
                this.connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped);
            }
            LOGGER.debug("Stopping all remaining tasks if there are any.");
            this.taskService.shutdown();
        }
        catch (InterruptedException e) {
            LOGGER.warn("Stopping of the tasks was interrupted, shutting down immediately.");
        }
        catch (Exception e) {
            LOGGER.warn("Failure during stopping tasks, stopping them immediately. Failed with ", (Throwable)e);
        }
        finally {
            // Runs in every case, including after the orderly shutdown() above.
            this.taskService.shutdownNow();
        }
    }

    /**
     * Stops the offset backing store of the given connector context, if there is one.
     * A failure to stop the store is only logged.
     *
     * @param connectorContext connector context; may be null when the connector was not initialized
     */
    private void stopOffsetStore(DebeziumSourceConnectorContext connectorContext) {
        final boolean storeAvailable = connectorContext != null && connectorContext.offsetStore() != null;
        if (storeAvailable) {
            LOGGER.debug("Stopping offset backing store.");
            try {
                connectorContext.offsetStore().stop();
            }
            catch (Exception stopFailure) {
                LOGGER.warn("Failed to stop offset backing store", (Throwable)stopFailure);
            }
        } else {
            LOGGER.debug("Offset store hasn't been initialized yet, closing of the offset store is skipped.");
        }
    }

    /**
     * Stops everything belonging to the connector, in this order: record service, polling jobs
     * and source tasks (only when {@code State.shouldStopTasks} says so for the given state),
     * then the offset store, then the connector itself, and finally notifies the connector callback.
     *
     * @param tasks source tasks of the connector
     * @param engineState state the engine was in before the shutdown started
     */
    private void stopConnector(List<EngineSourceTask> tasks, State engineState) {
        if (State.shouldStopTasks(engineState)) {
            LOGGER.debug("Tasks were already started, stopping record service and tasks.");
            this.stopRecordService();
            this.stopPollingIfNeeded();
            this.stopSourceTasks(tasks);
        }
        // The offset store is stopped only after the tasks have had their chance to commit offsets.
        this.stopOffsetStore(this.connector.context());
        LOGGER.debug("Stopping the connector.");
        this.connector.connectConnector().stop();
        LOGGER.debug("Calling connector callback after connector stop");
        this.connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
    }

    /**
     * Reports the engine result to the completion callback. For a failure the message of
     * {@code error} is reported; an {@code ExecutionException} is unwrapped to its cause.
     *
     * @param error error which terminated the engine, or null if it completed normally
     */
    private void callCompletionHandler(Throwable error) {
        if (error != null) {
            final Throwable reported;
            if (error instanceof ExecutionException) {
                reported = error.getCause();
            } else {
                reported = error;
            }
            this.completionCallback.handle(false, error.getMessage(), reported);
            return;
        }
        final String connectorClassName = this.config.getString(AsyncEngineConfig.CONNECTOR_CLASS);
        this.completionCallback.handle(true, String.format("Connector '%s' completed normally.", connectorClassName), null);
    }

    /**
     * @return the current value of the engine state reference
     */
    private State getEngineState() {
        return state.get();
    }

    /**
     * Atomically switches the engine state from the expected to the requested one.
     *
     * @param expectedState state the engine has to be in
     * @param requestedState state to switch to
     * @throws IllegalStateException if the engine is not in the expected state
     */
    private void setEngineState(State expectedState, State requestedState) {
        if (this.state.compareAndSet(expectedState, requestedState)) {
            LOGGER.info("Engine state has changed from '{}' to '{}'", expectedState, requestedState);
            return;
        }
        throw new IllegalStateException(String.format(
                "Cannot change engine state to '%s' as the engine is not in expected state '%s', current engine state is '%s'",
                requestedState, expectedState, this.state.get()));
    }

    /**
     * Validates the connector configuration (the original string values of the worker
     * configuration) with the connector's own validation.
     *
     * @param connector connector used for the validation
     * @param connectorClassName connector class name passed to the validation result
     * @return the validated connector configuration
     * @throws DebeziumException if the validation reports at least one error; the message
     *         contains all reported errors joined by a space
     */
    private Map<String, String> validateAndGetConnectorConfig(SourceConnector connector, String connectorClassName) {
        LOGGER.debug("Validating provided connector configuration.");
        final Map<String, String> connectorConfig = this.workerConfig.originalsStrings();
        final Config validatedConnectorConfig = connector.validate(connectorConfig);
        final ConfigInfos configInfos = AbstractHerder.generateResult(connectorClassName, Collections.emptyMap(), validatedConnectorConfig.configValues(), connector.config().groups());
        if (configInfos.errorCount() > 0) {
            final String errors = configInfos.values().stream().flatMap(v -> v.configValue().errors().stream()).collect(Collectors.joining(" "));
            throw new DebeziumException("Connector configuration is not valid. " + errors);
        }
        LOGGER.debug("Connector configuration is valid.");
        return connectorConfig;
    }

    /**
     * Creates the offset backing store named in the configuration, configures it with the worker
     * configuration and starts it. The memory, file and Kafka stores are created through
     * {@code KafkaConnectUtil}; any other class is loaded with the engine class loader and
     * instantiated through its no-argument constructor.
     * <p>
     * If configuring or starting fails, the store is stopped and the original failure is
     * rethrown; a failure of that stop is attached as a suppressed exception instead of
     * replacing the original one.
     *
     * @param connectorConfig connector configuration, used for creating the Kafka offset store
     * @return the started offset store
     * @throws Exception if the store cannot be created, configured or started
     */
    private OffsetBackingStore createAndStartOffsetStore(Map<String, String> connectorConfig) throws Exception {
        final OffsetBackingStore offsetStore;
        final String offsetStoreClassName = this.config.getString(AsyncEngineConfig.OFFSET_STORAGE);
        LOGGER.debug("Creating instance of offset store for {}.", offsetStoreClassName);
        if (offsetStoreClassName.equals(MemoryOffsetBackingStore.class.getName())) {
            offsetStore = KafkaConnectUtil.memoryOffsetBackingStore();
        } else if (offsetStoreClassName.equals(FileOffsetBackingStore.class.getName())) {
            offsetStore = KafkaConnectUtil.fileOffsetBackingStore();
        } else if (offsetStoreClassName.equals(KafkaOffsetBackingStore.class.getName())) {
            offsetStore = KafkaConnectUtil.kafkaOffsetBackingStore(connectorConfig);
        } else {
            final Class<?> offsetStoreClass = this.classLoader.loadClass(offsetStoreClassName);
            offsetStore = (OffsetBackingStore)offsetStoreClass.getDeclaredConstructor().newInstance();
        }
        try {
            LOGGER.debug("Starting offset store.");
            offsetStore.configure(this.workerConfig);
            offsetStore.start();
        }
        catch (Throwable t) {
            LOGGER.debug("Failed to start offset store, stopping it now.");
            try {
                offsetStore.stop();
            }
            catch (Throwable stopFailure) {
                // Keep the start failure as the primary error.
                t.addSuppressed(stopFailure);
            }
            throw t;
        }
        LOGGER.debug("Offset store {} successfully started.", offsetStoreClassName);
        return offsetStore;
    }

    /**
     * Flushes the offsets buffered in the given writer and, on success, lets the task commit.
     * The whole operation shares one deadline of {@code commitTimeout} milliseconds.
     *
     * @param offsetWriter writer holding the offsets to be flushed
     * @param clock clock used for computing the deadline
     * @param commitTimeout timeout for the commit, in milliseconds
     * @param task task to be notified about the successful commit
     * @return {@code true} if the offsets were flushed and the task committed; {@code false} if
     *         there was nothing to flush, the flush could not be started, or it failed or timed out
     * @throws InterruptedException if the thread is interrupted; the flush is cancelled first
     * @throws TimeoutException declared for starting the flush (presumably raised by
     *         {@code beginFlush} when it cannot start in time - confirm against the Kafka version in use)
     */
    private static boolean commitOffsets(OffsetStorageWriter offsetWriter, Clock clock, long commitTimeout, SourceTask task) throws InterruptedException, TimeoutException {
        final long deadline = clock.currentTimeInMillis() + commitTimeout;
        // commitTimeout is in milliseconds (see the deadline above and the wait below).
        if (!offsetWriter.beginFlush(commitTimeout, TimeUnit.MILLISECONDS)) {
            LOGGER.trace("No offset to be committed.");
            return false;
        }
        final Future<Void> flush = offsetWriter.doFlush((error, result) -> {});
        if (flush == null) {
            LOGGER.warn("Flushing process probably failed, please check previous log for more details.");
            return false;
        }
        try {
            // Wait only for what is left of the overall deadline.
            flush.get(Math.max(deadline - clock.currentTimeInMillis(), 0L), TimeUnit.MILLISECONDS);
            task.commit();
        }
        catch (InterruptedException e) {
            LOGGER.debug("Flush of the offsets interrupted, canceling the flush.");
            offsetWriter.cancelFlush();
            throw e;
        }
        catch (ExecutionException | TimeoutException e) {
            LOGGER.warn("Flush of the offsets failed, canceling the flush.", (Throwable)e);
            offsetWriter.cancelFlush();
            return false;
        }
        return true;
    }

    /**
     * Returns the signaler of this engine, creating it on first use.
     *
     * <p>On the first call the signal channel writers exposed by the current tasks are collected (tasks without a
     * writer are skipped) and wrapped in an {@link EmbeddedEngineSignaler}; later calls return the same instance.
     *
     * @return the lazily created signaler
     */
    public DebeziumEngine.Signaler getSignaler() {
        if (this.signaler != null) {
            return this.signaler;
        }
        List channels = this.tasks()
                .stream()
                .map(EngineSourceTask::signalChannelWriter)
                .flatMap(Optional::stream)
                .toList();
        this.signaler = new EmbeddedEngineSignaler(channels);
        return this.signaler;
    }

    /**
     * Wraps a per-record consumer into a batch-oriented change consumer.
     *
     * <p>Every record of the batch is handed to {@code consumer} and then marked as processed. If a
     * {@link StopEngineException} is raised for a record, that record is still marked as processed before the
     * exception is propagated. After the last record the batch is marked as finished.
     *
     * @param consumer consumer invoked once for each record
     * @return change consumer delegating to {@code consumer}
     */
    private static DebeziumEngine.ChangeConsumer<SourceRecord> buildDefaultChangeConsumer(final Consumer<SourceRecord> consumer) {
        return (records, committer) -> {
            for (SourceRecord current : records) {
                try {
                    consumer.accept(current);
                    committer.markProcessed(current);
                }
                catch (StopEngineException stop) {
                    // The record that requested the stop still counts as processed.
                    committer.markProcessed(current);
                    throw stop;
                }
            }
            committer.markBatchFinished();
        };
    }

    /**
     * Wraps a per-record consumer of converted records into a batch-oriented change consumer of source records.
     *
     * <p>Every source record is converted with {@code recordConverter}, the result is handed to {@code consumer},
     * and the original source record is marked as processed. If a {@link StopEngineException} is raised for a
     * record, that record is still marked as processed before the exception is propagated. After the last record
     * the batch is marked as finished.
     *
     * @param consumer consumer invoked with the converted form of each record
     * @param recordConverter function converting a source record into the form expected by {@code consumer}
     * @return change consumer that converts and delegates to {@code consumer}
     */
    private static DebeziumEngine.ChangeConsumer buildConvertingChangeConsumer(final Consumer consumer, final Function<SourceRecord, ?> recordConverter) {
        final DebeziumEngine.ChangeConsumer<SourceRecord> convertingHandler = (records, committer) -> {
            for (SourceRecord current : records) {
                try {
                    consumer.accept(recordConverter.apply(current));
                    committer.markProcessed(current);
                }
                catch (StopEngineException stop) {
                    // The record that requested the stop still counts as processed.
                    committer.markProcessed(current);
                    throw stop;
                }
            }
            committer.markBatchFinished();
        };
        return convertingHandler;
    }

    /**
     * Resolves the configured number of record processing threads.
     *
     * <p>The value is first matched against the {@link ProcessingCores} placeholders; when no placeholder matches
     * it is parsed as a decimal integer.
     *
     * @param recordProcessingThreads placeholder name or a positive integer written as text
     * @return number of threads to use
     * @throws NumberFormatException if the value is neither a placeholder nor a parsable integer
     * @throws IllegalArgumentException if the parsed number is zero or negative
     */
    private int computeRecordThreads(String recordProcessingThreads) {
        final ProcessingCores placeholder = ProcessingCores.parse(recordProcessingThreads);
        if (placeholder == null) {
            final int requested = Integer.parseInt(recordProcessingThreads);
            if (requested > 0) {
                return requested;
            }
            throw new IllegalArgumentException("Number of cores cannot be negative or zero!");
        }
        return placeholder.getCores();
    }

    /**
     * States of the engine.
     *
     * <p>The declaration order is significant: the helper methods below compare states by their position in this
     * list, so constants must not be reordered.
     */
    private static enum State {
        CREATING,
        INITIALIZING,
        CREATING_TASKS,
        STARTING_TASKS,
        POLLING_TASKS,
        STOPPING,
        STOPPED;

        /**
         * Tells whether tasks should be stopped for the given state.
         *
         * @param state state to inspect
         * @return {@code true} for {@link #STARTING_TASKS} and every state declared after it
         */
        public static boolean shouldStopTasks(State state) {
            return state.compareTo(STARTING_TASKS) >= 0;
        }

        /**
         * Tells whether a stop can still be initiated from the given state.
         *
         * @param state state to inspect
         * @return {@code true} for every state declared before {@link #STOPPING}
         */
        public static boolean canBeStopped(State state) {
            return state.compareTo(STOPPING) < 0;
        }
    }

    /**
     * Completion callback that only reports failures: an unsuccessful completion is logged at error level,
     * a successful one is ignored.
     */
    private static class DefaultCompletionCallback
    implements DebeziumEngine.CompletionCallback {
        private DefaultCompletionCallback() {
        }

        /**
         * Logs the message together with the error when the engine did not complete successfully.
         *
         * @param success whether the engine completed successfully
         * @param message completion message
         * @param error error reported with the completion
         */
        public void handle(boolean success, String message, Throwable error) {
            if (success) {
                return;
            }
            LOGGER.error(message, error);
        }
    }

    /**
     * Supported record processing orders, each identified by a textual placeholder.
     */
    private static enum RecordProcessingOrder {
        ORDERED("ORDERED"),
        UNORDERED("UNORDERED");

        private final String orderingPlaceholder;

        private RecordProcessingOrder(String orderingPlaceholder) {
            this.orderingPlaceholder = orderingPlaceholder;
        }

        /**
         * Looks up the constant whose placeholder matches the given text, ignoring case and surrounding whitespace.
         *
         * @param value text to resolve, may be {@code null}
         * @return the matching constant, or {@code null} when the value is {@code null} or matches nothing
         */
        public static RecordProcessingOrder parse(String value) {
            if (value != null) {
                final String candidate = value.trim();
                for (RecordProcessingOrder order : values()) {
                    if (order.orderingPlaceholder.equalsIgnoreCase(candidate)) {
                        return order;
                    }
                }
            }
            return null;
        }
    }

    /**
     * Retrying callable that keeps polling one task for records and hands every non-empty batch to a record
     * processor for as long as the engine stays in the {@link State#POLLING_TASKS} state.
     */
    private static class PollRecords
    extends RetryingCallable<Void> {
        final EngineSourceTask task;
        final RecordProcessor processor;
        final AtomicReference<State> engineState;

        /**
         * @param task task to poll; the maximum number of retries is read from its configuration
         * @param processor processor receiving the polled records
         * @param engineState shared engine state that controls how long polling continues
         */
        PollRecords(EngineSourceTask task, RecordProcessor processor, AtomicReference<State> engineState) {
            super(Configuration.from(task.context().config()).getInteger(EmbeddedEngineConfig.ERRORS_MAX_RETRIES));
            this.task = task;
            this.processor = processor;
            this.engineState = engineState;
        }

        /**
         * Polls the task in a loop until the engine leaves the polling state.
         *
         * @return always {@code null}
         * @throws Exception whatever polling or record processing throws
         */
        @Override
        public Void doCall() throws Exception {
            while (State.POLLING_TASKS == this.engineState.get()) {
                LOGGER.trace("Thread {} running task {} starts polling for records.", (Object)Thread.currentThread().getName(), (Object)this.task.connectTask());
                final List<SourceRecord> polled = this.task.connectTask().poll();
                final Object polledCount = polled == null ? "no" : Integer.valueOf(polled.size());
                LOGGER.trace("Thread {} polled {} records.", (Object)Thread.currentThread().getName(), polledCount);
                if (polled == null || polled.isEmpty()) {
                    LOGGER.trace("No records.");
                }
                else {
                    this.processor.processRecords(polled);
                }
            }
            return null;
        }

        /**
         * Builds an exponential delay strategy from the initial and maximum retry delays found in the task
         * configuration.
         *
         * @return delay strategy applied between retries
         */
        @Override
        public DelayStrategy delayStrategy() {
            final Configuration taskConfig = Configuration.from(this.task.context().config());
            final Duration initialDelay = Duration.ofMillis(taskConfig.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_INITIAL_MS));
            final Duration maxDelay = Duration.ofMillis(taskConfig.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS));
            return DelayStrategy.exponential(initialDelay, maxDelay);
        }
    }

    /**
     * Record committer working directly on source records: it records the offsets of processed records in the
     * offset writer and, when the commit policy agrees, flushes them at the end of a batch.
     */
    private static class SourceRecordCommitter
    implements DebeziumEngine.RecordCommitter<SourceRecord> {
        final SourceTask task;
        final OffsetStorageWriter offsetWriter;
        final OffsetCommitPolicy offsetCommitPolicy;
        final Clock clock;
        final long commitTimeout;
        private long recordsSinceLastCommit = 0L;
        private long timeOfLastCommitMillis = 0L;

        /**
         * @param task engine task providing the connect task, offset writer, commit policy, clock and the
         *             offset commit timeout
         */
        SourceRecordCommitter(EngineSourceTask task) {
            this.task = task.connectTask();
            this.offsetWriter = task.context().offsetStorageWriter();
            this.offsetCommitPolicy = task.context().offsetCommitPolicy();
            this.clock = task.context().clock();
            this.commitTimeout = Configuration.from(task.context().config()).getLong(EmbeddedEngineConfig.OFFSET_COMMIT_TIMEOUT_MS);
        }

        /**
         * Notifies the task about the record, counts it and stores its partition and offset in the offset writer.
         *
         * @param record processed record
         */
        public void markProcessed(SourceRecord record) throws InterruptedException {
            this.task.commitRecord(record);
            this.recordsSinceLastCommit += 1L;
            this.offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
        }

        /**
         * Commits the offsets if the commit policy asks for it, given the number of records and the time elapsed
         * since the last successful commit. The counters are reset only when the commit succeeded.
         *
         * @throws DebeziumException if committing the offsets timed out
         */
        public void markBatchFinished() throws InterruptedException {
            final long elapsedMillis = this.clock.currentTimeInMillis() - this.timeOfLastCommitMillis;
            if (!this.offsetCommitPolicy.performCommit(this.recordsSinceLastCommit, Duration.ofMillis(elapsedMillis))) {
                return;
            }
            final boolean committed;
            try {
                committed = AsyncEmbeddedEngine.commitOffsets(this.offsetWriter, this.clock, this.commitTimeout, this.task);
            }
            catch (TimeoutException e) {
                throw new DebeziumException("Timed out while waiting for committing task offset", (Throwable)e);
            }
            if (committed) {
                this.recordsSinceLastCommit = 0L;
                this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
            }
        }

        /**
         * Marks the record as processed using the supplied offsets instead of the record's own source offset.
         *
         * @param record processed record
         * @param sourceOffsets offsets to store; cast to {@code DebeziumEngineCommon.SourceRecordOffsets}
         */
        public void markProcessed(SourceRecord record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
            final DebeziumEngineCommon.SourceRecordOffsets overridingOffsets = (DebeziumEngineCommon.SourceRecordOffsets)sourceOffsets;
            // Copy of the record that differs from the original only in its source offset.
            final SourceRecord copyWithOffsets = new SourceRecord(
                    record.sourcePartition(),
                    overridingOffsets.getOffsets(),
                    record.topic(),
                    record.kafkaPartition(),
                    record.keySchema(),
                    record.key(),
                    record.valueSchema(),
                    record.value(),
                    record.timestamp(),
                    record.headers());
            this.markProcessed(copyWithOffsets);
        }

        /**
         * @return a new, empty offsets holder
         */
        public DebeziumEngine.Offsets buildOffsets() {
            return new DebeziumEngineCommon.SourceRecordOffsets();
        }
    }

    /**
     * Record committer for records in the engine's output format. Each record is converted back to a source
     * record with the enclosing engine's {@code sourceConverter} and then passed to a
     * {@link SourceRecordCommitter}.
     */
    private class ConvertingRecordCommitter
    implements DebeziumEngine.RecordCommitter<R> {
        private final SourceRecordCommitter delegate;

        /**
         * @param task engine task the underlying source record committer is created for
         */
        ConvertingRecordCommitter(EngineSourceTask task) {
            this.delegate = new SourceRecordCommitter(task);
        }

        /**
         * Converts the record to a source record and marks it as processed.
         *
         * @param record processed record
         */
        public void markProcessed(R record) throws InterruptedException {
            final SourceRecord asSourceRecord = AsyncEmbeddedEngine.this.sourceConverter.apply(record);
            this.delegate.markProcessed(asSourceRecord);
        }

        /**
         * Forwards the end of the batch to the underlying committer.
         */
        public void markBatchFinished() throws InterruptedException {
            this.delegate.markBatchFinished();
        }

        /**
         * Converts the record to a source record and marks it as processed with the supplied offsets.
         *
         * @param record processed record
         * @param sourceOffsets offsets to store for the record
         */
        public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
            final SourceRecord asSourceRecord = AsyncEmbeddedEngine.this.sourceConverter.apply(record);
            this.delegate.markProcessed(asSourceRecord, sourceOffsets);
        }

        /**
         * @return offsets holder created by the underlying committer
         */
        public DebeziumEngine.Offsets buildOffsets() {
            return this.delegate.buildOffsets();
        }
    }

    /**
     * Textual placeholders that can be used instead of an explicit number of processing threads, each bound to
     * the number of cores it stands for.
     */
    private static enum ProcessingCores {
        AVAILABLE_CORES("AVAILABLE_CORES", AsyncEngineConfig.AVAILABLE_CORES);

        private final String coresPlaceholder;
        private final int cores;

        private ProcessingCores(String coresPlaceholder, int cores) {
            this.coresPlaceholder = coresPlaceholder;
            this.cores = cores;
        }

        /**
         * @return number of cores this placeholder stands for
         */
        public int getCores() {
            return this.cores;
        }

        /**
         * Looks up the constant whose placeholder matches the given text, ignoring case and surrounding whitespace.
         *
         * @param value text to resolve, may be {@code null}
         * @return the matching constant, or {@code null} when the value is {@code null} or matches nothing
         */
        public static ProcessingCores parse(String value) {
            if (value == null) {
                return null;
            }
            final String candidate = value.trim();
            ProcessingCores match = null;
            for (ProcessingCores option : values()) {
                if (option.coresPlaceholder.equalsIgnoreCase(candidate)) {
                    match = option;
                    break;
                }
            }
            return match;
        }
    }

    /**
     * Builder of {@link AsyncEmbeddedEngine} instances.
     *
     * <p>Both {@code notifying} methods read the configuration, so {@link #using(Properties)} has to be called
     * before them; otherwise they fail with a {@link NullPointerException}.
     *
     * @param <R> type of the records delivered to the consumer
     */
    public static final class AsyncEngineBuilder<R>
    implements DebeziumEngine.Builder<R> {
        private Properties config;
        private Consumer<R> consumer;
        private DebeziumEngine.ChangeConsumer<?> handler;
        private ClassLoader classLoader;
        private Clock clock;
        private DebeziumEngine.CompletionCallback completionCallback;
        private DebeziumEngine.ConnectorCallback connectorCallback;
        private OffsetCommitPolicy offsetCommitPolicy = null;
        private HeaderConverter headerConverter;
        private Function<SourceRecord, R> recordConverter;
        private ConverterBuilder converterBuilder;

        /**
         * Creates a builder without any output format, i.e. without a converter builder.
         */
        AsyncEngineBuilder() {
            this((KeyValueHeaderChangeEventFormat)null);
        }

        /**
         * Creates a builder for a value-only format; key and header formats are left unset.
         *
         * @param format format of the record value
         */
        AsyncEngineBuilder(ChangeEventFormat<?> format) {
            this(KeyValueHeaderChangeEventFormat.of(null, (Class)format.getValueFormat(), null));
        }

        /**
         * Creates a builder for a key/value format. A format that already carries a header format is used as is,
         * otherwise it is widened to a key/value/header format with the header format left unset.
         *
         * @param format format of the record key and value
         */
        AsyncEngineBuilder(KeyValueChangeEventFormat<?, ?> format) {
            this(format instanceof KeyValueHeaderChangeEventFormat ? (KeyValueHeaderChangeEventFormat)format : KeyValueHeaderChangeEventFormat.of((Class)format.getKeyFormat(), (Class)format.getValueFormat(), null));
        }

        /**
         * Creates a builder for a key/value/header format.
         *
         * @param format format of key, value and headers; when {@code null} no converter builder is created
         */
        AsyncEngineBuilder(KeyValueHeaderChangeEventFormat<?, ?, ?> format) {
            if (format != null) {
                this.converterBuilder = new ConverterBuilder();
                this.converterBuilder.using(format);
            }
        }

        /**
         * Registers a per-record consumer. When the configuration sets the serial-consumer option to
         * {@code true} (case-insensitive), the consumer is additionally wrapped into a batch handler.
         *
         * @param consumer consumer invoked for each record
         * @return this builder
         */
        public DebeziumEngine.Builder<R> notifying(Consumer<R> consumer) {
            this.consumer = consumer;
            final String serialConsumerKey = AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name();
            // Properties.contains() searches the VALUES of the table; presence of an option has to be tested
            // with containsKey(). The constant-first comparison also tolerates a non-String value, for which
            // getProperty() returns null.
            if (this.config.containsKey(serialConsumerKey) && "true".equalsIgnoreCase(this.config.getProperty(serialConsumerKey))) {
                // NOTE(review): within this builder recordConverter is assigned only in build(), so it is
                // presumably still null here and the converting branch is not taken - confirm the intent.
                this.handler = this.recordConverter == null ? AsyncEmbeddedEngine.buildDefaultChangeConsumer(consumer) : AsyncEmbeddedEngine.buildConvertingChangeConsumer(consumer, this.recordConverter);
            }
            return this;
        }

        /**
         * Registers a batch handler. If the handler does not support tombstone events and the configuration
         * does not set the tombstone option explicitly, the option is set to {@code "false"}.
         *
         * @param handler handler invoked for each batch of records
         * @return this builder
         */
        public DebeziumEngine.Builder<R> notifying(DebeziumEngine.ChangeConsumer<R> handler) {
            this.handler = handler;
            final String tombstonesKey = CommonConnectorConfig.TOMBSTONES_ON_DELETE.name();
            // containsKey(), not contains(): an explicitly configured value must not be overwritten.
            if (!this.config.containsKey(tombstonesKey) && !handler.supportsTombstoneEvents()) {
                LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", (Object)tombstonesKey);
                this.config.put(tombstonesKey, "false");
            }
            return this;
        }

        /**
         * Sets the engine configuration and passes it on to the converter builder, if there is one.
         *
         * @param config engine configuration
         * @return this builder
         */
        public DebeziumEngine.Builder<R> using(Properties config) {
            this.config = config;
            if (this.converterBuilder != null) {
                this.converterBuilder.using(config);
            }
            return this;
        }

        /**
         * @param classLoader class loader handed over to the engine
         * @return this builder
         */
        public DebeziumEngine.Builder<R> using(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }

        /**
         * Adapts the given {@link java.time.Clock} to the engine clock through its {@code millis()} method.
         *
         * @param clock source of the current time in milliseconds
         * @return this builder
         */
        public DebeziumEngine.Builder<R> using(java.time.Clock clock) {
            this.clock = clock::millis;
            return this;
        }

        /**
         * @param completionCallback callback handed over to the engine
         * @return this builder
         */
        public DebeziumEngine.Builder<R> using(DebeziumEngine.CompletionCallback completionCallback) {
            this.completionCallback = completionCallback;
            return this;
        }

        /**
         * @param connectorCallback callback handed over to the engine
         * @return this builder
         */
        public DebeziumEngine.Builder<R> using(DebeziumEngine.ConnectorCallback connectorCallback) {
            this.connectorCallback = connectorCallback;
            return this;
        }

        /**
         * @param policy offset commit policy handed over to the engine
         * @return this builder
         */
        public DebeziumEngine.Builder<R> using(OffsetCommitPolicy policy) {
            this.offsetCommitPolicy = policy;
            return this;
        }

        /**
         * Creates the engine from everything configured so far. When an output format was given, the header
         * converter and the record converter are obtained from the converter builder first.
         *
         * @return new engine instance
         */
        public DebeziumEngine<R> build() {
            if (this.converterBuilder != null) {
                this.headerConverter = this.converterBuilder.headerConverter();
                this.recordConverter = this.converterBuilder.toFormat(this.headerConverter);
            }
            return new AsyncEmbeddedEngine<R>(this.config, this.consumer, this.handler, this.classLoader, this.clock, this.completionCallback, this.connectorCallback, this.offsetCommitPolicy, this.headerConverter, this.recordConverter);
        }
    }
}

