package io.siddhi.extension.io.nats.sink.nats;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.nats.util.NATSConstants;
import io.siddhi.extension.io.nats.util.NATSUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/nats/sink/nats/NATSCore.class */
public class NATSCore {
    private static final Logger log = LogManager.getLogger(NATSCore.class);
    protected Option destination;
    protected String[] natsUrls;
    protected String streamId;
    protected String siddhiAppName;
    protected Options.Builder natsOptionBuilder;
    protected Connection connection;
    protected AtomicBoolean isConnected = new AtomicBoolean(true);
    protected String authType;

    public void initiateClient(OptionHolder optionHolder, String str, String str2) {
        this.destination = optionHolder.validateAndGetOption(NATSConstants.DESTINATION);
        this.natsUrls = (optionHolder.isOptionExists(NATSConstants.BOOTSTRAP_SERVERS) ? optionHolder.validateAndGetStaticValue(NATSConstants.BOOTSTRAP_SERVERS) : optionHolder.validateAndGetStaticValue(NATSConstants.SERVER_URLS)).split(",");
        for (String str3 : this.natsUrls) {
            NATSUtils.validateNatsUrl(str3, str);
        }
        this.siddhiAppName = str;
        this.streamId = str2;
        Properties properties = new Properties();
        if (optionHolder.isOptionExists(NATSConstants.OPTIONAL_CONFIGURATION)) {
            NATSUtils.splitHeaderValues(optionHolder.validateAndGetStaticValue(NATSConstants.OPTIONAL_CONFIGURATION), properties);
        }
        this.natsOptionBuilder = new Options.Builder(properties);
        this.natsOptionBuilder.servers(this.natsUrls);
        if (optionHolder.isOptionExists(NATSConstants.AUTH_TYPE)) {
            this.authType = optionHolder.validateAndGetStaticValue(NATSConstants.AUTH_TYPE);
            NATSUtils.addAuthentication(optionHolder, this.natsOptionBuilder, this.authType, str, str2);
        }
    }

    public void createNATSClient() throws ConnectionUnavailableException {
        this.natsOptionBuilder.connectionListener((connection, events) -> {
            if (events == ConnectionListener.Events.CLOSED) {
                this.isConnected = new AtomicBoolean(false);
            }
        });
        this.connection = createNatsConnection();
        this.isConnected.set(true);
    }

    public void publishMessages(Object obj, byte[] bArr, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        if (!this.isConnected.get()) {
            this.connection = createNatsConnection();
        }
        this.connection.publish(this.destination.getValue(dynamicOptions), bArr);
    }

    public void disconnect() {
        if (this.connection == null || !this.isConnected.get()) {
            return;
        }
        try {
            this.connection.flush(Duration.ofMillis(50L));
            this.connection.close();
        } catch (InterruptedException | TimeoutException e) {
            log.error("Error disconnecting the nats receiver in Siddhi App '" + this.siddhiAppName + "' when publishing messages to NATS endpoint " + Arrays.toString(this.natsUrls) + " . " + e.getMessage(), e);
        }
    }

    private Connection createNatsConnection() throws ConnectionUnavailableException {
        try {
            return Nats.connect(this.natsOptionBuilder.build());
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Error in Siddhi App '" + this.siddhiAppName + "' while connecting to NATS server endpoint " + Arrays.toString(this.natsUrls) + " at destination: " + this.destination.getValue(), e);
        } catch (InterruptedException e2) {
            throw new ConnectionUnavailableException("Error in Siddhi App '" + this.siddhiAppName + "' while connecting to NATS server endpoint " + Arrays.toString(this.natsUrls) + " at destination: " + this.destination.getValue() + ". The calling thread is interrupted before the connection can be established.", e2);
        }
    }
}
