/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.cdc.source.listening;

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

/**
 * Connects an embedded Debezium engine to a Siddhi {@link SourceEventListener}.
 *
 * <p>Each change record delivered by the engine is converted into a flat
 * {@code Map<String, Object>} and forwarded to the listener, provided the record's
 * operation code matches the operation this instance was created for
 * ({@code insert}, {@code update} or {@code delete}, compared case-insensitively).
 *
 * <p>Delivery can be suspended with {@link #pause()} and continued with
 * {@link #resume()}; while paused, the thread delivering records blocks inside the
 * record handler.
 */
public class ChangeDataCapture {
    private final String operation;
    private Configuration config;
    private final SourceEventListener sourceEventListener;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = this.lock.newCondition();
    // Written by pause()/resume() and read by the record-handling thread without
    // holding the lock on the fast path, so it must be volatile to be visible.
    private volatile boolean paused = false;

    /**
     * @param operation           the operation to capture: {@code insert}, {@code update}
     *                            or {@code delete} (case-insensitive)
     * @param sourceEventListener listener that receives the converted change events
     */
    public ChangeDataCapture(String operation, SourceEventListener sourceEventListener) {
        this.operation = operation;
        this.sourceEventListener = sourceEventListener;
    }

    /**
     * Replaces the engine configuration with the entries of the given map.
     *
     * @param configMap configuration keys and values to hand to the engine
     */
    public void setConfig(Map<String, Object> configMap) {
        // Accumulate every entry in a single builder rather than building and
        // re-copying an intermediate Configuration for each entry.
        Configuration.Builder builder = Configuration.empty().edit();
        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
            builder = (Configuration.Builder) builder.with(entry.getKey(), entry.getValue());
        }
        this.config = builder.build();
    }

    /**
     * Builds an embedded engine that uses the current configuration, commits offsets
     * with {@link OffsetCommitPolicy#always()} and passes every record to this
     * instance's record handler.
     *
     * @param completionCallback callback registered on the engine builder
     * @return the newly built engine
     * @throws SiddhiAppRuntimeException if no engine builder could be obtained
     */
    public EmbeddedEngine getEngine(EmbeddedEngine.CompletionCallback completionCallback) {
        EmbeddedEngine.Builder builder = EmbeddedEngine.create()
                .using(OffsetCommitPolicy.always())
                .using(completionCallback)
                .using(this.config);
        if (builder == null) {
            throw new SiddhiAppRuntimeException("CDC Engine create failed. Check parameters.");
        }
        return builder.notifying(this::handleEvent).build();
    }

    /**
     * Suspends event delivery: the next record handled will block until
     * {@link #resume()} is called.
     */
    public void pause() {
        this.paused = true;
    }

    /**
     * Resumes event delivery and wakes any thread blocked in the record handler.
     */
    public void resume() {
        // Acquire before the try block so that unlock() in finally only ever runs
        // when the lock is actually held.
        this.lock.lock();
        try {
            this.paused = false;
            // signalAll so that no waiter can remain parked after the flag is cleared.
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Handles one record from the engine: waits while delivery is paused, then
     * converts the record and forwards it to the listener if the conversion
     * produced any fields.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and
     * the record is still processed.
     */
    private void handleEvent(ConnectRecord<?> connectRecord) {
        if (this.paused) {
            this.lock.lock();
            try {
                // Loop to guard against spurious wake-ups.
                while (this.paused) {
                    this.condition.await();
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            } finally {
                this.lock.unlock();
            }
        }
        Map<String, Object> detailsMap = this.createMap(connectRecord, this.operation);
        if (!detailsMap.isEmpty()) {
            this.sourceEventListener.onEvent(detailsMap, null);
        }
    }

    /**
     * Converts a change record into a flat map of column values.
     *
     * <ul>
     *   <li>insert ({@code op = "c"}): fields of {@code after}, keyed by field name</li>
     *   <li>delete ({@code op = "d"}): fields of {@code before}, keyed by
     *       {@code "before_" + fieldName}</li>
     *   <li>update ({@code op = "u"}): both of the above</li>
     * </ul>
     *
     * @param connectRecord the record received from the engine
     * @param operation     the operation this source listens for
     * @return the populated map, or an empty map when the record has no struct value,
     *         has no readable {@code op} field, or its {@code op} does not match
     *         {@code operation}
     */
    private Map<String, Object> createMap(ConnectRecord<?> connectRecord, String operation) {
        Map<String, Object> detailsMap = new HashMap<String, Object>();

        // A record whose value is null (or not a Struct) carries nothing to extract.
        Object value = connectRecord.value();
        if (!(value instanceof Struct)) {
            return detailsMap;
        }
        Struct record = (Struct) value;

        Object op;
        try {
            op = record.get("op");
        } catch (DataException ex) {
            // The value has no "op" field, so it is not a change event we can map.
            return detailsMap;
        }

        String expectedOp = expectedOpCode(operation);
        // equals() is called on the non-null expected code, so a null op is simply
        // treated as a mismatch.
        if (expectedOp == null || !expectedOp.equals(op)) {
            return detailsMap;
        }

        switch (expectedOp) {
            case "c":
                copyFields((Struct) record.get("after"), "", detailsMap);
                break;
            case "d":
                copyFields((Struct) record.get("before"), "before_", detailsMap);
                break;
            case "u":
                copyFields((Struct) record.get("before"), "before_", detailsMap);
                copyFields((Struct) record.get("after"), "", detailsMap);
                break;
            default:
                break;
        }
        return detailsMap;
    }

    /**
     * Maps a listening operation name to the record operation code it corresponds to,
     * or returns {@code null} when the name is not insert, delete or update.
     */
    private static String expectedOpCode(String operation) {
        if ("insert".equalsIgnoreCase(operation)) {
            return "c";
        }
        if ("delete".equalsIgnoreCase(operation)) {
            return "d";
        }
        if ("update".equalsIgnoreCase(operation)) {
            return "u";
        }
        return null;
    }

    /**
     * Copies every field of {@code section} into {@code target}, prefixing each key
     * with {@code keyPrefix}; a {@code null} section contributes nothing.
     */
    private static void copyFields(Struct section, String keyPrefix, Map<String, Object> target) {
        if (section == null) {
            return;
        }
        List<Field> fields = section.schema().fields();
        for (Field field : fields) {
            String fieldName = field.name();
            target.put(keyPrefix + fieldName, section.get(fieldName));
        }
    }
}

