package org.wso2.carbon.event.processor.common.storm.component;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.wso2.carbon.event.processor.common.storm.event.Event;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.event.processor.common.util.ThroughputProbe;
import org.wso2.carbon.event.processor.manager.commons.transport.server.ConnectionCallback;
import org.wso2.carbon.event.processor.manager.commons.transport.server.StreamCallback;
import org.wso2.carbon.event.processor.manager.commons.transport.server.TCPEventServer;
import org.wso2.carbon.event.processor.manager.commons.transport.server.TCPEventServerConfig;
import org.wso2.carbon.event.processor.manager.commons.utils.HostAndPort;
import org.wso2.carbon.event.processor.manager.commons.utils.Utils;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;

/* loaded from: input_file:org/wso2/carbon/event/processor/common/storm/component/EventReceiverSpout.class */
/**
 * Storm spout that receives events over TCP and re-emits them as tuples.
 *
 * <p>When {@link #open} is called the spout picks a free port from the configured transport
 * port range, starts a {@link TCPEventServer} on it with itself as the {@link StreamCallback},
 * and starts a background {@link Registrar} thread that announces the host/port of this
 * receiver to a Storm manager service over Thrift.
 *
 * <p>Events handed to {@link #receive} are buffered in a {@link ConcurrentLinkedQueue} and
 * drained one at a time by {@link #nextTuple}.
 */
public class EventReceiverSpout extends BaseRichSpout implements StreamCallback {
    private static final Logger log = Logger.getLogger(EventReceiverSpout.class);

    /** Port the TCP event server listens on; chosen in {@link #open}. */
    private int listeningPort;
    /** Address announced to the manager service; resolved in {@link #open}. */
    private String thisHostIp;
    private DistributedConfiguration stormDeploymentConfig;
    /** Parsed definitions of every stream this spout accepts and re-emits. */
    private List<StreamDefinition> incomingStreamDefinitions;
    private TCPEventServer tcpEventServer;
    /** IDs of the streams declared in {@link #declareOutputFields}; used to filter events. */
    private List<String> incomingStreamIDs = new ArrayList<>();
    /** Buffer between {@link #receive} (producer) and {@link #nextTuple} (consumer). */
    private transient ConcurrentLinkedQueue<Event> storedEvents;
    private SpoutOutputCollector spoutOutputCollector;
    private String executionPlanName;
    private int tenantId;
    /** Prefix of the form {@code "[tenantId:executionPlanName:EventReceiverSpout] "}. */
    private String logPrefix;
    /** Pause, in milliseconds, between registration heartbeats and between retries. */
    private int heartbeatInterval;
    private transient ThroughputProbe inputThroughputProbe;
    private transient ThroughputProbe outputThroughputProbe;

    /**
     * Background task that keeps this receiver registered with a Storm manager service.
     *
     * <p>It first tries every configured manager until one accepts the registration, then
     * repeats the registration against that manager every {@code heartbeatInterval}
     * milliseconds. When a heartbeat fails it waits one interval and starts over with manager
     * discovery. The task ends when its thread is interrupted.
     */
    class Registrar implements Runnable {
        /** Host of the manager that accepted the most recent discovery registration. */
        private String managerHost;
        /** Port of the manager that accepted the most recent discovery registration. */
        private int managerPort;

        Registrar() {
        }

        @Override // java.lang.Runnable
        public void run() {
            log.info(logPrefix + "Registering Event Receiver Spout for " + thisHostIp + ":" + listeningPort);
            while (true) {
                if (registerStormReceiverWithStormMangerService()) {
                    if (!sendHeartbeatsUntilFailure()) {
                        return;
                    }
                } else {
                    log.error(logPrefix + "Error registering Event Receiver Spout with given set of manager nodes. Retrying after " + heartbeatInterval + "ms");
                }
                // Back off before walking the manager list again.
                if (!sleepForHeartbeatInterval()) {
                    return;
                }
            }
        }

        /**
         * Re-registers with the currently selected manager once per heartbeat interval.
         *
         * <p>Each heartbeat uses its own connection, which is closed before sleeping so that
         * no socket is held open between heartbeats.
         *
         * @return {@code true} if a heartbeat failed and manager discovery should be re-run;
         *         {@code false} if the thread was interrupted and the registrar should stop
         */
        private boolean sendHeartbeatsUntilFailure() {
            while (true) {
                TTransport transport = null;
                try {
                    transport = new TSocket(managerHost, managerPort);
                    TBinaryProtocol protocol = new TBinaryProtocol(transport);
                    transport.open();
                    new StormManagerService.Client(protocol).registerStormReceiver(tenantId, executionPlanName, thisHostIp, listeningPort);
                    if (log.isDebugEnabled()) {
                        log.debug(logPrefix + "Successfully registered Event Receiver Spout for " + thisHostIp + ":" + listeningPort);
                    }
                } catch (Exception e) {
                    log.error(logPrefix + "Error in registering Event Receiver Spout for " + thisHostIp + ":" + listeningPort + " with manager " + managerHost + ":" + managerPort + ". Trying next manager after " + heartbeatInterval + "ms", e);
                    // Leave the heartbeat loop so the caller pauses and fails over, instead of
                    // hammering the same unreachable manager without any delay.
                    return true;
                } finally {
                    if (transport != null) {
                        transport.close();
                    }
                }
                if (!sleepForHeartbeatInterval()) {
                    return false;
                }
            }
        }

        /**
         * Sleeps for one heartbeat interval.
         *
         * @return {@code true} if the full sleep completed; {@code false} if the thread was
         *         interrupted, in which case the interrupt flag is restored
         */
        private boolean sleepForHeartbeatInterval() {
            try {
                Thread.sleep(heartbeatInterval);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn(logPrefix + "Registrar thread interrupted; stopping registration of Event Receiver Spout for " + thisHostIp + ":" + listeningPort);
                return false;
            }
        }

        /**
         * Tries each configured manager in order and registers with the first one that accepts.
         * On success the manager's host and port are remembered for subsequent heartbeats.
         *
         * @return {@code true} if a manager accepted the registration, {@code false} if every
         *         configured manager failed
         */
        private boolean registerStormReceiverWithStormMangerService() {
            for (HostAndPort manager : stormDeploymentConfig.getManagers()) {
                TTransport transport = null;
                try {
                    transport = new TSocket(manager.getHostName(), manager.getPort());
                    TBinaryProtocol protocol = new TBinaryProtocol(transport);
                    transport.open();
                    new StormManagerService.Client(protocol).registerStormReceiver(tenantId, executionPlanName, thisHostIp, listeningPort);
                    log.info(logPrefix + "Successfully registered Event Receiver Spout for " + thisHostIp + ":" + listeningPort + " with manager service at " + manager.getHostName() + ":" + manager.getPort());
                    this.managerHost = manager.getHostName();
                    this.managerPort = manager.getPort();
                    return true;
                } catch (Exception e) {
                    log.error(logPrefix + "Error in registering Event Receiver Spout for " + thisHostIp + ":" + listeningPort + " with manager " + manager.getHostName() + ":" + manager.getPort() + ", Trying next manager.", e);
                } finally {
                    if (transport != null) {
                        transport.close();
                    }
                }
            }
            return false;
        }
    }

    /**
     * Creates a spout for the given execution plan.
     *
     * @param stormDeploymentConfig  distributed deployment settings (manager list, transport
     *                               port range, receiver thread count)
     * @param streamDefinitionQueries stream definitions as text; each entry is parsed with
     *                               {@link SiddhiCompiler#parseStreamDefinition}
     * @param executionPlanName      name of the execution plan this spout belongs to
     * @param tenantId               tenant that owns the execution plan
     * @param heartbeatInterval      milliseconds between registration heartbeats and retries
     */
    public EventReceiverSpout(DistributedConfiguration stormDeploymentConfig, List<String> streamDefinitionQueries, String executionPlanName, int tenantId, int heartbeatInterval) {
        this.incomingStreamDefinitions = new ArrayList<>(streamDefinitionQueries.size());
        this.stormDeploymentConfig = stormDeploymentConfig;
        this.executionPlanName = executionPlanName;
        this.tenantId = tenantId;
        this.heartbeatInterval = heartbeatInterval;
        for (String query : streamDefinitionQueries) {
            this.incomingStreamDefinitions.add(SiddhiCompiler.parseStreamDefinition(query));
        }
        this.logPrefix = "[" + tenantId + ":" + executionPlanName + ":EventReceiverSpout] ";
    }

    /**
     * Declares one output stream per incoming stream definition, using the definition's ID as
     * the stream ID and its attribute names, preceded by {@code "_timestamp"}, as the fields.
     * Also records each stream ID so {@link #nextTuple} can reject events of unknown streams.
     *
     * @param outputFieldsDeclarer declarer supplied by Storm
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        for (StreamDefinition streamDefinition : this.incomingStreamDefinitions) {
            String streamId = streamDefinition.getId();
            List<String> fieldNames = new ArrayList<String>(Arrays.asList(streamDefinition.getAttributeNameArray()));
            // NOTE(review): "_timestamp" is declared as the FIRST field, but nextTuple() appends
            // the timestamp as the LAST value of every emitted tuple, so name-based access would
            // resolve to shifted values. Left unchanged because consumers outside this file may
            // rely on the current layout - confirm and align the two.
            fieldNames.add(0, "_timestamp");
            outputFieldsDeclarer.declareStream(streamId, new Fields(fieldNames));
            // Guard against duplicate entries if this method is invoked more than once.
            if (!this.incomingStreamIDs.contains(streamId)) {
                this.incomingStreamIDs.add(streamId);
            }
            log.info(this.logPrefix + "Declaring output fields for stream : " + streamId);
        }
    }

    /**
     * Initialises the event buffer and throughput probes, starts the TCP event server on a
     * free port and launches the {@link Registrar} thread.
     *
     * <p>Any failure while starting the listener is logged and not rethrown, so the spout
     * stays alive but receives no events in that case.
     *
     * @param map                  Storm configuration (not used here)
     * @param topologyContext      topology context (not used here)
     * @param spoutOutputCollector collector used by {@link #nextTuple} to emit tuples
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
        this.storedEvents = new ConcurrentLinkedQueue<>();
        this.inputThroughputProbe = new ThroughputProbe(this.logPrefix + "-IN", 10);
        this.outputThroughputProbe = new ThroughputProbe(this.logPrefix + " -OUT", 10);
        this.inputThroughputProbe.startSampling();
        this.outputThroughputProbe.startSampling();
        try {
            this.listeningPort = findPort();
            this.thisHostIp = Utils.findAddress("localhost");
            TCPEventServerConfig serverConfig = new TCPEventServerConfig(this.thisHostIp, this.listeningPort);
            serverConfig.setNumberOfThreads(this.stormDeploymentConfig.getTransportReceiverThreads());
            this.tcpEventServer = new TCPEventServer(serverConfig, this, (ConnectionCallback) null);
            for (StreamDefinition streamDefinition : this.incomingStreamDefinitions) {
                this.tcpEventServer.addStreamDefinition(streamDefinition);
            }
            this.tcpEventServer.start();
            log.info(this.logPrefix + "EventReceiverSpout starting to listen for events on port " + this.listeningPort);
            // Registration runs on its own thread so that open() returns promptly.
            new Thread(new Registrar()).start();
        } catch (Throwable th) {
            log.error(this.logPrefix + "Error starting event listener for spout: " + th.getMessage(), th);
        }
    }

    /**
     * Emits at most one buffered event. The tuple consists of the event's data values
     * followed by its timestamp, and is emitted on the stream named after the event's stream
     * ID. Events whose stream ID was not declared are dropped with a warning.
     */
    public void nextTuple() {
        Event event = this.storedEvents.poll();
        if (event == null) {
            return;
        }
        String streamId = event.getStreamId();
        if (!this.incomingStreamIDs.contains(streamId)) {
            log.warn(this.logPrefix + "Event received for unknown stream : " + streamId);
            return;
        }
        Object[] data = event.getData();
        // Copy with one extra trailing slot that carries the timestamp.
        Object[] tupleValues = Arrays.copyOf(data, data.length + 1);
        tupleValues[data.length] = Long.valueOf(event.getTimestamp());
        this.spoutOutputCollector.emit(streamId, Arrays.asList(tupleValues));
        if (log.isDebugEnabled()) {
            log.debug(this.logPrefix + "Emitted Event: " + streamId + ":" + Arrays.deepToString(tupleValues) + "@" + event.getTimestamp());
        }
        this.outputThroughputProbe.update();
    }

    /**
     * Scans the configured transport port range, bounds inclusive, for the first port that
     * {@link Utils#isPortUsed} reports as unused.
     *
     * @return the first unused port in the range
     * @throws Exception if every port in the range is in use
     */
    private int findPort() throws Exception {
        for (int port = this.stormDeploymentConfig.getTransportMinPort(); port <= this.stormDeploymentConfig.getTransportMaxPort(); port++) {
            if (!Utils.isPortUsed(port)) {
                return port;
            }
        }
        throw new Exception("Cannot find free port in range " + this.stormDeploymentConfig.getTransportMinPort() + "~" + this.stormDeploymentConfig.getTransportMaxPort());
    }

    /**
     * Buffers an incoming event for later emission by {@link #nextTuple}. This spout passes
     * itself to the {@link TCPEventServer} as its {@link StreamCallback}.
     *
     * @param str     ID of the stream the event belongs to
     * @param j       event timestamp
     * @param objArr  event attribute values
     */
    public void receive(String str, long j, Object[] objArr) {
        if (log.isDebugEnabled()) {
            log.debug(this.logPrefix + "Received Event: " + str + ":" + Arrays.deepToString(objArr) + "@" + j);
        }
        this.storedEvents.add(new Event(j, objArr, str));
        this.inputThroughputProbe.update();
    }
}
