package org.wso2.carbon.event.processor.core.internal.storm.util;

import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.apache.log4j.Logger;
import org.wso2.carbon.event.processor.common.storm.component.EventPublisherBolt;
import org.wso2.carbon.event.processor.common.storm.component.EventReceiverSpout;
import org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt;
import org.wso2.carbon.event.processor.core.exception.StormQueryConstructionException;
import org.wso2.carbon.event.processor.core.internal.storm.util.ComponentInfoHolder;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/util/StormTopologyConstructor.class */
public class StormTopologyConstructor {
    private static Logger log = Logger.getLogger(StormTopologyConstructor.class);

    public static TopologyBuilder constructTopologyBuilder(String str, String str2, int i, DistributedConfiguration distributedConfiguration) throws XMLStreamException, StormQueryConstructionException {
        OMElement stringToOM = AXIOMUtil.stringToOM(str);
        TopologyInfoHolder topologyInfoHolder = new TopologyInfoHolder();
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Iterator childrenWithName = stringToOM.getChildrenWithName(new QName(EventProcessorConstants.EVENT_RECEIVER));
        while (childrenWithName.hasNext()) {
            OMElement oMElement = (OMElement) childrenWithName.next();
            String attributeValue = oMElement.getAttributeValue(new QName("name"));
            String attributeValue2 = oMElement.getAttributeValue(new QName(EventProcessorConstants.PARALLEL));
            ComponentInfoHolder componentInfoHolder = new ComponentInfoHolder(attributeValue, ComponentInfoHolder.ComponentType.EVENT_RECEIVER_SPOUT);
            List<String> streamDefinitions = getStreamDefinitions(oMElement.getFirstChildWithName(new QName(EventProcessorConstants.STREAMS)));
            for (String str3 : streamDefinitions) {
                componentInfoHolder.addInputStream(str3);
                componentInfoHolder.addOutputStream(str3);
            }
            componentInfoHolder.setDeclarer(topologyBuilder.setSpout(attributeValue, new EventReceiverSpout(distributedConfiguration, streamDefinitions, str2, i, distributedConfiguration.getHeartbeatInterval()), Integer.valueOf(Integer.parseInt(attributeValue2))));
            topologyInfoHolder.addComponent(componentInfoHolder);
        }
        Iterator childrenWithName2 = stringToOM.getChildrenWithName(new QName(EventProcessorConstants.EVENT_PROCESSOR_TAG));
        while (childrenWithName2.hasNext()) {
            OMElement oMElement2 = (OMElement) childrenWithName2.next();
            String attributeValue3 = oMElement2.getAttributeValue(new QName("name"));
            String attributeValue4 = oMElement2.getAttributeValue(new QName(EventProcessorConstants.PARALLEL));
            String attributeValue5 = oMElement2.getAttributeValue(new QName(EventProcessorConstants.ENFORCE_PARALLELISM));
            ComponentInfoHolder componentInfoHolder2 = new ComponentInfoHolder(attributeValue3, ComponentInfoHolder.ComponentType.SIDDHI_BOLT);
            OMElement firstChildWithName = oMElement2.getFirstChildWithName(new QName(EventProcessorConstants.INPUT_STREAMS));
            List<String> streamDefinitions2 = getStreamDefinitions(firstChildWithName);
            Iterator<String> it = streamDefinitions2.iterator();
            while (it.hasNext()) {
                componentInfoHolder2.addInputStream(it.next());
            }
            addPartitionFields(firstChildWithName, componentInfoHolder2);
            OMElement firstChildWithName2 = oMElement2.getFirstChildWithName(new QName(EventProcessorConstants.QUERIES));
            componentInfoHolder2.addSiddhiQuery(firstChildWithName2.getText());
            List<String> streamDefinitions3 = getStreamDefinitions(oMElement2.getFirstChildWithName(new QName(EventProcessorConstants.OUTPUT_STREAMS)));
            Iterator<String> it2 = streamDefinitions3.iterator();
            while (it2.hasNext()) {
                componentInfoHolder2.addOutputStream(it2.next());
            }
            BoltDeclarer bolt = topologyBuilder.setBolt(attributeValue3, new SiddhiBolt(attributeValue3, streamDefinitions2, firstChildWithName2.getText(), streamDefinitions3, str2, i), Integer.valueOf(Integer.parseInt(attributeValue4)));
            if (attributeValue5.equals("true")) {
                bolt.setMaxTaskParallelism(Integer.valueOf(Integer.parseInt(attributeValue4)));
            }
            componentInfoHolder2.setDeclarer(bolt);
            topologyInfoHolder.addComponent(componentInfoHolder2);
        }
        Iterator childrenWithName3 = stringToOM.getChildrenWithName(new QName(EventProcessorConstants.EVENT_PUBLISHER));
        while (childrenWithName3.hasNext()) {
            OMElement oMElement3 = (OMElement) childrenWithName3.next();
            String attributeValue6 = oMElement3.getAttributeValue(new QName("name"));
            String attributeValue7 = oMElement3.getAttributeValue(new QName(EventProcessorConstants.PARALLEL));
            ComponentInfoHolder componentInfoHolder3 = new ComponentInfoHolder(attributeValue6, ComponentInfoHolder.ComponentType.SIDDHI_BOLT);
            OMElement firstChildWithName3 = oMElement3.getFirstChildWithName(new QName(EventProcessorConstants.INPUT_STREAMS));
            List<String> streamDefinitions4 = getStreamDefinitions(firstChildWithName3);
            Iterator<String> it3 = streamDefinitions4.iterator();
            while (it3.hasNext()) {
                componentInfoHolder3.addInputStream(it3.next());
            }
            addPartitionFields(firstChildWithName3, componentInfoHolder3);
            OMElement firstChildWithName4 = oMElement3.getFirstChildWithName(new QName(EventProcessorConstants.QUERIES));
            String str4 = null;
            if (firstChildWithName4 != null) {
                str4 = firstChildWithName4.getText();
                componentInfoHolder3.addSiddhiQuery(str4);
            }
            List<String> streamDefinitions5 = getStreamDefinitions(oMElement3.getFirstChildWithName(new QName(EventProcessorConstants.OUTPUT_STREAMS)));
            Iterator<String> it4 = streamDefinitions5.iterator();
            while (it4.hasNext()) {
                componentInfoHolder3.addOutputStream(it4.next());
            }
            componentInfoHolder3.setDeclarer(topologyBuilder.setBolt(attributeValue6, new EventPublisherBolt(distributedConfiguration, streamDefinitions4, streamDefinitions5, str4, str2, i), Integer.valueOf(Integer.parseInt(attributeValue7))));
            topologyInfoHolder.addComponent(componentInfoHolder3);
        }
        topologyInfoHolder.indexComponents();
        Iterator<ComponentInfoHolder> it5 = topologyInfoHolder.getComponents().iterator();
        while (it5.hasNext()) {
            ComponentInfoHolder next = it5.next();
            if (next.getComponentType() != ComponentInfoHolder.ComponentType.EVENT_RECEIVER_SPOUT) {
                BoltDeclarer boltDeclarer = (BoltDeclarer) next.getDeclarer();
                for (String str5 : next.getInputStreamIds()) {
                    for (ComponentInfoHolder componentInfoHolder4 : topologyInfoHolder.getPublishingComponents(str5)) {
                        if (!componentInfoHolder4.getComponentName().equals(next.getComponentName())) {
                            String partionenedField = next.getPartionenedField(str5);
                            String str6 = "ShuffleGrouping";
                            if (partionenedField == null) {
                                boltDeclarer.shuffleGrouping(componentInfoHolder4.getComponentName(), str5);
                            } else {
                                str6 = "FieldGrouping";
                                boltDeclarer.fieldsGrouping(componentInfoHolder4.getComponentName(), str5, new Fields(new String[]{partionenedField}));
                            }
                            if (log.isDebugEnabled()) {
                                log.debug("Connecting storm components [Consumer:" + next.getComponentName() + ", Stream:" + str5 + ", Publisher:" + componentInfoHolder4.getComponentName() + ", Grouping:" + str6 + "]");
                            }
                        }
                    }
                }
            }
        }
        return topologyBuilder;
    }

    private static List<String> getStreamDefinitions(OMElement oMElement) {
        ArrayList arrayList = new ArrayList();
        Iterator childrenWithName = oMElement.getChildrenWithName(new QName("stream"));
        while (childrenWithName.hasNext()) {
            arrayList.add(((OMElement) childrenWithName.next()).getText());
        }
        return arrayList;
    }

    private static void addPartitionFields(OMElement oMElement, ComponentInfoHolder componentInfoHolder) throws StormQueryConstructionException {
        Iterator childrenWithName = oMElement.getChildrenWithName(new QName("stream"));
        while (childrenWithName.hasNext()) {
            OMElement oMElement2 = (OMElement) childrenWithName.next();
            OMAttribute attribute = oMElement2.getAttribute(new QName(EventProcessorConstants.PARTITION));
            if (attribute != null) {
                StreamDefinition parseStreamDefinition = SiddhiCompiler.parseStreamDefinition(oMElement2.getText());
                if (!Arrays.asList(parseStreamDefinition.getAttributeNameArray()).contains(attribute.getAttributeValue())) {
                    throw new StormQueryConstructionException("All input streams of the partition should have the partitioning attribute.");
                }
                componentInfoHolder.addStreamPartitioningField(parseStreamDefinition.getId(), attribute.getAttributeValue());
            }
        }
    }
}
