package org.wso2.carbon.event.processor.core.internal.storm;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.wso2.carbon.databridge.commons.thrift.utils.HostAddressFinder;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.event.processor.common.util.ThroughputProbe;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.carbon.event.processor.manager.commons.transport.server.ConnectionCallback;
import org.wso2.carbon.event.processor.manager.commons.transport.server.StreamCallback;
import org.wso2.carbon.event.processor.manager.commons.transport.server.TCPEventServer;
import org.wso2.carbon.event.processor.manager.commons.transport.server.TCPEventServerConfig;
import org.wso2.carbon.event.processor.manager.commons.utils.HostAndPort;
import org.wso2.carbon.event.processor.manager.commons.utils.Utils;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/SiddhiStormOutputEventListener.class */
public class SiddhiStormOutputEventListener implements StreamCallback {
    private static final Logger log = Logger.getLogger(SiddhiStormOutputEventListener.class);
    private ExecutionPlanConfiguration executionPlanConfiguration;
    private int listeningPort;
    private int tenantId;
    private final DistributedConfiguration stormDeploymentConfig;
    private String thisHostIp;
    private TCPEventServer tcpEventServer;
    private int heartbeatInterval;
    private ThroughputProbe inputThroughputProbe;
    private final ConnectionCallback connectionCallback;
    private HashMap<String, SiddhiOutputStreamListener> streamNameToOutputStreamListenerMap = new HashMap<>();
    private String logPrefix = "";
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/SiddhiStormOutputEventListener$Registrar.class */
    public class Registrar implements Runnable {
        private String managerHost;
        private int managerPort;

        Registrar() {
        }

        @Override // java.lang.Runnable
        public void run() {
            SiddhiStormOutputEventListener.log.info(SiddhiStormOutputEventListener.this.logPrefix + "Registering CEP publisher for " + SiddhiStormOutputEventListener.this.thisHostIp + ":" + SiddhiStormOutputEventListener.this.listeningPort);
            while (true) {
                if (registerCEPPublisherWithStormMangerService()) {
                    while (true) {
                        TTransport tTransport = null;
                        try {
                            try {
                                tTransport = new TSocket(this.managerHost, this.managerPort);
                                TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
                                tTransport.open();
                                new StormManagerService.Client(tBinaryProtocol).registerCEPPublisher(SiddhiStormOutputEventListener.this.tenantId, SiddhiStormOutputEventListener.this.executionPlanConfiguration.getName(), SiddhiStormOutputEventListener.this.thisHostIp, SiddhiStormOutputEventListener.this.listeningPort);
                                if (SiddhiStormOutputEventListener.log.isDebugEnabled()) {
                                    SiddhiStormOutputEventListener.log.debug(SiddhiStormOutputEventListener.this.logPrefix + "Successfully registered CEP publisher for " + SiddhiStormOutputEventListener.this.thisHostIp + ":" + SiddhiStormOutputEventListener.this.listeningPort);
                                }
                                try {
                                    Thread.sleep(SiddhiStormOutputEventListener.this.heartbeatInterval);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                                if (tTransport != null) {
                                    tTransport.close();
                                }
                            } catch (Throwable th) {
                                if (tTransport != null) {
                                    tTransport.close();
                                }
                                throw th;
                            }
                        } catch (Exception e2) {
                            SiddhiStormOutputEventListener.log.error(SiddhiStormOutputEventListener.this.logPrefix + "Error in registering CEP publisher for " + SiddhiStormOutputEventListener.this.thisHostIp + ":" + SiddhiStormOutputEventListener.this.listeningPort + " with manager " + this.managerHost + ":" + this.managerPort + ". Trying next manager after " + SiddhiStormOutputEventListener.this.heartbeatInterval + "ms", e2);
                            if (tTransport != null) {
                                tTransport.close();
                            }
                        }
                    }
                } else {
                    SiddhiStormOutputEventListener.log.error(SiddhiStormOutputEventListener.this.logPrefix + "Error registering CEP publisher with current manager. Retrying after " + SiddhiStormOutputEventListener.this.heartbeatInterval + "ms");
                }
                try {
                    Thread.sleep(SiddhiStormOutputEventListener.this.heartbeatInterval);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private boolean registerCEPPublisherWithStormMangerService() {
            TTransport tTransport = null;
            for (HostAndPort hostAndPort : SiddhiStormOutputEventListener.this.stormDeploymentConfig.getManagers()) {
                try {
                    try {
                        tTransport = new TSocket(hostAndPort.getHostName(), hostAndPort.getPort());
                        TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
                        tTransport.open();
                        new StormManagerService.Client(tBinaryProtocol).registerCEPPublisher(SiddhiStormOutputEventListener.this.tenantId, SiddhiStormOutputEventListener.this.executionPlanConfiguration.getName(), SiddhiStormOutputEventListener.this.thisHostIp, SiddhiStormOutputEventListener.this.listeningPort);
                        SiddhiStormOutputEventListener.log.info(SiddhiStormOutputEventListener.this.logPrefix + "Successfully registered CEP publisher for " + SiddhiStormOutputEventListener.this.thisHostIp + ":" + SiddhiStormOutputEventListener.this.listeningPort + "  with manager service at" + hostAndPort.getHostName() + ":" + hostAndPort.getPort());
                        this.managerHost = hostAndPort.getHostName();
                        this.managerPort = hostAndPort.getPort();
                        if (tTransport != null) {
                            tTransport.close();
                        }
                        return true;
                    } catch (Exception e) {
                        SiddhiStormOutputEventListener.log.error(SiddhiStormOutputEventListener.this.logPrefix + "Error in registering CEP publisher for " + SiddhiStormOutputEventListener.this.thisHostIp + ":" + SiddhiStormOutputEventListener.this.listeningPort + " with manager " + hostAndPort.getHostName() + ":" + hostAndPort.getPort() + EventProcessorConstants.METRIC_DELIMITER + " Trying next manager", e);
                        if (tTransport != null) {
                            tTransport.close();
                        }
                    }
                } catch (Throwable th) {
                    if (tTransport != null) {
                        tTransport.close();
                    }
                    throw th;
                }
            }
            return false;
        }
    }

    public SiddhiStormOutputEventListener(ExecutionPlanConfiguration executionPlanConfiguration, int i, DistributedConfiguration distributedConfiguration, ConnectionCallback connectionCallback) {
        this.executionPlanConfiguration = executionPlanConfiguration;
        this.tenantId = i;
        this.stormDeploymentConfig = distributedConfiguration;
        this.heartbeatInterval = distributedConfiguration.getManagementHeartbeatInterval();
        this.connectionCallback = connectionCallback;
        init();
    }

    private void init() {
        this.logPrefix = "[" + this.tenantId + ":" + this.executionPlanConfiguration.getName() + ":CEPPublisher] ";
        log.info(this.logPrefix + "Initializing storm output event listener");
        this.inputThroughputProbe = new ThroughputProbe(this.logPrefix + "-Receive", 10);
        this.inputThroughputProbe.startSampling();
        try {
            this.thisHostIp = HostAddressFinder.findAddress("localhost");
            this.listeningPort = findPort(this.thisHostIp);
            this.tcpEventServer = new TCPEventServer(new TCPEventServerConfig(this.thisHostIp, this.listeningPort), this, this.connectionCallback);
            this.tcpEventServer.start();
            this.executorService.execute(new Registrar());
        } catch (Exception e) {
            log.error(this.logPrefix + "Failed to start event listener", e);
        }
    }

    public void registerOutputStreamListener(StreamDefinition streamDefinition, SiddhiOutputStreamListener siddhiOutputStreamListener) {
        log.info(this.logPrefix + "Registering output stream listener for Siddhi stream : " + streamDefinition.getId());
        this.streamNameToOutputStreamListenerMap.put(streamDefinition.getId(), siddhiOutputStreamListener);
        this.tcpEventServer.addStreamDefinition(streamDefinition);
    }

    public void receive(String str, long j, Object[] objArr, Map<String, String> map) {
        SiddhiOutputStreamListener siddhiOutputStreamListener = this.streamNameToOutputStreamListenerMap.get(str);
        if (siddhiOutputStreamListener == null) {
            log.warn("Cannot find output event listener for stream " + str + " in execution plan " + this.executionPlanConfiguration.getName() + " of tenant " + this.tenantId + ". Discarding Event:" + str + ":" + Arrays.deepToString(objArr) + EventProcessorConstants.ANNOTATION_TOKEN_AT + j);
        } else {
            siddhiOutputStreamListener.sendEvent(new Event(j, objArr));
            this.inputThroughputProbe.update();
        }
    }

    private int findPort(String str) throws Exception {
        for (int transportMinPort = this.stormDeploymentConfig.getTransportMinPort(); transportMinPort <= this.stormDeploymentConfig.getTransportMaxPort(); transportMinPort++) {
            if (!Utils.isPortUsed(transportMinPort, str)) {
                return transportMinPort;
            }
        }
        throw new Exception("Cannot find free port in range " + this.stormDeploymentConfig.getTransportMinPort() + "~" + this.stormDeploymentConfig.getTransportMaxPort());
    }

    public void shutdown() {
        this.executorService.shutdown();
        this.tcpEventServer.shutdown();
    }
}
