package io.siddhi.extension.io.nats.source.nats;

import io.nats.client.Nats;
import io.nats.streaming.ConnectionLostHandler;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.nats.source.NATSMessageProcessor;
import io.siddhi.extension.io.nats.util.NATSConstants;
import io.siddhi.extension.io.nats.util.NATSUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/nats/source/nats/NATSStreaming.class */
public class NATSStreaming extends NATSCore {
    private static final Logger log = Logger.getLogger(NATSStreaming.class);
    private String durableName;
    private String sequenceNumber;
    private Subscription subscription;
    private NATSMessageProcessor natsMessageProcessor;
    private String clusterId;
    private String clientId;
    private StreamingConnection streamingConnection;
    private long ackWait;

    /* loaded from: input_file:io/siddhi/extension/io/nats/source/nats/NATSStreaming$NATSConnectionLostHandler.class */
    class NATSConnectionLostHandler implements ConnectionLostHandler {
        private Source.ConnectionCallback connectionCallback;

        NATSConnectionLostHandler(Source.ConnectionCallback connectionCallback) {
            this.connectionCallback = connectionCallback;
        }

        public void connectionLost(StreamingConnection streamingConnection, Exception exc) {
            NATSStreaming.log.error("Exception occurred in Siddhi App '" + NATSStreaming.this.siddhiAppName + "' when consuming messages from NATS endpoint " + Arrays.toString(NATSStreaming.this.natsUrls) + " . " + exc.getMessage(), exc);
            Executors.newFixedThreadPool(1).execute(() -> {
                this.connectionCallback.onError(new ConnectionUnavailableException(exc));
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/nats/source/nats/NATSStreaming$NATSSourceState.class */
    public class NATSSourceState extends State {
        private AtomicInteger lastSentSequenceNo = new AtomicInteger(0);

        NATSSourceState() {
        }

        public boolean canDestroy() {
            return this.lastSentSequenceNo.intValue() == 0;
        }

        public Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put(NATSStreaming.this.siddhiAppName, Integer.valueOf(this.lastSentSequenceNo.get()));
            return hashMap;
        }

        public void restore(Map<String, Object> map) {
            Object obj = map.get(NATSStreaming.this.siddhiAppName);
            if (obj == null || NATSStreaming.this.sequenceNumber != null) {
                return;
            }
            this.lastSentSequenceNo.set(((Integer) obj).intValue());
        }
    }

    @Override // io.siddhi.extension.io.nats.source.nats.NATSCore
    public StateFactory<NATSSourceState> initiateNatsClient(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        super.initiateNatsClient(sourceEventListener, optionHolder, strArr, configReader, siddhiAppContext);
        if (optionHolder.isOptionExists(NATSConstants.CLUSTER_ID)) {
            this.clusterId = optionHolder.validateAndGetStaticValue(NATSConstants.CLUSTER_ID);
        } else if (optionHolder.isOptionExists(NATSConstants.STREAMING_CLUSTER_ID)) {
            this.clusterId = optionHolder.validateAndGetStaticValue(NATSConstants.STREAMING_CLUSTER_ID);
        }
        this.clientId = optionHolder.validateAndGetStaticValue(NATSConstants.CLIENT_ID, NATSUtils.createClientId(this.siddhiAppName, this.streamId));
        if (optionHolder.isOptionExists(NATSConstants.DURABLE_NAME)) {
            this.durableName = optionHolder.validateAndGetStaticValue(NATSConstants.DURABLE_NAME);
        }
        if (optionHolder.isOptionExists(NATSConstants.SUBSCRIPTION_SEQUENCE)) {
            this.sequenceNumber = optionHolder.validateAndGetStaticValue(NATSConstants.SUBSCRIPTION_SEQUENCE);
        }
        if (optionHolder.isOptionExists(NATSConstants.ACK_WAIT)) {
            this.ackWait = Long.parseLong(optionHolder.validateAndGetStaticValue(NATSConstants.ACK_WAIT));
        }
        return () -> {
            return new NATSSourceState();
        };
    }

    @Override // io.siddhi.extension.io.nats.source.nats.NATSCore
    public void createConnection(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        try {
            this.streamingConnection = new StreamingConnectionFactory(new Options.Builder().clientId(this.clientId).clusterId(this.clusterId).natsConn(Nats.connect(this.natsOptionBuilder.build())).connectionLostHandler(new NATSConnectionLostHandler(connectionCallback)).build()).createConnection();
            subscribe((NATSSourceState) state);
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Error while connecting to NATS server at destination: " + this.destination, e);
        } catch (InterruptedException e2) {
            throw new ConnectionUnavailableException("Error while connecting to NATS server at destination: " + this.destination + " .The calling thread is interrupted before the connection can be established.", e2);
        }
    }

    private void subscribe(NATSSourceState nATSSourceState) {
        SubscriptionOptions.Builder builder = new SubscriptionOptions.Builder();
        if (this.sequenceNumber != null && nATSSourceState.lastSentSequenceNo.intValue() < Integer.parseInt(this.sequenceNumber)) {
            nATSSourceState.lastSentSequenceNo.set(Integer.parseInt(this.sequenceNumber));
        }
        builder.startAtSequence(nATSSourceState.lastSentSequenceNo.get() + 1);
        if (this.ackWait != 0) {
            builder.manualAcks().ackWait(Duration.ofSeconds(this.ackWait));
        }
        try {
            if (this.durableName != null) {
                builder.durableName(this.durableName);
            }
            this.natsMessageProcessor = new NATSMessageProcessor(this.sourceEventListener, this.requestedTransportPropertyNames, nATSSourceState.lastSentSequenceNo, this.lock, this.condition, this.ackWait);
            if (this.queueGroupName != null) {
                this.subscription = this.streamingConnection.subscribe(this.destination, this.queueGroupName, this.natsMessageProcessor, builder.build());
            } else {
                this.subscription = this.streamingConnection.subscribe(this.destination, this.natsMessageProcessor, builder.build());
            }
        } catch (IOException e) {
            throw new SiddhiAppRuntimeException("Error occurred in initializing the NATS receiver for stream: '" + this.sourceEventListener.getStreamDefinition().getId() + "'.", e);
        } catch (InterruptedException e2) {
            throw new SiddhiAppRuntimeException("Error occurred in initializing the NATS receiver for stream: '" + this.sourceEventListener.getStreamDefinition().getId() + "'.The calling thread is interrupted before the connection completes.", e2);
        } catch (TimeoutException e3) {
            throw new SiddhiAppRuntimeException("Error occurred in initializing the NATS receiver for stream: '" + this.sourceEventListener.getStreamDefinition().getId() + "'.The server request cannot be completed within the subscription timeout.", e3);
        }
    }

    @Override // io.siddhi.extension.io.nats.source.nats.NATSCore
    public void disconnect() {
        try {
            if (this.streamingConnection != null) {
                this.streamingConnection.close();
            }
        } catch (IOException | InterruptedException | TimeoutException e) {
            log.error("Error disconnecting the nats streaming receiver", e);
        }
    }

    @Override // io.siddhi.extension.io.nats.source.nats.NATSCore
    public void pause() {
        if (this.natsMessageProcessor != null) {
            this.natsMessageProcessor.pause();
            if (log.isDebugEnabled()) {
                log.debug("Nats streaming source paused for destination: " + this.destination);
            }
        }
    }

    @Override // io.siddhi.extension.io.nats.source.nats.NATSCore
    public void resume() {
        if (this.natsMessageProcessor != null) {
            this.natsMessageProcessor.resume();
            if (log.isDebugEnabled()) {
                log.debug("Nats streaming source resumed for destination: " + this.destination);
            }
        }
    }
}
