package org.wso2.carbon.event.processor.common.storm.component;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.carbon.event.processor.common.storm.config.StormDeploymentConfig;
import org.wso2.carbon.event.processor.common.util.AsyncEventPublisher;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.config.SiddhiConfiguration;
import org.wso2.siddhi.query.api.ExecutionPlan;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.definition.TableDefinition;
import org.wso2.siddhi.query.api.definition.partition.PartitionDefinition;
import org.wso2.siddhi.query.api.query.Query;

/* loaded from: input_file:org/wso2/carbon/event/processor/common/storm/component/EventPublisherBolt.class */
public class EventPublisherBolt extends BaseBasicBolt {
    private final List<ExecutionPlan> queries;
    private List<StreamDefinition> inputStreamDefinitions;
    private List<StreamDefinition> outputStreamDefinitions;
    private transient AsyncEventPublisher asyncEventPublisher;
    private String executionPlanName;
    private String logPrefix;
    private int tenantId;
    private SiddhiManager siddhiManager;
    private StormDeploymentConfig stormDeploymentConfig;
    private transient Logger log = Logger.getLogger(EventPublisherBolt.class);
    private Map<String, StreamDefinition> streamIdToDefinitionMap = new HashMap();

    public EventPublisherBolt(StormDeploymentConfig stormDeploymentConfig, List<StreamDefinition> list, List<ExecutionPlan> list2, List<StreamDefinition> list3, String str, int i) {
        this.tenantId = -1234;
        this.stormDeploymentConfig = stormDeploymentConfig;
        this.inputStreamDefinitions = list;
        this.queries = list2;
        this.outputStreamDefinitions = list3;
        this.executionPlanName = str;
        this.tenantId = i;
        this.logPrefix = "[" + str + ":" + i + "] - ";
    }

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        if (this.siddhiManager == null) {
            init();
        }
        if (this.streamIdToDefinitionMap.get(tuple.getSourceStreamId()) != null) {
            this.asyncEventPublisher.sendEvent(tuple.getValues().toArray(), tuple.getSourceStreamId());
        } else {
            this.log.warn(this.logPrefix + "Tuple received for unknown stream " + tuple.getSourceStreamId() + ". Discarding event : " + tuple.toString());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public void prepare(Map map, TopologyContext topologyContext) {
        super.prepare(map, topologyContext);
        init();
    }

    private void init() {
        try {
            this.log = Logger.getLogger(EventPublisherBolt.class);
            this.siddhiManager = new SiddhiManager(new SiddhiConfiguration());
            if (this.inputStreamDefinitions != null) {
                Iterator<StreamDefinition> it = this.inputStreamDefinitions.iterator();
                while (it.hasNext()) {
                    this.siddhiManager.defineStream(it.next());
                }
            }
            if (this.queries != null) {
                Iterator<ExecutionPlan> it2 = this.queries.iterator();
                while (it2.hasNext()) {
                    Query query = (ExecutionPlan) it2.next();
                    if (query instanceof Query) {
                        this.siddhiManager.addQuery(query);
                    } else if (query instanceof StreamDefinition) {
                        this.siddhiManager.defineStream((StreamDefinition) query);
                    } else if (query instanceof PartitionDefinition) {
                        this.siddhiManager.definePartition((PartitionDefinition) query);
                    } else if (query instanceof TableDefinition) {
                        this.siddhiManager.defineTable((TableDefinition) query);
                    }
                }
            }
            for (StreamDefinition streamDefinition : this.outputStreamDefinitions) {
                this.streamIdToDefinitionMap.put(streamDefinition.getStreamId(), streamDefinition);
            }
            this.asyncEventPublisher = new AsyncEventPublisher(AsyncEventPublisher.DestinationType.CEP_PUBLISHER, new HashSet(this.outputStreamDefinitions), this.stormDeploymentConfig.getManagers().get(0).getHostName(), this.stormDeploymentConfig.getManagers().get(0).getPort(), this.executionPlanName, this.tenantId, this.stormDeploymentConfig);
            this.asyncEventPublisher.initializeConnection(false);
        } catch (Throwable th) {
            this.log.error(this.logPrefix + "Error starting event publisher bolt: " + th.getMessage(), th);
        }
    }
}
