package io.siddhi.extension.io.nats.source;

import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.nats.util.NATSConstants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:io/siddhi/extension/io/nats/source/NATSMessageProcessor.class */
public class NATSMessageProcessor implements MessageHandler {
    private SourceEventListener sourceEventListener;
    private volatile boolean paused;
    private ReentrantLock lock;
    private Condition condition;
    private AtomicInteger messageSequenceTracker;
    private String[] requestedTransportPropertyNames;
    private long ackWait;

    public NATSMessageProcessor(SourceEventListener sourceEventListener, String[] strArr, AtomicInteger atomicInteger, ReentrantLock reentrantLock, Condition condition, long j) {
        this.sourceEventListener = sourceEventListener;
        this.messageSequenceTracker = atomicInteger;
        this.requestedTransportPropertyNames = (String[]) strArr.clone();
        this.lock = reentrantLock;
        this.condition = condition;
        this.ackWait = j;
    }

    public void onMessage(Message message) {
        if (this.paused) {
            this.lock.lock();
            try {
                this.condition.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                this.lock.unlock();
            }
        }
        this.messageSequenceTracker.incrementAndGet();
        String[] strArr = new String[this.requestedTransportPropertyNames.length];
        for (int i = 0; i < this.requestedTransportPropertyNames.length; i++) {
            if (this.requestedTransportPropertyNames[i].equalsIgnoreCase(NATSConstants.SEQUENCE_NUMBER)) {
                strArr[i] = String.valueOf(this.messageSequenceTracker.get());
            }
        }
        this.sourceEventListener.onEvent(message.getData(), strArr);
        if (this.ackWait != 0) {
            try {
                message.ack();
            } catch (IOException e2) {
                throw new SiddhiAppRuntimeException("Error occurred while sending the ack for message : " + new String(message.getData(), StandardCharsets.UTF_8) + ". Received to the stream: '" + this.sourceEventListener.getStreamDefinition().getId() + "'.", e2);
            }
        }
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }
}
