package io.siddhi.extension.io.nats.sink.nats;

import io.nats.client.Nats;
import io.nats.streaming.ConnectionLostHandler;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.nats.sink.AsyncAckHandler;
import io.siddhi.extension.io.nats.sink.NATSSink;
import io.siddhi.extension.io.nats.util.NATSConstants;
import io.siddhi.extension.io.nats.util.NATSUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/nats/sink/nats/NATSStreaming.class */
public class NATSStreaming extends NATSCore {
    private static final Logger log = LogManager.getLogger(NATSStreaming.class);
    private StreamingConnection streamingConnection;
    private Options.Builder optionsBuilder;
    private String clusterId;
    private NATSSink natsSink;

    /* loaded from: input_file:io/siddhi/extension/io/nats/sink/nats/NATSStreaming$NATSConnectionLostHandler.class */
    class NATSConnectionLostHandler implements ConnectionLostHandler {
        NATSConnectionLostHandler() {
        }

        public void connectionLost(StreamingConnection streamingConnection, Exception exc) {
            NATSStreaming.log.error("Exception occurred in Siddhi App '" + NATSStreaming.this.siddhiAppName + "' when publishing messages to NATS endpoints " + Arrays.toString(NATSStreaming.this.natsUrls) + " . " + exc.getMessage(), exc);
            NATSStreaming.this.isConnected = new AtomicBoolean(false);
        }
    }

    public NATSStreaming(NATSSink nATSSink) {
        this.natsSink = nATSSink;
    }

    @Override // io.siddhi.extension.io.nats.sink.nats.NATSCore
    public void initiateClient(OptionHolder optionHolder, String str, String str2) {
        super.initiateClient(optionHolder, str, str2);
        if (optionHolder.isOptionExists(NATSConstants.CLUSTER_ID)) {
            this.clusterId = optionHolder.validateAndGetStaticValue(NATSConstants.CLUSTER_ID);
        } else if (optionHolder.isOptionExists(NATSConstants.STREAMING_CLUSTER_ID)) {
            this.clusterId = optionHolder.validateAndGetStaticValue(NATSConstants.STREAMING_CLUSTER_ID);
        }
        this.optionsBuilder = new Options.Builder().clientId(optionHolder.validateAndGetStaticValue(NATSConstants.CLIENT_ID, NATSUtils.createClientId(str, str2))).clusterId(this.clusterId).connectionLostHandler(new NATSConnectionLostHandler());
        if (optionHolder.isOptionExists(NATSConstants.ACK_WAIT)) {
            this.optionsBuilder.pubAckWait(Duration.ofSeconds(Long.parseLong(optionHolder.validateAndGetStaticValue(NATSConstants.ACK_WAIT))));
        }
    }

    @Override // io.siddhi.extension.io.nats.sink.nats.NATSCore
    public void createNATSClient() throws ConnectionUnavailableException {
        try {
            this.optionsBuilder.natsConn(Nats.connect(this.natsOptionBuilder.build()));
            this.streamingConnection = new StreamingConnectionFactory(this.optionsBuilder.build()).createConnection();
            this.isConnected.set(true);
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Error in Siddhi App '" + this.siddhiAppName + "' while connecting to NATS server endpoint " + Arrays.toString(this.natsUrls) + " at destination: " + this.destination.getValue(), e);
        } catch (InterruptedException e2) {
            throw new ConnectionUnavailableException("Error in Siddhi App '" + this.siddhiAppName + "' while connecting to NATS server endpoint " + Arrays.toString(this.natsUrls) + " at destination: " + this.destination.getValue() + ". The calling thread is interrupted before the connection can be established.", e2);
        }
    }

    @Override // io.siddhi.extension.io.nats.sink.nats.NATSCore
    public void publishMessages(Object obj, byte[] bArr, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        String value = this.destination.getValue(dynamicOptions);
        try {
            if (!this.isConnected.get()) {
                if (this.streamingConnection != null) {
                    this.streamingConnection.close();
                }
                createNATSClient();
            }
            this.streamingConnection.publish(value, bArr, new AsyncAckHandler(this.siddhiAppName, this.natsUrls, obj, this.natsSink, dynamicOptions));
        } catch (IOException e) {
            throw new SiddhiAppRuntimeException("Error sending message to destination:" + value, e);
        } catch (InterruptedException e2) {
            throw new SiddhiAppRuntimeException("Error sending message to destination:" + value + ".The calling thread is interrupted before the call completes.", e2);
        } catch (TimeoutException e3) {
            throw new SiddhiAppRuntimeException("Error sending message to destination:" + value + ".Timeout occured while trying to ack.", e3);
        }
    }

    @Override // io.siddhi.extension.io.nats.sink.nats.NATSCore
    public void disconnect() {
        if (this.streamingConnection != null) {
            try {
                this.streamingConnection.close();
            } catch (IOException | InterruptedException | TimeoutException e) {
                log.error("Error disconnecting the Stan receiver in Siddhi App '" + this.siddhiAppName + "' when publishing messages to NATS endpoint " + Arrays.toString(this.natsUrls) + " . " + e.getMessage(), e);
            }
        }
    }
}
