package org.wso2.mb.integration.common.clients;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.xml.xpath.XPathExpressionException;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.carbon.automation.engine.context.AutomationContext;
import org.wso2.mb.integration.common.clients.operations.mqtt.async.MQTTAsyncPublisherClient;
import org.wso2.mb.integration.common.clients.operations.mqtt.async.MQTTAsyncSubscriberClient;
import org.wso2.mb.integration.common.clients.operations.mqtt.blocking.MQTTBlockingPublisherClient;
import org.wso2.mb.integration.common.clients.operations.mqtt.blocking.MQTTBlockingSubscriberClient;

/* loaded from: input_file:org/wso2/mb/integration/common/clients/MQTTClientEngine.class */
public class MQTTClientEngine {
    private static final int MILLISECONDS_TO_A_SECOND = 1000;
    private ScheduledFuture tpsPublisherSchedule;
    private int previousReceivedMessageCount;
    private int previousSentMessageCount;
    private final List<AndesMQTTClient> publisherList = new ArrayList();
    private final List<AndesMQTTClient> subscriberList = new ArrayList();
    private final ExecutorService clientControlSubscriptionThreads = Executors.newFixedThreadPool(10);
    private final ExecutorService clientControlPublisherThreads = Executors.newFixedThreadPool(10);
    private final Log log = LogFactory.getLog(MQTTClientEngine.class);
    private final ScheduledExecutorService scheduleExecutor = Executors.newScheduledThreadPool(1);

    public MQTTClientEngine() {
        startTPSPublisher();
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.wso2.mb.integration.common.clients.MQTTClientEngine.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    MQTTClientEngine.this.shutdown();
                    MQTTClientEngine.this.log.info("All mqtt clients have been disconnected.");
                } catch (MqttException e) {
                    MQTTClientEngine.this.log.error("Error occurred invoking disconnect for " + getName(), e);
                }
            }
        });
    }

    public String generateClientID() {
        String random = RandomStringUtils.random(23, String.valueOf(System.currentTimeMillis()));
        this.log.info("ClientID generated : " + random);
        return random;
    }

    public void createSubscriberConnection(MQTTClientConnectionConfiguration mQTTClientConnectionConfiguration, String str, QualityOfService qualityOfService, boolean z, ClientMode clientMode) throws MqttException {
        if (ClientMode.ASYNC == clientMode) {
            MQTTAsyncSubscriberClient mQTTAsyncSubscriberClient = new MQTTAsyncSubscriberClient(mQTTClientConnectionConfiguration, generateClientID(), str, qualityOfService, z);
            this.subscriberList.add(mQTTAsyncSubscriberClient);
            this.clientControlSubscriptionThreads.execute(mQTTAsyncSubscriberClient);
        } else {
            if (ClientMode.BLOCKING != clientMode) {
                throw new MqttException(new Throwable("Unidentified clientMode : " + clientMode));
            }
            MQTTBlockingSubscriberClient mQTTBlockingSubscriberClient = new MQTTBlockingSubscriberClient(mQTTClientConnectionConfiguration, generateClientID(), str, qualityOfService, z);
            this.subscriberList.add(mQTTBlockingSubscriberClient);
            mQTTBlockingSubscriberClient.run();
        }
        waitForSubscribersToSubscribe();
    }

    public void createSubscriberConnection(MQTTClientConnectionConfiguration mQTTClientConnectionConfiguration, String str, QualityOfService qualityOfService, boolean z, ClientMode clientMode, String str2) throws MqttException {
        if (ClientMode.ASYNC == clientMode) {
            MQTTAsyncSubscriberClient mQTTAsyncSubscriberClient = new MQTTAsyncSubscriberClient(mQTTClientConnectionConfiguration, str2, str, qualityOfService, z);
            this.subscriberList.add(mQTTAsyncSubscriberClient);
            this.clientControlSubscriptionThreads.execute(mQTTAsyncSubscriberClient);
        } else {
            if (ClientMode.BLOCKING != clientMode) {
                throw new MqttException(new Throwable("Unidentified clientMode : " + clientMode));
            }
            MQTTBlockingSubscriberClient mQTTBlockingSubscriberClient = new MQTTBlockingSubscriberClient(mQTTClientConnectionConfiguration, str2, str, qualityOfService, z);
            this.subscriberList.add(mQTTBlockingSubscriberClient);
            mQTTBlockingSubscriberClient.run();
        }
        waitForSubscribersToSubscribe();
    }

    public void createSubscriberConnection(String str, QualityOfService qualityOfService, int i, boolean z, ClientMode clientMode, AutomationContext automationContext) throws MqttException, XPathExpressionException {
        MQTTClientConnectionConfiguration configurations = getConfigurations(automationContext);
        for (int i2 = 0; i2 < i; i2++) {
            createSubscriberConnection(configurations, str, qualityOfService, z, clientMode);
        }
    }

    public void createSubscriberConnection(String str, QualityOfService qualityOfService, int i, boolean z, ClientMode clientMode, MQTTClientConnectionConfiguration mQTTClientConnectionConfiguration) throws MqttException {
        for (int i2 = 0; i2 < i; i2++) {
            createSubscriberConnection(mQTTClientConnectionConfiguration, str, qualityOfService, z, clientMode);
        }
    }

    private void waitForSubscribersToSubscribe() {
        while (!isAllSubscribersSubscribed()) {
            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                this.log.error("Error waiting until subscribers subscribe to topics.", e);
            }
            this.log.info("Waiting for subscribers to create connection");
        }
    }

    private boolean isAllSubscribersSubscribed() {
        boolean z = true;
        Iterator<AndesMQTTClient> it = this.subscriberList.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!it.next().isSubscribed()) {
                z = false;
                break;
            }
        }
        return z;
    }

    public void createPublisherConnection(MQTTClientConnectionConfiguration mQTTClientConnectionConfiguration, String str, QualityOfService qualityOfService, byte[] bArr, int i, ClientMode clientMode) throws MqttException {
        if (ClientMode.ASYNC == clientMode) {
            MQTTAsyncPublisherClient mQTTAsyncPublisherClient = new MQTTAsyncPublisherClient(mQTTClientConnectionConfiguration, generateClientID(), str, qualityOfService, bArr, i);
            this.publisherList.add(mQTTAsyncPublisherClient);
            this.clientControlPublisherThreads.execute(mQTTAsyncPublisherClient);
        } else {
            if (ClientMode.BLOCKING != clientMode) {
                throw new MqttException(new Throwable("Unidentified ClientMode : " + clientMode));
            }
            MQTTBlockingPublisherClient mQTTBlockingPublisherClient = new MQTTBlockingPublisherClient(mQTTClientConnectionConfiguration, generateClientID(), str, qualityOfService, bArr, i);
            this.publisherList.add(mQTTBlockingPublisherClient);
            mQTTBlockingPublisherClient.run();
        }
    }

    public void createPublisherConnection(String str, QualityOfService qualityOfService, byte[] bArr, int i, int i2, ClientMode clientMode, AutomationContext automationContext) throws MqttException, XPathExpressionException {
        createPublisherConnection(str, qualityOfService, bArr, i, i2, clientMode, getConfigurations(automationContext));
    }

    public void createPublisherConnection(String str, QualityOfService qualityOfService, byte[] bArr, int i, int i2, ClientMode clientMode, MQTTClientConnectionConfiguration mQTTClientConnectionConfiguration) throws MqttException {
        for (int i3 = 0; i3 < i; i3++) {
            createPublisherConnection(mQTTClientConnectionConfiguration, str, qualityOfService, bArr, i2, clientMode);
        }
    }

    private MQTTClientConnectionConfiguration getDefaultConfigurations() {
        MQTTClientConnectionConfiguration mQTTClientConnectionConfiguration = new MQTTClientConnectionConfiguration();
        mQTTClientConnectionConfiguration.setBrokerHost(MQTTConstants.BROKER_HOST);
        mQTTClientConnectionConfiguration.setBrokerProtocol(MQTTConstants.BROKER_PROTOCOL);
        mQTTClientConnectionConfiguration.setBrokerPort(MQTTConstants.BROKER_PORT);
        mQTTClientConnectionConfiguration.setBrokerPassword("admin");
        mQTTClientConnectionConfiguration.setBrokerUserName("admin");
        mQTTClientConnectionConfiguration.setCleanSession(true);
        return mQTTClientConnectionConfiguration;
    }

    public MQTTClientConnectionConfiguration getConfigurations(AutomationContext automationContext) throws XPathExpressionException {
        MQTTClientConnectionConfiguration defaultConfigurations = getDefaultConfigurations();
        String str = (String) automationContext.getInstance().getHosts().get("default");
        if (!str.isEmpty()) {
            defaultConfigurations.setBrokerHost(str);
        }
        if (!((String) automationContext.getInstance().getPorts().get("mqtt")).isEmpty()) {
            defaultConfigurations.setBrokerPort((String) automationContext.getInstance().getPorts().get("mqtt"));
        }
        return defaultConfigurations;
    }

    public List<MqttMessage> getReceivedMessages() {
        ArrayList arrayList = new ArrayList();
        Iterator<AndesMQTTClient> it = this.subscriberList.iterator();
        while (it.hasNext()) {
            arrayList.addAll(it.next().getReceivedMessages());
        }
        return arrayList;
    }

    public int getReceivedMessageCount() {
        int i = 0;
        Iterator<AndesMQTTClient> it = this.subscriberList.iterator();
        while (it.hasNext()) {
            i += it.next().getReceivedMessageCount();
        }
        return i;
    }

    public int getSentMessageCount() {
        int i = 0;
        Iterator<AndesMQTTClient> it = this.publisherList.iterator();
        while (it.hasNext()) {
            i += it.next().getSentMessageCount();
        }
        return i;
    }

    public List<AndesMQTTClient> getSubscriberList() {
        return this.subscriberList;
    }

    public List<AndesMQTTClient> getPublisherList() {
        return this.publisherList;
    }

    public void waitUntilAllMessageReceivedAndShutdownClients() throws MqttException {
        waitUntilAllMessageReceived();
        shutdown();
    }

    public void waitUntilAllMessageReceived() {
        int i = 0;
        int i2 = -1;
        while (true) {
            int i3 = i2;
            if (i3 == i) {
                return;
            }
            try {
                TimeUnit.SECONDS.sleep(10L);
            } catch (InterruptedException e) {
                this.log.error("Error waiting for receiving messages.", e);
            }
            i = i3;
            i2 = getReceivedMessageCount();
        }
    }

    public void waitUntilExpectedNumberOfMessagesReceived(int i, long j) throws MqttException {
        long currentTimeMillis = System.currentTimeMillis() + j;
        while (System.currentTimeMillis() <= currentTimeMillis) {
            try {
                TimeUnit.MILLISECONDS.sleep(2000L);
            } catch (InterruptedException e) {
                this.log.error("Error waiting for receiving messages.", e);
            }
            if (i <= getReceivedMessageCount()) {
                this.log.info("Expected message count received by subscriber.");
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public double calculateTPS(long j, int i) {
        return i / (j / 1000.0d);
    }

    private void startTPSPublisher() {
        this.tpsPublisherSchedule = this.scheduleExecutor.scheduleAtFixedRate(new Runnable() { // from class: org.wso2.mb.integration.common.clients.MQTTClientEngine.2
            @Override // java.lang.Runnable
            public void run() {
                int receivedMessageCount = MQTTClientEngine.this.getReceivedMessageCount();
                int sentMessageCount = MQTTClientEngine.this.getSentMessageCount();
                if (receivedMessageCount != MQTTClientEngine.this.previousReceivedMessageCount) {
                    MQTTClientEngine.this.log.info("Message Receiving TPS for the last 5 seconds : " + MQTTClientEngine.this.calculateTPS(5000L, receivedMessageCount - MQTTClientEngine.this.previousReceivedMessageCount));
                    MQTTClientEngine.this.previousReceivedMessageCount = receivedMessageCount;
                }
                if (sentMessageCount != MQTTClientEngine.this.previousSentMessageCount) {
                    MQTTClientEngine.this.log.info("Message Sending TPS for the last 5 seconds : " + MQTTClientEngine.this.calculateTPS(5000L, sentMessageCount - MQTTClientEngine.this.previousSentMessageCount));
                    MQTTClientEngine.this.previousSentMessageCount = sentMessageCount;
                }
            }
        }, 0L, 5L, TimeUnit.SECONDS);
    }

    public void shutdown() throws MqttException {
        Iterator<AndesMQTTClient> it = this.subscriberList.iterator();
        while (it.hasNext()) {
            it.next().disconnect();
        }
        Iterator<AndesMQTTClient> it2 = this.publisherList.iterator();
        while (it2.hasNext()) {
            it2.next().disconnect();
        }
        this.tpsPublisherSchedule.cancel(true);
        this.scheduleExecutor.shutdown();
    }
}
