/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.simulator.executors;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.WakeableLooper;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.shaded.com.google.protobuf.Message;
import org.apache.heron.simulator.executors.InstanceExecutor;
import org.apache.heron.simulator.utils.TopologyManager;
import org.apache.heron.simulator.utils.TupleCache;
import org.apache.heron.simulator.utils.XORManager;

public class StreamExecutor
implements Runnable {
    public static final int NUM_BUCKETS = 3;
    private static final Logger LOG = Logger.getLogger(InstanceExecutor.class.getName());
    private final Map<Integer, InstanceExecutor> taskIdToInstanceExecutor;
    private final TopologyManager topologyManager;
    private final Set<String> spoutSets;
    private final XORManager xorManager;
    private final TupleCache tupleCache;
    private final WakeableLooper looper;

    public StreamExecutor(TopologyManager topologyManager) {
        this.topologyManager = topologyManager;
        this.taskIdToInstanceExecutor = new HashMap<Integer, InstanceExecutor>();
        this.looper = this.createWakeableLooper();
        this.spoutSets = this.createSpoutsSet(topologyManager.getPhysicalPlan());
        this.xorManager = new XORManager(this.looper, this.topologyManager, 3);
        this.tupleCache = new TupleCache();
    }

    public void addInstanceExecutor(InstanceExecutor instanceExecutor) {
        instanceExecutor.getStreamOutQueue().setConsumer(this.looper);
        instanceExecutor.getStreamInQueue().setProducer(this.looper);
        this.taskIdToInstanceExecutor.put(instanceExecutor.getTaskId(), instanceExecutor);
    }

    @Override
    public void run() {
        Thread.currentThread().setName("Simulator_Stream_Executor");
        LOG.info("Stream_Executor starts");
        this.addStreamExecutorTasks();
        this.looper.loop();
    }

    public void stop() {
        this.looper.exitLoop();
    }

    protected void addStreamExecutorTasks() {
        Runnable streamExecutorsTasks = new Runnable(){

            @Override
            public void run() {
                if (StreamExecutor.this.tupleCache.isEmpty()) {
                    StreamExecutor.this.handleInstanceExecutor();
                }
                StreamExecutor.this.drainCache();
            }
        };
        this.looper.addTasksOnWakeup(streamExecutorsTasks);
    }

    public void handleInstanceExecutor() {
        for (InstanceExecutor executor : this.taskIdToInstanceExecutor.values()) {
            boolean isLocalSpout = this.spoutSets.contains(executor.getComponentName());
            int taskId = executor.getTaskId();
            int items = executor.getStreamOutQueue().size();
            for (int i = 0; i < items; ++i) {
                Message msg = executor.getStreamOutQueue().poll();
                if (!(msg instanceof HeronTuples.HeronTupleSet)) continue;
                HeronTuples.HeronTupleSet tupleSet = (HeronTuples.HeronTupleSet)msg;
                if (tupleSet.hasData()) {
                    HeronTuples.HeronDataTupleSet d = tupleSet.getData();
                    TopologyAPI.StreamId streamId = d.getStream();
                    for (HeronTuples.HeronDataTuple tuple : d.getTuplesList()) {
                        List<Integer> outTasks = this.topologyManager.getListToSend(streamId, tuple);
                        outTasks.addAll(tuple.getDestTaskIdsList());
                        if (outTasks.isEmpty()) {
                            LOG.severe("Nobody to send the tuple to");
                        }
                        this.copyDataOutBound(taskId, isLocalSpout, streamId, tuple, outTasks);
                    }
                }
                if (!tupleSet.hasControl()) continue;
                HeronTuples.HeronControlTupleSet c = tupleSet.getControl();
                for (HeronTuples.AckTuple ack : c.getAcksList()) {
                    this.copyControlOutBound(tupleSet.getSrcTaskId(), ack, true);
                }
                for (HeronTuples.AckTuple fail : c.getFailsList()) {
                    this.copyControlOutBound(tupleSet.getSrcTaskId(), fail, false);
                }
            }
        }
    }

    protected boolean isSendTuplesToInstance(List<Integer> taskIds) {
        for (Integer taskId : taskIds) {
            if (this.taskIdToInstanceExecutor.get(taskId).getStreamInQueue().remainingCapacity() > 0) continue;
            return false;
        }
        return true;
    }

    protected void copyDataOutBound(int sourceTaskId, boolean isLocalSpout, TopologyAPI.StreamId streamId, HeronTuples.HeronDataTuple tuple, List<Integer> outTasks) {
        boolean firstIteration = true;
        boolean isAnchored = tuple.getRootsCount() > 0;
        for (Integer outTask : outTasks) {
            long tupleKey = this.tupleCache.addDataTuple(sourceTaskId, outTask, streamId, tuple, isAnchored);
            if (isAnchored) {
                if (isLocalSpout) {
                    if (firstIteration) {
                        this.xorManager.create(sourceTaskId, tuple.getRoots(0).getKey(), tupleKey);
                    } else {
                        this.xorManager.anchor(sourceTaskId, tuple.getRoots(0).getKey(), tupleKey);
                    }
                } else {
                    for (HeronTuples.RootId rootId : tuple.getRootsList()) {
                        HeronTuples.AckTuple t = HeronTuples.AckTuple.newBuilder().addRoots(rootId).setAckedtuple(tupleKey).build();
                        this.tupleCache.addEmitTuple(sourceTaskId, rootId.getTaskid(), t);
                    }
                }
            }
            firstIteration = false;
        }
    }

    protected void copyControlOutBound(int srcTaskId, HeronTuples.AckTuple control, boolean isSuccess) {
        for (HeronTuples.RootId rootId : control.getRootsList()) {
            HeronTuples.AckTuple t = HeronTuples.AckTuple.newBuilder().addRoots(rootId).setAckedtuple(control.getAckedtuple()).build();
            if (isSuccess) {
                this.tupleCache.addAckTuple(srcTaskId, rootId.getTaskid(), t);
                continue;
            }
            this.tupleCache.addFailTuple(srcTaskId, rootId.getTaskid(), t);
        }
    }

    protected void processAcksAndFails(int srcTaskId, int taskId, HeronTuples.HeronControlTupleSet controlTupleSet) {
        HeronTuples.RootId.Builder r;
        HeronTuples.HeronTupleSet.Builder out = HeronTuples.HeronTupleSet.newBuilder();
        out.setSrcTaskId(srcTaskId);
        for (HeronTuples.AckTuple emitTuple : controlTupleSet.getEmitsList()) {
            for (HeronTuples.RootId rootId : emitTuple.getRootsList()) {
                this.xorManager.anchor(taskId, rootId.getKey(), emitTuple.getAckedtuple());
            }
        }
        for (HeronTuples.AckTuple ackTuple : controlTupleSet.getAcksList()) {
            for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
                if (!this.xorManager.anchor(taskId, rootId.getKey(), ackTuple.getAckedtuple())) continue;
                HeronTuples.AckTuple.Builder a = out.getControlBuilder().addAcksBuilder();
                r = a.addRootsBuilder();
                r.setKey(rootId.getKey());
                r.setTaskid(taskId);
                a.setAckedtuple(0L);
                this.xorManager.remove(taskId, rootId.getKey());
            }
        }
        for (HeronTuples.AckTuple failTuple : controlTupleSet.getFailsList()) {
            for (HeronTuples.RootId rootId : failTuple.getRootsList()) {
                if (!this.xorManager.remove(taskId, rootId.getKey())) continue;
                HeronTuples.AckTuple.Builder f = out.getControlBuilder().addFailsBuilder();
                r = f.addRootsBuilder();
                r.setKey(rootId.getKey());
                r.setTaskid(taskId);
                f.setAckedtuple(0L);
            }
        }
        if (out.hasControl()) {
            this.sendMessageToInstance(taskId, out.build());
        }
    }

    protected void drainCache() {
        Map<Integer, List<HeronTuples.HeronTupleSet>> cache = this.tupleCache.getCache();
        if (!this.isSendTuplesToInstance(new LinkedList<Integer>(cache.keySet()))) {
            return;
        }
        for (Map.Entry<Integer, List<HeronTuples.HeronTupleSet>> entry : cache.entrySet()) {
            int taskId = entry.getKey();
            for (HeronTuples.HeronTupleSet message : entry.getValue()) {
                this.sendInBound(taskId, message);
            }
        }
        this.tupleCache.clear();
    }

    protected void sendInBound(int taskId, HeronTuples.HeronTupleSet message) {
        if (message.hasData()) {
            this.sendMessageToInstance(taskId, message);
        }
        if (message.hasControl()) {
            this.processAcksAndFails(message.getSrcTaskId(), taskId, message.getControl());
        }
    }

    protected void sendMessageToInstance(int taskId, HeronTuples.HeronTupleSet message) {
        this.taskIdToInstanceExecutor.get(taskId).getStreamInQueue().offer(message);
    }

    protected WakeableLooper createWakeableLooper() {
        return new SlaveLooper();
    }

    protected Set<String> createSpoutsSet(PhysicalPlans.PhysicalPlan physicalPlan) {
        HashSet<String> spoutsSet = new HashSet<String>();
        for (TopologyAPI.Spout spout : physicalPlan.getTopology().getSpoutsList()) {
            spoutsSet.add(spout.getComp().getName());
        }
        return spoutsSet;
    }
}

