package org.wso2.carbon.event.processor.common.storm.component;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.wso2.carbon.event.processor.common.storm.config.StormDeploymentConfig;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.event.processor.common.transport.server.StreamCallback;
import org.wso2.carbon.event.processor.common.transport.server.TCPEventServer;
import org.wso2.carbon.event.processor.common.transport.server.TCPEventServerConfig;
import org.wso2.carbon.event.processor.common.util.Utils;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/common/storm/component/EventReceiverSpout.class */
public class EventReceiverSpout extends BaseRichSpout implements StreamCallback {
    private static transient Logger log = Logger.getLogger(EventReceiverSpout.class);
    private int listeningPort;
    private String thisHostIp;
    private StormDeploymentConfig stormDeploymentConfig;
    private List<StreamDefinition> incomingStreamDefinitions;
    private TCPEventServer tcpEventServer;
    private List<String> incomingStreamIDs = new ArrayList();
    private transient ConcurrentLinkedQueue<Event> storedEvents = null;
    private SpoutOutputCollector spoutOutputCollector = null;
    private String executionPlanName;
    private int tenantId;
    private String logPrefix;
    private int eventCount;
    private long batchStartTime;

    /* loaded from: input_file:org/wso2/carbon/event/processor/common/storm/component/EventReceiverSpout$Registrar.class */
    class Registrar implements Runnable {
        Registrar() {
        }

        @Override // java.lang.Runnable
        public void run() {
            EventReceiverSpout.log.info(EventReceiverSpout.this.logPrefix + "Registering Storm receiver with " + EventReceiverSpout.this.thisHostIp + ":" + EventReceiverSpout.this.listeningPort);
            if (registerStormReceiverWithStormMangerService()) {
                return;
            }
            EventReceiverSpout.log.info(EventReceiverSpout.this.logPrefix + "Retry registering Storm receiver in 30 sec");
            try {
                Thread.sleep(30000L);
            } catch (InterruptedException e) {
            }
        }

        private boolean registerStormReceiverWithStormMangerService() {
            TTransport tTransport = null;
            try {
                try {
                    tTransport = new TSocket(EventReceiverSpout.this.stormDeploymentConfig.getManagers().get(0).getHostName(), EventReceiverSpout.this.stormDeploymentConfig.getManagers().get(0).getPort());
                    TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
                    tTransport.open();
                    new StormManagerService.Client(tBinaryProtocol).registerStormReceiver(EventReceiverSpout.this.tenantId, EventReceiverSpout.this.executionPlanName, EventReceiverSpout.this.thisHostIp, EventReceiverSpout.this.listeningPort);
                    EventReceiverSpout.log.info(EventReceiverSpout.this.logPrefix + "Successfully registering Storm receiver with " + EventReceiverSpout.this.thisHostIp + ":" + EventReceiverSpout.this.listeningPort);
                    if (tTransport != null) {
                        tTransport.close();
                    }
                    return true;
                } catch (Exception e) {
                    EventReceiverSpout.log.error(EventReceiverSpout.this.logPrefix + "Error in registering Storm receiver", e);
                    if (tTransport != null) {
                        tTransport.close();
                    }
                    return false;
                }
            } catch (Throwable th) {
                if (tTransport != null) {
                    tTransport.close();
                }
                throw th;
            }
        }
    }

    public EventReceiverSpout(StormDeploymentConfig stormDeploymentConfig, List<StreamDefinition> list, String str, int i) {
        this.tenantId = -1234;
        this.stormDeploymentConfig = stormDeploymentConfig;
        this.incomingStreamDefinitions = list;
        this.executionPlanName = str;
        this.tenantId = i;
        this.logPrefix = "{" + str + ":" + i + "} - ";
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        for (StreamDefinition streamDefinition : this.incomingStreamDefinitions) {
            outputFieldsDeclarer.declareStream(streamDefinition.getStreamId(), new Fields(streamDefinition.getAttributeNameArray()));
            this.incomingStreamIDs.add(streamDefinition.getStreamId());
            log.info(this.logPrefix + "Declaring output fields for stream - " + streamDefinition.getStreamId());
        }
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
        this.storedEvents = new ConcurrentLinkedQueue<>();
        this.eventCount = 0;
        this.batchStartTime = System.currentTimeMillis();
        try {
            this.listeningPort = findPort();
            this.thisHostIp = Utils.findAddress("localhost");
            this.tcpEventServer = new TCPEventServer(new TCPEventServerConfig(this.listeningPort), this);
            Iterator<StreamDefinition> it = this.incomingStreamDefinitions.iterator();
            while (it.hasNext()) {
                this.tcpEventServer.subscribe(it.next());
            }
            this.tcpEventServer.start();
            log.info(this.logPrefix + "EventReceiverSpout starting to listen for events on port " + this.listeningPort);
            new Thread(new Registrar()).start();
        } catch (Throwable th) {
            log.error(this.logPrefix + "Error starting event listener for spout: " + th.getMessage(), th);
        }
    }

    public void nextTuple() {
        Event poll = this.storedEvents.poll();
        if (poll != null) {
            String streamId = poll.getStreamId();
            if (!this.incomingStreamIDs.contains(streamId)) {
                log.warn(this.logPrefix + "Event received for unknown stream : " + streamId);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug(this.logPrefix + "Sending event : " + streamId + " => " + poll.toString());
            }
            int i = this.eventCount + 1;
            this.eventCount = i;
            if (i % 10000 == 0) {
                double currentTimeMillis = (System.currentTimeMillis() - this.batchStartTime) / 1000.0d;
                log.info("Processed 10000 events in " + currentTimeMillis + " seconds, throughput : " + (10000.0d / currentTimeMillis) + " events/sec");
                this.eventCount = 0;
                this.batchStartTime = System.currentTimeMillis();
            }
            this.spoutOutputCollector.emit(streamId, Arrays.asList(poll.getData()));
        }
    }

    private int findPort() throws Exception {
        for (int transportMinPort = this.stormDeploymentConfig.getTransportMinPort(); transportMinPort <= this.stormDeploymentConfig.getTransportMaxPort(); transportMinPort++) {
            if (!Utils.isPortUsed(transportMinPort)) {
                return transportMinPort;
            }
        }
        throw new Exception("Cannot find free port in range " + this.stormDeploymentConfig.getTransportMinPort() + "~" + this.stormDeploymentConfig.getTransportMaxPort());
    }

    @Override // org.wso2.carbon.event.processor.common.transport.server.StreamCallback
    public void receive(String str, Object[] objArr) {
        if (log.isDebugEnabled()) {
            log.debug(this.logPrefix + "Received event for stream '" + str + "': " + Arrays.deepToString(objArr));
        }
        this.storedEvents.add(new InEvent(str, System.currentTimeMillis(), objArr));
    }
}
