/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.cdc.source.listening;

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.stream.input.source.SourceMapper;
import io.siddhi.extension.io.cdc.source.metrics.CDCStatus;
import io.siddhi.extension.io.cdc.source.metrics.ListeningMetrics;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;

public abstract class ChangeDataCapture {
    private final ListeningMetrics metrics;
    private String operation;
    private Configuration config;
    private SourceEventListener sourceEventListener;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();
    private boolean paused = false;
    private long previousEventCount;

    public ChangeDataCapture(String operation, SourceEventListener sourceEventListener, ListeningMetrics metrics) {
        this.operation = operation;
        this.sourceEventListener = sourceEventListener;
        this.metrics = metrics;
    }

    public void setConfig(Map<String, Object> configMap) {
        this.config = Configuration.empty();
        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
            this.config = ((Configuration.Builder)this.config.edit().with(entry.getKey(), entry.getValue())).build();
        }
    }

    public EmbeddedEngine getEngine(EmbeddedEngine.CompletionCallback completionCallback) {
        EmbeddedEngine.Builder builder = EmbeddedEngine.create().using(OffsetCommitPolicy.always()).using(completionCallback).using(this.config);
        if (builder == null) {
            if (this.metrics != null) {
                this.metrics.setCDCStatus(CDCStatus.ERROR);
            }
            throw new SiddhiAppRuntimeException("CDC Engine create failed. Check parameters.");
        }
        EmbeddedEngine engine = builder.notifying(this::handleEvent).build();
        return engine;
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleEvent(ConnectRecord connectRecord) {
        Map<String, Object> detailsMap;
        Object[] transportProperties = null;
        if (this.paused) {
            this.lock.lock();
            try {
                while (this.paused) {
                    this.condition.await();
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            finally {
                this.lock.unlock();
            }
        }
        if (!(detailsMap = this.createMap(connectRecord, this.operation)).isEmpty()) {
            if (detailsMap.containsKey("transportProperties")) {
                transportProperties = ((List)detailsMap.get("transportProperties")).toArray();
                detailsMap.remove("transportProperties");
            }
            this.previousEventCount = ((SourceMapper)this.sourceEventListener).getEventCount();
            this.sourceEventListener.onEvent(detailsMap, transportProperties);
            if (this.metrics != null) {
                this.metrics.getTotalReadsMetrics().inc();
                this.metrics.getEventCountMetric().inc();
                this.metrics.getTotalEventCounterMetric().inc();
                long eventCount = ((SourceMapper)this.sourceEventListener).getEventCount() - this.previousEventCount;
                this.metrics.getValidEventCountMetric().inc(eventCount);
                this.metrics.setCDCStatus(CDCStatus.CONSUMING);
                this.metrics.setLastReceivedTime(System.currentTimeMillis());
            }
        }
    }

    abstract Map<String, Object> createMap(ConnectRecord var1, String var2);

    public List<String> createOperationList(String operation) {
        return Arrays.asList(operation.split(","));
    }

    public Object getDefaultValue(Schema schema) {
        switch (schema.type()) {
            case STRING: {
                return "";
            }
            case BOOLEAN: {
                return false;
            }
            case FLOAT64: 
            case FLOAT32: {
                return 0.0;
            }
        }
        return 0;
    }
}

