package org.apache.heron.simulator.executors;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.WakeableLooper;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.shaded.com.google.protobuf.Message;
import org.apache.heron.simulator.utils.TopologyManager;
import org.apache.heron.simulator.utils.TupleCache;
import org.apache.heron.simulator.utils.XORManager;

/* loaded from: input_file:org/apache/heron/simulator/executors/StreamExecutor.class */
public class StreamExecutor implements Runnable {
    public static final int NUM_BUCKETS = 3;
    private static final Logger LOG = Logger.getLogger(InstanceExecutor.class.getName());
    private final TopologyManager topologyManager;
    private final Set<String> spoutSets;
    private final XORManager xorManager;
    private final Map<Integer, InstanceExecutor> taskIdToInstanceExecutor = new HashMap();
    private final WakeableLooper looper = createWakeableLooper();
    private final TupleCache tupleCache = new TupleCache();

    public StreamExecutor(TopologyManager topologyManager) {
        this.topologyManager = topologyManager;
        this.spoutSets = createSpoutsSet(topologyManager.getPhysicalPlan());
        this.xorManager = new XORManager(this.looper, this.topologyManager, 3);
    }

    public void addInstanceExecutor(InstanceExecutor instanceExecutor) {
        instanceExecutor.getStreamOutQueue().setConsumer(this.looper);
        instanceExecutor.getStreamInQueue().setProducer(this.looper);
        this.taskIdToInstanceExecutor.put(Integer.valueOf(instanceExecutor.getTaskId()), instanceExecutor);
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setName("Simulator_Stream_Executor");
        LOG.info("Stream_Executor starts");
        addStreamExecutorTasks();
        this.looper.loop();
    }

    public void stop() {
        this.looper.exitLoop();
    }

    protected void addStreamExecutorTasks() {
        this.looper.addTasksOnWakeup(new Runnable() { // from class: org.apache.heron.simulator.executors.StreamExecutor.1
            @Override // java.lang.Runnable
            public void run() {
                if (StreamExecutor.this.tupleCache.isEmpty()) {
                    StreamExecutor.this.handleInstanceExecutor();
                }
                StreamExecutor.this.drainCache();
            }
        });
    }

    public void handleInstanceExecutor() {
        for (InstanceExecutor instanceExecutor : this.taskIdToInstanceExecutor.values()) {
            boolean contains = this.spoutSets.contains(instanceExecutor.getComponentName());
            int taskId = instanceExecutor.getTaskId();
            int size = instanceExecutor.getStreamOutQueue().size();
            for (int i = 0; i < size; i++) {
                Message poll = instanceExecutor.getStreamOutQueue().poll();
                if (poll instanceof HeronTuples.HeronTupleSet) {
                    HeronTuples.HeronTupleSet heronTupleSet = (HeronTuples.HeronTupleSet) poll;
                    if (heronTupleSet.hasData()) {
                        HeronTuples.HeronDataTupleSet data = heronTupleSet.getData();
                        TopologyAPI.StreamId stream = data.getStream();
                        for (HeronTuples.HeronDataTuple heronDataTuple : data.getTuplesList()) {
                            List<Integer> listToSend = this.topologyManager.getListToSend(stream, heronDataTuple);
                            listToSend.addAll(heronDataTuple.getDestTaskIdsList());
                            if (listToSend.isEmpty()) {
                                LOG.severe("Nobody to send the tuple to");
                            }
                            copyDataOutBound(taskId, contains, stream, heronDataTuple, listToSend);
                        }
                    }
                    if (heronTupleSet.hasControl()) {
                        HeronTuples.HeronControlTupleSet control = heronTupleSet.getControl();
                        Iterator<HeronTuples.AckTuple> it = control.getAcksList().iterator();
                        while (it.hasNext()) {
                            copyControlOutBound(heronTupleSet.getSrcTaskId(), it.next(), true);
                        }
                        Iterator<HeronTuples.AckTuple> it2 = control.getFailsList().iterator();
                        while (it2.hasNext()) {
                            copyControlOutBound(heronTupleSet.getSrcTaskId(), it2.next(), false);
                        }
                    }
                }
            }
        }
    }

    protected boolean isSendTuplesToInstance(List<Integer> list) {
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            if (this.taskIdToInstanceExecutor.get(it.next()).getStreamInQueue().remainingCapacity() <= 0) {
                return false;
            }
        }
        return true;
    }

    protected void copyDataOutBound(int i, boolean z, TopologyAPI.StreamId streamId, HeronTuples.HeronDataTuple heronDataTuple, List<Integer> list) {
        boolean z2 = true;
        boolean z3 = heronDataTuple.getRootsCount() > 0;
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            long addDataTuple = this.tupleCache.addDataTuple(i, it.next().intValue(), streamId, heronDataTuple, z3);
            if (z3) {
                if (!z) {
                    for (HeronTuples.RootId rootId : heronDataTuple.getRootsList()) {
                        this.tupleCache.addEmitTuple(i, rootId.getTaskid(), HeronTuples.AckTuple.newBuilder().addRoots(rootId).setAckedtuple(addDataTuple).build());
                    }
                } else if (z2) {
                    this.xorManager.create(i, heronDataTuple.getRoots(0).getKey(), addDataTuple);
                } else {
                    this.xorManager.anchor(i, heronDataTuple.getRoots(0).getKey(), addDataTuple);
                }
            }
            z2 = false;
        }
    }

    protected void copyControlOutBound(int i, HeronTuples.AckTuple ackTuple, boolean z) {
        for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
            HeronTuples.AckTuple build = HeronTuples.AckTuple.newBuilder().addRoots(rootId).setAckedtuple(ackTuple.getAckedtuple()).build();
            if (z) {
                this.tupleCache.addAckTuple(i, rootId.getTaskid(), build);
            } else {
                this.tupleCache.addFailTuple(i, rootId.getTaskid(), build);
            }
        }
    }

    protected void processAcksAndFails(int i, int i2, HeronTuples.HeronControlTupleSet heronControlTupleSet) {
        HeronTuples.HeronTupleSet.Builder newBuilder = HeronTuples.HeronTupleSet.newBuilder();
        newBuilder.setSrcTaskId(i);
        for (HeronTuples.AckTuple ackTuple : heronControlTupleSet.getEmitsList()) {
            Iterator<HeronTuples.RootId> it = ackTuple.getRootsList().iterator();
            while (it.hasNext()) {
                this.xorManager.anchor(i2, it.next().getKey(), ackTuple.getAckedtuple());
            }
        }
        for (HeronTuples.AckTuple ackTuple2 : heronControlTupleSet.getAcksList()) {
            for (HeronTuples.RootId rootId : ackTuple2.getRootsList()) {
                if (this.xorManager.anchor(i2, rootId.getKey(), ackTuple2.getAckedtuple())) {
                    HeronTuples.AckTuple.Builder addAcksBuilder = newBuilder.getControlBuilder().addAcksBuilder();
                    HeronTuples.RootId.Builder addRootsBuilder = addAcksBuilder.addRootsBuilder();
                    addRootsBuilder.setKey(rootId.getKey());
                    addRootsBuilder.setTaskid(i2);
                    addAcksBuilder.setAckedtuple(0L);
                    this.xorManager.remove(i2, rootId.getKey());
                }
            }
        }
        Iterator<HeronTuples.AckTuple> it2 = heronControlTupleSet.getFailsList().iterator();
        while (it2.hasNext()) {
            for (HeronTuples.RootId rootId2 : it2.next().getRootsList()) {
                if (this.xorManager.remove(i2, rootId2.getKey())) {
                    HeronTuples.AckTuple.Builder addFailsBuilder = newBuilder.getControlBuilder().addFailsBuilder();
                    HeronTuples.RootId.Builder addRootsBuilder2 = addFailsBuilder.addRootsBuilder();
                    addRootsBuilder2.setKey(rootId2.getKey());
                    addRootsBuilder2.setTaskid(i2);
                    addFailsBuilder.setAckedtuple(0L);
                }
            }
        }
        if (newBuilder.hasControl()) {
            sendMessageToInstance(i2, newBuilder.build());
        }
    }

    protected void drainCache() {
        Map<Integer, List<HeronTuples.HeronTupleSet>> cache = this.tupleCache.getCache();
        if (isSendTuplesToInstance(new LinkedList(cache.keySet()))) {
            for (Map.Entry<Integer, List<HeronTuples.HeronTupleSet>> entry : cache.entrySet()) {
                int intValue = entry.getKey().intValue();
                Iterator<HeronTuples.HeronTupleSet> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    sendInBound(intValue, it.next());
                }
            }
            this.tupleCache.clear();
        }
    }

    protected void sendInBound(int i, HeronTuples.HeronTupleSet heronTupleSet) {
        if (heronTupleSet.hasData()) {
            sendMessageToInstance(i, heronTupleSet);
        }
        if (heronTupleSet.hasControl()) {
            processAcksAndFails(heronTupleSet.getSrcTaskId(), i, heronTupleSet.getControl());
        }
    }

    protected void sendMessageToInstance(int i, HeronTuples.HeronTupleSet heronTupleSet) {
        this.taskIdToInstanceExecutor.get(Integer.valueOf(i)).getStreamInQueue().offer(heronTupleSet);
    }

    protected WakeableLooper createWakeableLooper() {
        return new SlaveLooper();
    }

    protected Set<String> createSpoutsSet(PhysicalPlans.PhysicalPlan physicalPlan) {
        HashSet hashSet = new HashSet();
        Iterator<TopologyAPI.Spout> it = physicalPlan.getTopology().getSpoutsList().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getComp().getName());
        }
        return hashSet;
    }
}
