/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.planner;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.jgrapht.DirectedGraph;
import org.jgrapht.graph.DirectedSubgraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import storm.trident.planner.BridgeReceiver;
import storm.trident.planner.Node;
import storm.trident.planner.ProcessorContext;
import storm.trident.planner.ProcessorNode;
import storm.trident.planner.TridentProcessor;
import storm.trident.planner.TupleReceiver;
import storm.trident.planner.processor.TridentContext;
import storm.trident.state.State;
import storm.trident.topology.BatchInfo;
import storm.trident.topology.ITridentBatchBolt;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTupleView;
import storm.trident.util.TridentUtils;

/**
 * {@link ITridentBatchBolt} that runs the {@link ProcessorNode}s found in a given set of planner
 * nodes.
 *
 * <p>{@link #prepare} walks the node set in topological order and wires every processor to its
 * parents and children. Tuples arriving on a stream whose producing node is not part of the node
 * set are dispatched through the {@link InitialReceiver} registered for that stream id.
 */
public class SubtopologyBolt implements ITridentBatchBolt {
    /** Planner graph the node set is taken from. */
    DirectedGraph _graph;
    /** Nodes handled by this bolt. */
    Set<Node> _nodes;
    /** Entry points keyed by incoming stream id; filled in by {@link #prepare}. */
    Map<String, InitialReceiver> _roots = new HashMap<String, InitialReceiver>();
    /** Output tuple factory of every processor node prepared so far. */
    Map<Node, TridentTuple.Factory> _outputFactories = new HashMap<Node, TridentTuple.Factory>();
    /** Processors of each batch group, in the order the topological iteration produced them. */
    Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap<String, List<TridentProcessor>>();
    /** Batch group assigned to each node. */
    Map<Node, String> _batchGroups;

    /**
     * Stores the arguments; all wiring is deferred to {@link #prepare}.
     *
     * @param graph planner graph containing {@code nodes}
     * @param nodes nodes this bolt is responsible for
     * @param batchGroups batch group of each node
     */
    public SubtopologyBolt(DirectedGraph graph, Set<Node> nodes, Map<Node, String> batchGroups) {
        _nodes = nodes;
        _graph = graph;
        _batchGroups = batchGroups;
    }

    /**
     * Creates the states declared by the nodes, then prepares every processor node in topological
     * order of the subgraph induced by the node set.
     *
     * <p>The state index handed to each processor is the node's position in that iteration; it
     * advances for every node, not only for processor nodes.
     */
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector batchCollector) {
        createStates(conf, context);

        DirectedGraph subgraph = new DirectedSubgraph(_graph, _nodes, null);
        TopologicalOrderIterator ordered = new TopologicalOrderIterator(subgraph);
        for (int stateIndex = 0; ordered.hasNext(); stateIndex++) {
            Node node = (Node) ordered.next();
            if (!(node instanceof ProcessorNode)) {
                continue;
            }
            ProcessorNode procNode = (ProcessorNode) node;

            // Record the processor under its batch group, keeping topological order.
            String group = _batchGroups.get(node);
            List<TridentProcessor> groupProcessors = _myTopologicallyOrdered.get(group);
            if (groupProcessors == null) {
                groupProcessors = new ArrayList<TridentProcessor>();
                _myTopologicallyOrdered.put(group, groupProcessors);
            }
            groupProcessors.add(procNode.processor);

            // A parent inside the node set was already prepared (topological order), so its
            // output factory is known. Any other parent is reached through a root receiver.
            List<String> parentStreams = new ArrayList<String>();
            List<TridentTuple.Factory> parentFactories = new ArrayList<TridentTuple.Factory>();
            for (Node parent : TridentUtils.getParents(_graph, node)) {
                parentStreams.add(parent.streamId);
                if (_nodes.contains(parent)) {
                    parentFactories.add(_outputFactories.get(parent));
                } else {
                    InitialReceiver root = rootReceiverFor(context, parent.streamId);
                    root.addReceiver(procNode.processor);
                    parentFactories.add(root.getOutputFactory());
                }
            }

            List<TupleReceiver> targets = collectTargets(node, batchCollector);

            TridentContext triContext = new TridentContext(
                    procNode.selfOutFields,
                    parentFactories,
                    parentStreams,
                    targets,
                    procNode.streamId,
                    stateIndex,
                    batchCollector);
            procNode.processor.prepare(conf, context, triContext);
            _outputFactories.put(node, procNode.processor.getOutputFactory());
        }
    }

    /** Builds the state of every node that carries state info and stores it as task data under the state's id. */
    private void createStates(Map conf, TopologyContext context) {
        int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
        for (Node node : _nodes) {
            if (node.stateInfo != null) {
                State state = node.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), numTasks);
                context.setTaskData(node.stateInfo.id, state);
            }
        }
    }

    /** Returns the root receiver registered for {@code streamId}, creating and registering it on first use. */
    private InitialReceiver rootReceiverFor(TopologyContext context, String streamId) {
        InitialReceiver root = _roots.get(streamId);
        if (root == null) {
            root = new InitialReceiver(streamId, getSourceOutputFields(context, streamId));
            _roots.put(streamId, root);
        }
        return root;
    }

    /**
     * Lists the receivers for {@code node}'s output: the processors of its children that are in
     * the node set, followed by one {@link BridgeReceiver} if at least one child is not.
     */
    private List<TupleReceiver> collectTargets(Node node, BatchOutputCollector batchCollector) {
        List<TupleReceiver> targets = new ArrayList<TupleReceiver>();
        boolean hasExternalChild = false;
        for (Node child : TridentUtils.getChildren(_graph, node)) {
            if (_nodes.contains(child)) {
                targets.add(((ProcessorNode) child).processor);
            } else {
                hasExternalChild = true;
            }
        }
        if (hasExternalChild) {
            targets.add(new BridgeReceiver(batchCollector));
        }
        return targets;
    }

    /**
     * Looks up the output fields of the first source of this component whose stream id equals
     * {@code sourceStream}.
     *
     * @throws RuntimeException if no source has that stream id
     */
    private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
        for (GlobalStreamId source : context.getThisSources().keySet()) {
            if (source.get_streamId().equals(sourceStream)) {
                return context.getComponentOutputFields(source);
            }
        }
        throw new RuntimeException("Could not find fields for source stream " + sourceStream);
    }

    /**
     * Hands the tuple to the root receiver registered for its source stream id.
     *
     * @throws RuntimeException if no receiver was registered for that stream during prepare
     */
    @Override
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        InitialReceiver root = _roots.get(tuple.getSourceStreamId());
        if (root != null) {
            root.receive((ProcessorContext) batchInfo.state, tuple);
            return;
        }
        throw new RuntimeException("Received unexpected tuple " + tuple.toString());
    }

    /**
     * Calls {@code finishBatch} on every processor of the batch's group, in registration order.
     * A group with no registered processors results in a {@link NullPointerException}.
     */
    @Override
    public void finishBatch(BatchInfo batchInfo) {
        List<TridentProcessor> groupProcessors = _myTopologicallyOrdered.get(batchInfo.batchGroup);
        for (TridentProcessor processor : groupProcessors) {
            processor.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    /**
     * Creates the per-batch {@link ProcessorContext}, with one state slot per node, and calls
     * {@code startBatch} on every processor of the group, in registration order.
     * A group with no registered processors results in a {@link NullPointerException}.
     *
     * @return the new context
     */
    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        Object[] stateSlots = new Object[_nodes.size()];
        ProcessorContext processorContext = new ProcessorContext(batchId, stateSlots);
        List<TridentProcessor> groupProcessors = _myTopologicallyOrdered.get(batchGroup);
        for (TridentProcessor processor : groupProcessors) {
            processor.startBatch(processorContext);
        }
        return processorContext;
    }

    /** Calls {@code cleanup} on every registered processor, batch group by batch group. */
    @Override
    public void cleanup() {
        for (List<TridentProcessor> groupProcessors : _myTopologicallyOrdered.values()) {
            for (TridentProcessor processor : groupProcessors) {
                processor.cleanup();
            }
        }
    }

    /**
     * Declares one output stream per node, named by the node's stream id, whose fields are
     * {@code "$batchId"} followed by the node's output fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        for (Node node : _nodes) {
            Fields withBatchId = TridentUtils.fieldsConcat(new Fields("$batchId"), node.allOutputFields);
            declarer.declareStream(node.streamId, withBatchId);
        }
    }

    /** Returns {@code null}; this bolt supplies no configuration map of its own. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Entry point for one incoming stream: converts each raw tuple into a {@link TridentTuple}
     * without the stream's first field and passes it to every registered processor.
     */
    protected class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<TridentProcessor>();
        TridentTupleView.RootFactory _factory;
        TridentTupleView.ProjectionFactory _project;
        String _stream;

        /**
         * @param stream id of the incoming stream
         * @param allFields every field of the incoming stream; the first one is projected away
         *     (presumably the batch id, mirroring the {@code "$batchId"} field this bolt prepends
         *     to the streams it declares itself)
         */
        public InitialReceiver(String stream, Fields allFields) {
            _stream = stream;
            _factory = new TridentTupleView.RootFactory(allFields);
            List<String> withoutFirst = new ArrayList<String>(allFields.toList());
            withoutFirst.remove(0);
            _project = new TridentTupleView.ProjectionFactory(_factory, new Fields(withoutFirst));
        }

        /** Projects {@code tuple} and executes every registered processor on the result, in registration order. */
        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple projected = _project.create(_factory.create(tuple));
            for (TridentProcessor receiver : _receivers) {
                receiver.execute(context, _stream, projected);
            }
        }

        /** Registers a processor that should see every tuple arriving on this stream. */
        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }

        /** Returns the projection factory, i.e. the factory describing the tuples handed to the receivers. */
        public TridentTuple.Factory getOutputFactory() {
            return _project;
        }
    }
}

