package io.siddhi.extension.io.nats.source;

import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.nats.source.exception.NATSInputAdaptorRuntimeException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/nats/source/NATSMessageProcessor.class */
public class NATSMessageProcessor implements MessageHandler {
    private static final Logger log = Logger.getLogger(NATSMessageProcessor.class);
    private SourceEventListener sourceEventListener;
    private boolean paused;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();
    private AtomicInteger messageSequenceTracker;

    /* JADX INFO: Access modifiers changed from: protected */
    public NATSMessageProcessor(SourceEventListener sourceEventListener, AtomicInteger atomicInteger) {
        this.sourceEventListener = sourceEventListener;
        this.messageSequenceTracker = atomicInteger;
    }

    public void onMessage(Message message) {
        if (this.paused) {
            this.lock.lock();
            try {
                this.condition.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                this.lock.unlock();
            }
        }
        this.sourceEventListener.onEvent(message.getData(), new String[0]);
        this.messageSequenceTracker.incrementAndGet();
        try {
            message.ack();
        } catch (IOException e2) {
            String str = new String(message.getData(), StandardCharsets.UTF_8);
            log.error("Error occurred while sending the ack for message : " + str + ".Received to the stream: " + this.sourceEventListener.getStreamDefinition().getId());
            throw new NATSInputAdaptorRuntimeException("Error occurred while sending the ack for message : " + str + ".Received to the stream: " + this.sourceEventListener.getStreamDefinition().getId(), e2);
        }
    }

    protected AtomicInteger getMessageSequenceTracker() {
        return this.messageSequenceTracker;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pause() {
        this.paused = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void resume() {
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }
}
