package io.siddhi.extension.io.nats.source.nats;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.nats.util.NATSConstants;
import io.siddhi.extension.io.nats.util.NATSUtils;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/nats/source/nats/NATSCore.class */
public class NATSCore {
    private static final Logger log = Logger.getLogger(NATSCore.class);
    protected String destination;
    protected String[] natsUrls;
    protected String queueGroupName;
    protected String siddhiAppName;
    protected String streamId;
    protected Options.Builder natsOptionBuilder;
    protected String authType;
    private Connection natsClient;
    protected SourceEventListener sourceEventListener;
    protected ReentrantLock lock;
    protected Condition condition;
    protected volatile boolean pause;
    protected AtomicInteger messageSequenceTracker;
    protected String[] requestedTransportPropertyNames;

    public StateFactory initiateNatsClient(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppName = siddhiAppContext.getName();
        this.streamId = sourceEventListener.getStreamDefinition().getId();
        this.destination = optionHolder.validateAndGetStaticValue(NATSConstants.DESTINATION);
        this.queueGroupName = optionHolder.validateAndGetStaticValue(NATSConstants.QUEUE_GROUP_NAME, (String) null);
        this.natsUrls = (optionHolder.isOptionExists(NATSConstants.BOOTSTRAP_SERVERS) ? optionHolder.validateAndGetStaticValue(NATSConstants.BOOTSTRAP_SERVERS) : optionHolder.validateAndGetStaticValue(NATSConstants.SERVER_URLS)).split(",");
        for (String str : this.natsUrls) {
            NATSUtils.validateNatsUrl(str, this.siddhiAppName);
        }
        Properties properties = new Properties();
        if (optionHolder.isOptionExists(NATSConstants.OPTIONAL_CONFIGURATION)) {
            NATSUtils.splitHeaderValues(optionHolder.validateAndGetStaticValue(NATSConstants.OPTIONAL_CONFIGURATION), properties);
        }
        this.natsOptionBuilder = new Options.Builder(properties);
        this.natsOptionBuilder.servers(this.natsUrls);
        if (optionHolder.isOptionExists(NATSConstants.AUTH_TYPE)) {
            this.authType = optionHolder.validateAndGetStaticValue(NATSConstants.AUTH_TYPE);
            NATSUtils.addAuthentication(optionHolder, this.natsOptionBuilder, this.authType, this.siddhiAppName, this.streamId);
        }
        this.sourceEventListener = sourceEventListener;
        this.messageSequenceTracker = new AtomicInteger(0);
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.requestedTransportPropertyNames = (String[]) strArr.clone();
        return null;
    }

    public void createConnection(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        try {
            this.natsClient = Nats.connect(this.natsOptionBuilder.build());
            Dispatcher createDispatcher = this.natsClient.createDispatcher(message -> {
                if (this.pause) {
                    this.lock.lock();
                    try {
                        this.condition.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        this.lock.unlock();
                    }
                }
                this.messageSequenceTracker.incrementAndGet();
                String[] strArr = new String[this.requestedTransportPropertyNames.length];
                for (int i = 0; i < this.requestedTransportPropertyNames.length; i++) {
                    if (this.requestedTransportPropertyNames[i].equalsIgnoreCase(NATSConstants.SEQUENCE_NUMBER)) {
                        strArr[i] = String.valueOf(this.messageSequenceTracker.get());
                    }
                }
                this.sourceEventListener.onEvent(message.getData(), strArr);
            });
            if (this.queueGroupName != null) {
                createDispatcher.subscribe(this.destination, this.queueGroupName);
            } else {
                createDispatcher.subscribe(this.destination);
            }
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Error occurred in initializing the NATS receiver in '" + this.siddhiAppName + "' for stream: " + this.streamId);
        } catch (InterruptedException e2) {
            throw new SiddhiAppRuntimeException("Error occurred in initializing the NATS receiver in '" + this.siddhiAppName + "' for stream: '" + this.streamId + "'.The calling thread is interrupted before the connection completes.");
        }
    }

    public void disconnect() {
        if (this.natsClient != null) {
            try {
            } catch (InterruptedException e) {
                log.error("Error while disconnecting the nats client. Thread was interrupted before closing the connection.");
            } finally {
                this.natsClient = null;
            }
            if (this.natsClient.getStatus() != Connection.Status.CLOSED) {
                this.natsClient.close();
            }
        }
    }

    public void pause() {
        this.pause = true;
    }

    public void resume() {
        this.pause = false;
        try {
            this.lock.lock();
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }
}
