package io.siddhi.extension.io.cdc.source.listening;

import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

/* loaded from: input_file:io/siddhi/extension/io/cdc/source/listening/ChangeDataCapture.class */
public class ChangeDataCapture {
    private String operation;
    private Configuration config;
    private SourceEventListener sourceEventListener;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();
    private boolean paused = false;

    public ChangeDataCapture(String str, SourceEventListener sourceEventListener) {
        this.operation = str;
        this.sourceEventListener = sourceEventListener;
    }

    public void setConfig(Map<String, Object> map) {
        this.config = Configuration.empty();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            this.config = this.config.edit().with(entry.getKey(), entry.getValue()).build();
        }
    }

    public EmbeddedEngine getEngine(EmbeddedEngine.CompletionCallback completionCallback) {
        EmbeddedEngine.Builder using = EmbeddedEngine.create().using(OffsetCommitPolicy.always()).using(completionCallback).using(this.config);
        if (using == null) {
            throw new SiddhiAppRuntimeException("CDC Engine create failed. Check parameters.");
        }
        return using.notifying((v1) -> {
            handleEvent(v1);
        }).build();
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signal();
        } finally {
            this.lock.unlock();
        }
    }

    private void handleEvent(ConnectRecord connectRecord) {
        if (this.paused) {
            this.lock.lock();
            while (this.paused) {
                try {
                    try {
                        this.condition.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        this.lock.unlock();
                    }
                } finally {
                    this.lock.unlock();
                }
            }
        }
        Map<String, Object> createMap = createMap(connectRecord, this.operation);
        if (createMap.isEmpty()) {
            return;
        }
        this.sourceEventListener.onEvent(createMap, (String[]) null);
    }

    private Map<String, Object> createMap(ConnectRecord connectRecord, String str) {
        HashMap hashMap = new HashMap();
        Struct struct = (Struct) connectRecord.value();
        try {
            String str2 = (String) struct.get(Envelope.FieldName.OPERATION);
            if ((str.equalsIgnoreCase(CDCSourceConstants.INSERT) && str2.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION)) || ((str.equalsIgnoreCase("delete") && str2.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION)) || (str.equalsIgnoreCase(CDCSourceConstants.UPDATE) && str2.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)))) {
                boolean z = -1;
                switch (str2.hashCode()) {
                    case 99:
                        if (str2.equals(CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION)) {
                            z = false;
                            break;
                        }
                        break;
                    case 100:
                        if (str2.equals(CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION)) {
                            z = true;
                            break;
                        }
                        break;
                    case 117:
                        if (str2.equals(CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION)) {
                            z = 2;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        Struct struct2 = (Struct) struct.get("after");
                        Iterator<Field> it = struct2.schema().fields().iterator();
                        while (it.hasNext()) {
                            String name = it.next().name();
                            hashMap.put(name, getValue(struct2.get(name)));
                        }
                        break;
                    case true:
                        Struct struct3 = (Struct) struct.get("before");
                        Iterator<Field> it2 = struct3.schema().fields().iterator();
                        while (it2.hasNext()) {
                            String name2 = it2.next().name();
                            hashMap.put(CDCSourceConstants.BEFORE_PREFIX + name2, getValue(struct3.get(name2)));
                        }
                        break;
                    case true:
                        Struct struct4 = (Struct) struct.get("before");
                        Iterator<Field> it3 = struct4.schema().fields().iterator();
                        while (it3.hasNext()) {
                            String name3 = it3.next().name();
                            hashMap.put(CDCSourceConstants.BEFORE_PREFIX + name3, getValue(struct4.get(name3)));
                        }
                        Struct struct5 = (Struct) struct.get("after");
                        Iterator<Field> it4 = struct5.schema().fields().iterator();
                        while (it4.hasNext()) {
                            String name4 = it4.next().name();
                            hashMap.put(name4, getValue(struct5.get(name4)));
                        }
                        break;
                }
            }
            return hashMap;
        } catch (NullPointerException | DataException e) {
            return hashMap;
        }
    }

    private Object getValue(Object obj) {
        if (!(obj instanceof Struct)) {
            return obj instanceof Short ? Integer.valueOf(((Short) obj).intValue()) : obj instanceof Byte ? Integer.valueOf(((Byte) obj).intValue()) : obj;
        }
        BigDecimal orElse = VariableScaleDecimal.toLogical((Struct) obj).getDecimalValue().orElse(null);
        if (orElse == null) {
            return null;
        }
        return Long.valueOf(orElse.longValue());
    }
}
