package org.wso2.carbon.event.processor.common.storm.component;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.event.processor.common.util.ThroughputProbe;
import org.wso2.carbon.event.processor.manager.commons.utils.Utils;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;

/* loaded from: input_file:org/wso2/carbon/event/processor/common/storm/component/SiddhiBolt.class */
public class SiddhiBolt extends BaseBasicBolt {
    private final String name;
    private transient Log log = LogFactory.getLog(SiddhiBolt.class);
    private transient SiddhiManager siddhiManager;
    private List<String> outputStreamDefinitions;
    private List<String> inputStreamDefinitions;
    private String query;
    private BasicOutputCollector collector;
    private String logPrefix;
    private transient ExecutionPlanRuntime executionPlanRuntime;
    private transient ThroughputProbe inputThroughputProbe;
    private transient ThroughputProbe emitThroughputProbe;

    public SiddhiBolt(String str, List<String> list, String str2, List<String> list2, String str3, int i) {
        this.inputStreamDefinitions = list;
        this.query = str2;
        this.outputStreamDefinitions = list2;
        this.name = str;
        this.logPrefix = "[" + i + ":" + str3 + ":" + str + "] ";
        init();
    }

    private void init() {
        this.log = LogFactory.getLog(SiddhiBolt.class);
        this.inputThroughputProbe = new ThroughputProbe(this.logPrefix + "-IN", 10);
        this.emitThroughputProbe = new ThroughputProbe(this.logPrefix + " -EMIT", 10);
        this.inputThroughputProbe.startSampling();
        this.emitThroughputProbe.startSampling();
        this.siddhiManager = new SiddhiManager();
        this.executionPlanRuntime = this.siddhiManager.createExecutionPlanRuntime(Utils.constructQueryExpression(this.inputStreamDefinitions, this.outputStreamDefinitions, this.query));
        Iterator<String> it = this.outputStreamDefinitions.iterator();
        while (it.hasNext()) {
            final StreamDefinition parseStreamDefinition = SiddhiCompiler.parseStreamDefinition(it.next());
            if (this.log.isDebugEnabled()) {
                this.log.debug(this.logPrefix + " Adding callback for stream: " + parseStreamDefinition.getId());
            }
            this.executionPlanRuntime.addCallback(parseStreamDefinition.getId(), new StreamCallback() { // from class: org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.1
                public void receive(Event[] eventArr) {
                    for (Event event : eventArr) {
                        Object[] copyOf = Arrays.copyOf(event.getData(), event.getData().length + 1);
                        copyOf[event.getData().length] = Long.valueOf(event.getTimestamp());
                        SiddhiBolt.this.collector.emit(parseStreamDefinition.getId(), Arrays.asList(copyOf));
                        if (SiddhiBolt.this.log.isDebugEnabled()) {
                            SiddhiBolt.this.log.debug(SiddhiBolt.this.logPrefix + "Emitted Event:" + parseStreamDefinition.getId() + ":" + Arrays.deepToString(copyOf) + "@" + event.getTimestamp());
                        }
                        SiddhiBolt.this.emitThroughputProbe.update();
                    }
                }
            });
        }
        this.executionPlanRuntime.start();
    }

    public void prepare(Map map, TopologyContext topologyContext) {
        super.prepare(map, topologyContext);
    }

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        if (this.siddhiManager == null) {
            init();
        }
        this.inputThroughputProbe.update();
        try {
            this.collector = basicOutputCollector;
            InputHandler inputHandler = this.executionPlanRuntime.getInputHandler(tuple.getSourceStreamId());
            Object[] array = tuple.getValues().toArray();
            long longValue = ((Long) array[array.length - 1]).longValue();
            Object[] remove = ArrayUtils.remove(array, array.length - 1);
            if (this.log.isDebugEnabled()) {
                this.log.debug(this.logPrefix + "Received Event: " + tuple.getSourceStreamId() + ":" + Arrays.deepToString(remove) + "@" + longValue);
            }
            if (inputHandler != null) {
                inputHandler.send(longValue, remove);
            } else {
                this.log.warn(this.logPrefix + "Event received for unknown stream " + tuple.getSourceStreamId() + ". Discarding the Event: " + tuple.getSourceStreamId() + ":" + Arrays.deepToString(remove) + "@" + longValue);
            }
        } catch (InterruptedException e) {
            this.log.error(e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        if (this.siddhiManager == null) {
            init();
        }
        for (String str : this.outputStreamDefinitions) {
            StreamDefinition parseStreamDefinition = SiddhiCompiler.parseStreamDefinition(str);
            if (str == null) {
                throw new RuntimeException(this.logPrefix + "Cannot find exported stream : " + parseStreamDefinition.getId());
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add(0, "_timestamp");
            Iterator it = parseStreamDefinition.getAttributeList().iterator();
            while (it.hasNext()) {
                arrayList.add(((Attribute) it.next()).getName());
            }
            outputFieldsDeclarer.declareStream(parseStreamDefinition.getId(), new Fields(arrayList));
            this.log.info(this.logPrefix + "Declaring output field for stream :" + parseStreamDefinition.getId());
        }
    }
}
