package org.wso2.carbon.inbound.endpoint.protocol.nats;

import io.nats.streaming.Message;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
import org.wso2.carbon.inbound.endpoint.protocol.nats.management.NatsEndpointManager;

/* loaded from: input_file:org/wso2/carbon/inbound/endpoint/protocol/nats/StreamingListener.class */
/**
 * {@link NatsMessageListener} implementation backed by a NATS Streaming connection.
 *
 * <p>The listener builds a {@link StreamingConnection} from the supplied properties, subscribes to
 * the configured subject and forwards every received payload to the {@link NatsInjectHandler}.
 * When the streaming client reports a lost connection, the listener drops its current connection
 * and asks the {@code NatsMessageConsumer} obtained from {@link NatsEndpointManager} to
 * re-initialize.
 *
 * <p>NOTE(review): {@code connection} and {@code subscription} are also written from the
 * connection-lost callback; no synchronization is applied here — confirm the threading model of
 * the callers before relying on cross-thread visibility.
 */
public class StreamingListener implements NatsMessageListener {
    private static final Log log = LogFactory.getLog(StreamingListener.class.getName());

    private final String subject;
    private final NatsInjectHandler injectHandler;
    private final Properties natsProperties;
    private StreamingConnection connection;
    private Subscription subscription;

    /**
     * Creates a listener; no connection is opened until {@link #createConnection()} or
     * {@link #initializeConsumer(String)} is called.
     *
     * @param str               subject to subscribe to
     * @param natsInjectHandler handler that receives each message payload
     * @param properties        listener configuration, read using the {@code NatsConstants} keys
     */
    public StreamingListener(String str, NatsInjectHandler natsInjectHandler, Properties properties) {
        this.subject = str;
        this.injectHandler = natsInjectHandler;
        this.natsProperties = properties;
    }

    /**
     * Opens the streaming connection if one is not already open.
     *
     * @return always {@code true}; failures are reported through exceptions
     * @throws IOException           if the connection cannot be established
     * @throws InterruptedException  if the calling thread is interrupted while connecting
     * @throws NumberFormatException if a numeric connection property is not a valid integer
     */
    @Override // org.wso2.carbon.inbound.endpoint.protocol.nats.NatsMessageListener
    public boolean createConnection() throws IOException, InterruptedException {
        if (this.connection != null) {
            return true;
        }
        this.connection = new StreamingConnectionFactory(buildConnectionOptions()).createConnection();
        return true;
    }

    /**
     * Ensures a connection exists and subscribes to the configured subject.
     *
     * <p>Each message's raw bytes are passed to the inject handler unchanged. When the handler
     * returns {@code true} and manual acknowledgement is enabled, the message is acked.
     *
     * @param str forwarded as the second argument of {@code NatsInjectHandler.invoke}
     *            (presumably the sequence name — confirm against the handler)
     * @throws InterruptedException  if interrupted while connecting or subscribing
     * @throws IOException           if connecting or subscribing fails
     * @throws TimeoutException      if the subscription request times out
     * @throws NumberFormatException if a numeric subscription property is not a valid integer
     */
    @Override // org.wso2.carbon.inbound.endpoint.protocol.nats.NatsMessageListener
    public void initializeConsumer(String str) throws InterruptedException, IOException, TimeoutException {
        if (!createConnection()) {
            return;
        }
        String queueGroup = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_QUEUE_GROUP);
        boolean manualAck =
                Boolean.parseBoolean(this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_MANUAL_ACK));
        SubscriptionOptions subscriptionOptions = buildSubscriptionOptions(manualAck);

        this.subscription = this.connection.subscribe(this.subject, queueGroup, message -> {
            // Hand over the bytes exactly as received: decoding to a String and re-encoding with
            // the platform default charset would corrupt payloads that are not valid text in it.
            byte[] payload = message.getData();
            if (log.isDebugEnabled()) {
                log.debug("Message Received to NATS Inbound EP: " + new String(payload, StandardCharsets.UTF_8));
            }
            if (this.injectHandler.invoke(payload, str, null, null)) {
                acknowledge(manualAck, message);
            }
        }, subscriptionOptions);
    }

    /**
     * Closes the subscription and the connection, then clears both references.
     *
     * <p>The connection is closed even when closing the subscription fails. If both fail, only
     * the connection failure is logged. An interrupt received while closing is restored on the
     * current thread once clean-up has finished.
     */
    @Override // org.wso2.carbon.inbound.endpoint.protocol.nats.NatsMessageListener
    public void closeConnection() {
        boolean interrupted = false;
        try {
            try {
                if (this.subscription != null) {
                    this.subscription.close();
                }
            } finally {
                // Runs even if the subscription close above threw, so the connection is not leaked.
                if (this.connection != null) {
                    this.connection.close();
                }
            }
        } catch (IOException | InterruptedException | TimeoutException e) {
            log.error("An error occurred while closing the connection. ", e);
            interrupted = e instanceof InterruptedException;
        } finally {
            this.connection = null;
            this.subscription = null;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Builds the streaming connection options from the configured properties. */
    private Options buildConnectionOptions() throws IOException, InterruptedException {
        String url = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_URL);
        String clientId = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_CLIENT_ID);
        String clusterId = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_CLUSTER_ID);
        String connectWait = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_CONNECT_WAIT);
        String discoverPrefix = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_DISCOVER_PREFIX);
        String maxPingsOut = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_MAX_PINGS_OUT);
        String pingInterval = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_PING_INTERVAL);
        String traceConnection = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_TRACE_CONNECTION);

        Options.Builder options = new Options.Builder()
                .natsUrl(StringUtils.isEmpty(url) ? NatsConstants.DEFAULT_NATS_STREAMING_URL : url)
                .clientId(clientId)
                .clusterId(StringUtils.isEmpty(clusterId) ? NatsConstants.DEFAULT_NATS_STREAMING_CLUSTER_ID : clusterId)
                .connectionLostHandler((streamingConnection, exc) -> recoverFromConnectionLoss(exc));

        // Optionally run the streaming session over a core NATS connection created by CoreListener.
        if (Boolean.parseBoolean(this.natsProperties.getProperty(NatsConstants.USE_CORE_NATS_CONNECTION))) {
            options.natsConn(
                    new CoreListener(this.subject, this.injectHandler, this.natsProperties).getNatsConnection());
        }
        if (StringUtils.isNotEmpty(connectWait)) {
            options.connectWait(seconds(connectWait));
        }
        if (StringUtils.isNotEmpty(discoverPrefix)) {
            options.discoverPrefix(discoverPrefix);
        }
        if (StringUtils.isNotEmpty(maxPingsOut)) {
            options.maxPingsOut(Integer.parseInt(maxPingsOut));
        }
        if (StringUtils.isNotEmpty(pingInterval)) {
            options.pingInterval(seconds(pingInterval));
        }
        if (Boolean.parseBoolean(traceConnection)) {
            options.traceConnection();
        }
        return options.build();
    }

    /** Builds the subscription options from the configured properties. */
    private SubscriptionOptions buildSubscriptionOptions(boolean manualAck) {
        String durableName = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_DURABLE_NAME);
        String ackWait = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_ACK_WAIT);
        String maxInFlight = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_MAX_IN_FLIGHT);
        String subscriptionTimeout =
                this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_SUBSCRIPTION_TIMEOUT);
        String dispatcher = this.natsProperties.getProperty(NatsConstants.NATS_STREAMING_DISPATCHER);

        SubscriptionOptions.Builder builder = new SubscriptionOptions.Builder();
        if (StringUtils.isNotEmpty(durableName)) {
            builder.durableName(durableName);
        }
        if (manualAck) {
            builder.manualAcks();
        }
        if (StringUtils.isNotEmpty(ackWait)) {
            builder.ackWait(seconds(ackWait));
        }
        if (StringUtils.isNotEmpty(maxInFlight)) {
            builder.maxInFlight(Integer.parseInt(maxInFlight));
        }
        if (StringUtils.isNotEmpty(subscriptionTimeout)) {
            builder.subscriptionTimeout(seconds(subscriptionTimeout));
        }
        if (StringUtils.isNotEmpty(dispatcher)) {
            builder.dispatcher(dispatcher);
        }
        return builder.build();
    }

    /**
     * Connection-lost callback: drops the current connection and asks the message consumer to
     * re-initialize. On failure the consumer's connection is closed, except for a
     * {@link SynapseException}, which is only logged.
     *
     * @param cause the failure reported by the streaming client; may be {@code null}
     */
    private void recoverFromConnectionLoss(Throwable cause) {
        NatsMessageConsumer messageConsumer = NatsEndpointManager.getInstance().getMessageConsumer();
        // Resolved up front so a null cause cannot raise an NPE inside a catch block and skip clean-up.
        String causeMessage = (cause != null) ? cause.getMessage() : null;
        try {
            if (this.connection != null) {
                this.connection.close();
            }
            this.connection = null;
            this.subscription = null;
            messageConsumer.initializeConsumer();
        } catch (IOException | InterruptedException e) {
            log.error("An error occurred while connecting to NATS server, consuming messages or while closing the connection. ", e);
            messageConsumer.closeConnection();
            if (e instanceof InterruptedException) {
                // Restore the flag only after clean-up so the close calls above are not cut short.
                Thread.currentThread().interrupt();
            }
        } catch (SynapseException e) {
            log.error("Error while retrieving or injecting NATS message. " + causeMessage, e);
        } catch (Exception e) {
            log.error("Error while retrieving or injecting NATS message or closing conection. " + causeMessage, e);
            messageConsumer.closeConnection();
        }
    }

    /**
     * Acknowledges the message when manual acknowledgement is enabled; ack failures are logged
     * and not rethrown.
     */
    private void acknowledge(boolean z, Message message) {
        if (!z) {
            return;
        }
        try {
            message.ack();
        } catch (IOException e) {
            log.error("An error occurred while sending manual ack. Message might get redelivered. Sequence number: " + message.getSequence(), e);
        }
    }

    /** Parses an integer number of seconds into a {@link Duration}. */
    private static Duration seconds(String value) {
        return Duration.ofSeconds(Integer.parseInt(value));
    }
}
