package org.apache.heron.simulator.instance;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.ExecutorLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.tuple.TupleImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/simulator/instance/BoltInstance.class */
public class BoltInstance extends org.apache.heron.instance.bolt.BoltInstance implements IInstance {
    private final Duration instanceExecuteBatchTime;
    private final ByteAmount instanceExecuteBatchSize;

    public BoltInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<Message> communicator, Communicator<Message> communicator2, ExecutorLooper executorLooper) {
        super(physicalPlanHelper, communicator, communicator2, executorLooper);
        SystemConfig systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.instanceExecuteBatchTime = systemConfig.getInstanceExecuteBatchTime();
        this.instanceExecuteBatchSize = systemConfig.getInstanceExecuteBatchSize();
    }

    private void handleDataTuple(HeronTuples.HeronDataTuple heronDataTuple, TopologyAPI.StreamId streamId, int i) {
        long nanoTime = System.nanoTime();
        ArrayList arrayList = new ArrayList();
        Iterator<ByteString> it = heronDataTuple.getValuesList().iterator();
        while (it.hasNext()) {
            arrayList.add(this.serializer.deserialize(it.next().toByteArray()));
        }
        TupleImpl tupleImpl = new TupleImpl(this.helper.getTopologyContext(), streamId, heronDataTuple.getKey(), heronDataTuple.getRootsList(), arrayList, i);
        long nanoTime2 = System.nanoTime();
        this.bolt.execute(tupleImpl);
        Duration minusNanos = Duration.ofNanos(System.nanoTime()).minusNanos(nanoTime2);
        this.helper.getTopologyContext().invokeHookBoltExecute(tupleImpl, minusNanos);
        this.boltMetrics.deserializeDataTuple(streamId.getId(), streamId.getComponentName(), nanoTime2 - nanoTime);
        this.boltMetrics.executeTuple(streamId.getId(), streamId.getComponentName(), minusNanos.toNanos());
    }

    @Override // org.apache.heron.instance.bolt.BoltInstance, org.apache.heron.instance.IInstance
    public void readTuplesAndExecute(Communicator<Message> communicator) {
        long nanoTime = System.nanoTime();
        long totalDataEmittedInBytes = this.collector.getTotalDataEmittedInBytes();
        while (!communicator.isEmpty()) {
            Message poll = communicator.poll();
            if (poll instanceof CheckpointManager.InitiateStatefulCheckpoint) {
                persistState(((CheckpointManager.InitiateStatefulCheckpoint) poll).getCheckpointId());
            }
            if (poll instanceof HeronTuples.HeronTupleSet) {
                HeronTuples.HeronTupleSet heronTupleSet = (HeronTuples.HeronTupleSet) poll;
                if (heronTupleSet.hasControl()) {
                    throw new RuntimeException("Bolt cannot get acks/fails from other components");
                }
                TopologyAPI.StreamId stream = heronTupleSet.getData().getStream();
                Iterator<HeronTuples.HeronDataTuple> it = heronTupleSet.getData().getTuplesList().iterator();
                while (it.hasNext()) {
                    handleDataTuple(it.next(), stream, heronTupleSet.getSrcTaskId());
                }
                if ((System.nanoTime() - nanoTime) - this.instanceExecuteBatchTime.toNanos() > 0 || this.collector.getTotalDataEmittedInBytes() - totalDataEmittedInBytes > this.instanceExecuteBatchSize.asBytes()) {
                    return;
                }
            }
        }
    }
}
