package io.siddhi.extension.io.cdc.source.listening;

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.connector.ConnectRecord;

/* loaded from: input_file:io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.class */
public abstract class ChangeDataCapture {
    private String operation;
    private Configuration config;
    private SourceEventListener sourceEventListener;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();
    private boolean paused = false;

    public ChangeDataCapture(String str, SourceEventListener sourceEventListener) {
        this.operation = str;
        this.sourceEventListener = sourceEventListener;
    }

    public void setConfig(Map<String, Object> map) {
        this.config = Configuration.empty();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            this.config = this.config.edit().with(entry.getKey(), entry.getValue()).build();
        }
    }

    public EmbeddedEngine getEngine(EmbeddedEngine.CompletionCallback completionCallback) {
        EmbeddedEngine.Builder using = EmbeddedEngine.create().using(OffsetCommitPolicy.always()).using(completionCallback).using(this.config);
        if (using == null) {
            throw new SiddhiAppRuntimeException("CDC Engine create failed. Check parameters.");
        }
        return using.notifying((v1) -> {
            handleEvent(v1);
        }).build();
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signal();
        } finally {
            this.lock.unlock();
        }
    }

    private void handleEvent(ConnectRecord connectRecord) {
        if (this.paused) {
            this.lock.lock();
            while (this.paused) {
                try {
                    try {
                        this.condition.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        this.lock.unlock();
                    }
                } finally {
                    this.lock.unlock();
                }
            }
        }
        Map<String, Object> createMap = createMap(connectRecord, this.operation);
        if (createMap.isEmpty()) {
            return;
        }
        this.sourceEventListener.onEvent(createMap, (String[]) null);
    }

    abstract Map<String, Object> createMap(ConnectRecord connectRecord, String str);
}
