/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.signal.channels;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Signal channel reader registered under the name {@value #CHANNEL_NAME}.
 *
 * <p>Signal records are appended to {@link #SIGNALS} by {@link #process(Struct)} and
 * handed out one at a time by {@link #read()}. The queue is a static field, so it is
 * shared by every instance of this class loaded by the same class loader.
 */
@NotThreadSafe
public class SourceSignalChannel implements SignalChannelReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(SourceSignalChannel.class);

    /** Name returned by {@link #name()}. */
    public static final String CHANNEL_NAME = "source";

    /** Pending signal records, filled by {@link #process(Struct)} and drained by {@link #read()}. */
    public static final Queue<SignalRecord> SIGNALS = new ConcurrentLinkedQueue<>();

    /** Connector configuration stored by {@link #init(CommonConnectorConfig)}; unset until then. */
    public CommonConnectorConfig connectorConfig;

    /**
     * @return the channel name, {@link #CHANNEL_NAME}
     */
    @Override
    public String name() {
        return CHANNEL_NAME;
    }

    /**
     * Stores the connector configuration that {@link #process(Struct)} later passes on
     * when building signal records.
     *
     * @param connectorConfig the configuration to keep
     */
    @Override
    public void init(CommonConnectorConfig connectorConfig) {
        this.connectorConfig = connectorConfig;
    }

    /**
     * Removes at most one pending signal record from the head of the queue.
     *
     * @return an immutable list holding the dequeued record, or an empty list when the
     *         queue has nothing pending
     */
    @Override
    public List<SignalRecord> read() {
        LOGGER.trace("Reading signaling events from queue");
        SignalRecord head = SIGNALS.poll();
        return head == null ? List.of() : List.of(head);
    }

    /**
     * Discards every pending signal record. Because the queue is static, this also drops
     * records enqueued through other instances of this class.
     */
    @Override
    public void close() {
        SIGNALS.clear();
    }

    /**
     * Builds a signal record from the given value and, if one is produced, appends it to
     * the queue.
     *
     * @param value the struct handed to
     *        {@code SignalRecord.buildSignalRecordFromChangeEventSource} together with the
     *        stored connector configuration
     * @return {@code true} when a record was enqueued; {@code false} when no record was
     *         produced or when an exception was raised while building it
     * @throws InterruptedException declared by the signature; exceptions raised inside the
     *         try block are caught and logged at WARN level rather than propagated
     */
    public boolean process(Struct value) throws InterruptedException {
        LOGGER.trace("Received event from signaling table. Enqueue for process");
        try {
            Optional<SignalRecord> parsed = SignalRecord.buildSignalRecordFromChangeEventSource(value, connectorConfig);
            parsed.ifPresent(SIGNALS::add);
            return parsed.isPresent();
        } catch (Exception failure) {
            // The exception is the trailing argument with no matching placeholder, so it is
            // passed to the logger as the throwable rather than formatted into the message.
            LOGGER.warn("Exception while preparing to process the signal '{}'", value, failure);
            return false;
        }
    }
}

