package org.wso2.carbon.event.processor.common.storm.component;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.wso2.carbon.event.processor.common.storm.config.StormDeploymentConfig;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.event.processor.common.transport.client.TCPEventPublisher;
import org.wso2.carbon.event.processor.common.util.Utils;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.config.SiddhiConfiguration;
import org.wso2.siddhi.query.api.ExecutionPlan;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.definition.TableDefinition;
import org.wso2.siddhi.query.api.definition.partition.PartitionDefinition;
import org.wso2.siddhi.query.api.query.Query;

/* loaded from: input_file:org/wso2/carbon/event/processor/common/storm/component/EventPublisherBolt.class */
public class EventPublisherBolt extends BaseBasicBolt {
    private final List<ExecutionPlan> queries;
    private List<StreamDefinition> inputStreamDefinitions;
    private List<StreamDefinition> outputStreamDefinitions;
    private String executionPlanName;
    private String logPrefix;
    private int tenantId;
    private int eventCount;
    private long batchStartTime;
    private SiddhiManager siddhiManager;
    private StormDeploymentConfig stormDeploymentConfig;
    private String thisHostIp;
    private transient Logger log = Logger.getLogger(EventPublisherBolt.class);
    private Map<String, StreamDefinition> streamIdToDefinitionMap = new HashMap();
    private transient TCPEventPublisher tcpEventPublisher = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/common/storm/component/EventPublisherBolt$Registrar.class */
    public class Registrar implements Runnable {
        Registrar() {
        }

        @Override // java.lang.Runnable
        public void run() {
            EventPublisherBolt.this.log.info(EventPublisherBolt.this.logPrefix + "Getting CEP publisher for " + EventPublisherBolt.this.thisHostIp);
            while (!configureStormReceiverFromStormMangerService()) {
                EventPublisherBolt.this.log.info(EventPublisherBolt.this.logPrefix + "Retry getting CEP publisher in 30 sec");
                try {
                    Thread.sleep(30000L);
                } catch (InterruptedException e) {
                }
            }
        }

        private boolean configureStormReceiverFromStormMangerService() {
            TTransport tTransport = null;
            try {
                try {
                    tTransport = new TSocket(EventPublisherBolt.this.stormDeploymentConfig.getManagers().get(0).getHostName(), EventPublisherBolt.this.stormDeploymentConfig.getManagers().get(0).getPort());
                    TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
                    tTransport.open();
                    String cEPPublisher = new StormManagerService.Client(tBinaryProtocol).getCEPPublisher(EventPublisherBolt.this.tenantId, EventPublisherBolt.this.executionPlanName, EventPublisherBolt.this.thisHostIp);
                    EventPublisherBolt.this.log.info(EventPublisherBolt.this.logPrefix + "Successfully got CEP publisher with " + cEPPublisher);
                    EventPublisherBolt.this.tcpEventPublisher = new TCPEventPublisher(cEPPublisher);
                    for (StreamDefinition streamDefinition : EventPublisherBolt.this.outputStreamDefinitions) {
                        if (EventPublisherBolt.this.log.isDebugEnabled()) {
                            EventPublisherBolt.this.log.debug(EventPublisherBolt.this.logPrefix + "EventPublisherBolt adding stream definition to client for exported Siddhi stream: " + streamDefinition.getStreamId());
                        }
                        EventPublisherBolt.this.tcpEventPublisher.addStreamDefinition(streamDefinition);
                    }
                    EventPublisherBolt.this.log.info(EventPublisherBolt.this.logPrefix + "connected to CEP publisher at " + cEPPublisher);
                    if (tTransport != null) {
                        tTransport.close();
                    }
                    return true;
                } catch (Exception e) {
                    EventPublisherBolt.this.log.error(EventPublisherBolt.this.logPrefix + "Error in getting CEP publisher", e);
                    if (tTransport != null) {
                        tTransport.close();
                    }
                    return false;
                }
            } catch (Throwable th) {
                if (tTransport != null) {
                    tTransport.close();
                }
                throw th;
            }
        }
    }

    public EventPublisherBolt(StormDeploymentConfig stormDeploymentConfig, List<StreamDefinition> list, List<ExecutionPlan> list2, List<StreamDefinition> list3, String str, int i) {
        this.tenantId = -1234;
        this.stormDeploymentConfig = stormDeploymentConfig;
        this.inputStreamDefinitions = list;
        this.queries = list2;
        this.outputStreamDefinitions = list3;
        this.executionPlanName = str;
        this.tenantId = i;
        this.logPrefix = "{" + str + ":" + i + "} - ";
    }

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        if (this.tcpEventPublisher == null) {
            if (this.siddhiManager != null) {
                this.siddhiManager.shutdown();
            }
            init();
        }
        if (this.tcpEventPublisher == null) {
            this.log.warn("Dropping the event since the data publisher is not yet initialized for " + this.executionPlanName + ":" + this.tenantId);
            return;
        }
        if (this.streamIdToDefinitionMap.get(tuple.getSourceStreamId()) == null) {
            this.log.warn(this.logPrefix + "Tuple received for unknown stream " + tuple.getSourceStreamId() + ". Discarding event : " + tuple.toString());
            return;
        }
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug(this.logPrefix + "Event published to CEP Publisher =>" + tuple.toString());
            }
            int i = this.eventCount + 1;
            this.eventCount = i;
            if (i % 10000 == 0) {
                double currentTimeMillis = (System.currentTimeMillis() - this.batchStartTime) / 1000.0d;
                this.log.info("Processed 10000 events in " + currentTimeMillis + " seconds, throughput : " + (10000.0d / currentTimeMillis) + " events/sec");
                this.eventCount = 0;
                this.batchStartTime = System.currentTimeMillis();
            }
            this.tcpEventPublisher.sendEvent(tuple.getSourceStreamId(), tuple.getValues().toArray());
        } catch (IOException e) {
            this.log.error(this.logPrefix + "Error while publishing event to CEP publisher", e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public void prepare(Map map, TopologyContext topologyContext) {
        super.prepare(map, topologyContext);
        init();
    }

    private void init() {
        try {
            this.log = Logger.getLogger(EventPublisherBolt.class);
            this.siddhiManager = new SiddhiManager(new SiddhiConfiguration());
            this.eventCount = 0;
            this.batchStartTime = System.currentTimeMillis();
            this.thisHostIp = Utils.findAddress("localhost");
            if (this.inputStreamDefinitions != null) {
                Iterator<StreamDefinition> it = this.inputStreamDefinitions.iterator();
                while (it.hasNext()) {
                    this.siddhiManager.defineStream(it.next());
                }
            }
            if (this.queries != null) {
                Iterator<ExecutionPlan> it2 = this.queries.iterator();
                while (it2.hasNext()) {
                    Query query = (ExecutionPlan) it2.next();
                    if (query instanceof Query) {
                        this.siddhiManager.addQuery(query);
                    } else if (query instanceof StreamDefinition) {
                        this.siddhiManager.defineStream((StreamDefinition) query);
                    } else if (query instanceof PartitionDefinition) {
                        this.siddhiManager.definePartition((PartitionDefinition) query);
                    } else if (query instanceof TableDefinition) {
                        this.siddhiManager.defineTable((TableDefinition) query);
                    }
                }
            }
            for (StreamDefinition streamDefinition : this.outputStreamDefinitions) {
                this.streamIdToDefinitionMap.put(streamDefinition.getStreamId(), streamDefinition);
            }
            new Thread(new Registrar()).start();
        } catch (Throwable th) {
            this.log.error(this.logPrefix + "Error starting event publisher bolt: " + th.getMessage(), th);
        }
    }
}
