/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.simulator.instance;

import java.time.Duration;
import java.util.ArrayList;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.tuple.TupleImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.Message;

public class BoltInstance
extends org.apache.heron.instance.bolt.BoltInstance
implements IInstance {
    private final Duration instanceExecuteBatchTime;
    private final ByteAmount instanceExecuteBatchSize;

    public BoltInstance(PhysicalPlanHelper helper, Communicator<Message> streamInQueue, Communicator<Message> streamOutQueue, SlaveLooper looper) {
        super(helper, streamInQueue, streamOutQueue, looper);
        SystemConfig systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.instanceExecuteBatchTime = systemConfig.getInstanceExecuteBatchTime();
        this.instanceExecuteBatchSize = systemConfig.getInstanceExecuteBatchSize();
    }

    private void handleDataTuple(HeronTuples.HeronDataTuple dataTuple, TopologyAPI.StreamId stream, int srcTaskId) {
        long startTime = System.nanoTime();
        ArrayList<Object> values = new ArrayList<Object>();
        for (ByteString b : dataTuple.getValuesList()) {
            values.add(this.serializer.deserialize(b.toByteArray()));
        }
        TupleImpl t = new TupleImpl(this.helper.getTopologyContext(), stream, dataTuple.getKey(), dataTuple.getRootsList(), values, srcTaskId);
        long deserializedTime = System.nanoTime();
        this.bolt.execute(t);
        Duration executeLatency = Duration.ofNanos(System.nanoTime()).minusNanos(deserializedTime);
        this.helper.getTopologyContext().invokeHookBoltExecute(t, executeLatency);
        this.boltMetrics.deserializeDataTuple(stream.getId(), stream.getComponentName(), deserializedTime - startTime);
        this.boltMetrics.executeTuple(stream.getId(), stream.getComponentName(), executeLatency.toNanos());
    }

    @Override
    public void readTuplesAndExecute(Communicator<Message> inQueue) {
        long startOfCycle = System.nanoTime();
        long totalDataEmittedInBytesBeforeCycle = this.collector.getTotalDataEmittedInBytes();
        while (!inQueue.isEmpty()) {
            Message msg = inQueue.poll();
            if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
                this.persistState(((CheckpointManager.InitiateStatefulCheckpoint)msg).getCheckpointId());
            }
            if (!(msg instanceof HeronTuples.HeronTupleSet)) continue;
            HeronTuples.HeronTupleSet tuples = (HeronTuples.HeronTupleSet)msg;
            if (tuples.hasControl()) {
                throw new RuntimeException("Bolt cannot get acks/fails from other components");
            }
            TopologyAPI.StreamId stream = tuples.getData().getStream();
            for (HeronTuples.HeronDataTuple dataTuple : tuples.getData().getTuplesList()) {
                this.handleDataTuple(dataTuple, stream, tuples.getSrcTaskId());
            }
            if (System.nanoTime() - startOfCycle - this.instanceExecuteBatchTime.toNanos() <= 0L && this.collector.getTotalDataEmittedInBytes() - totalDataEmittedInBytesBeforeCycle <= this.instanceExecuteBatchSize.asBytes()) continue;
            break;
        }
    }
}

