package org.apache.heron.instance.bolt;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.heron.api.Config;
import org.apache.heron.api.bolt.IBolt;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.metric.GlobalMetrics;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.FullBoltMetrics;
import org.apache.heron.common.utils.metrics.IBoltMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.common.utils.topology.TopologyContextImpl;
import org.apache.heron.common.utils.tuple.TickTuple;
import org.apache.heron.common.utils.tuple.TupleImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.instance.util.InstanceUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/instance/bolt/BoltInstance.class */
public class BoltInstance implements IInstance {
    private static final Logger LOG = Logger.getLogger(BoltInstance.class.getName());
    protected PhysicalPlanHelper helper;
    protected final IBolt bolt;
    protected final BoltOutputCollectorImpl collector;
    protected final IPluggableSerializer serializer;
    protected final IBoltMetrics boltMetrics = new FullBoltMetrics();
    private final Communicator<Message> streamInQueue;
    private final boolean isTopologyStateful;
    private final boolean spillState;
    private final String spillStateLocation;
    private State<Serializable, Serializable> instanceState;
    private final Map<String, Object> config;
    private final SlaveLooper looper;
    private final SystemConfig systemConfig;

    public BoltInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<Message> communicator, Communicator<Message> communicator2, SlaveLooper slaveLooper) {
        this.helper = physicalPlanHelper;
        this.looper = slaveLooper;
        this.streamInQueue = communicator;
        this.boltMetrics.initMultiCountMetrics(physicalPlanHelper);
        this.serializer = SerializeDeSerializeHelper.getSerializer(physicalPlanHelper.getTopologyContext().getTopologyConfig());
        this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.config = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        this.isTopologyStateful = String.valueOf(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE).equals(this.config.get(Config.TOPOLOGY_RELIABILITY_MODE));
        LOG.info("Is this topology stateful: " + this.isTopologyStateful);
        this.spillState = Boolean.parseBoolean((String) this.config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE));
        this.spillStateLocation = String.format("%s/%s/", String.valueOf(this.config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)), physicalPlanHelper.getMyInstanceId());
        if (physicalPlanHelper.getMyBolt() == null) {
            throw new RuntimeException("HeronBoltInstance has no bolt in physical plan.");
        }
        if (physicalPlanHelper.getMyBolt().getComp().hasSerializedObject()) {
            this.bolt = (IBolt) Utils.deserialize(physicalPlanHelper.getMyBolt().getComp().getSerializedObject().toByteArray());
        } else {
            if (!physicalPlanHelper.getMyBolt().getComp().hasClassName()) {
                throw new RuntimeException("Neither java_object nor java_class_name set for bolt");
            }
            try {
                this.bolt = (IBolt) Class.forName(physicalPlanHelper.getMyBolt().getComp().getClassName()).newInstance();
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e + " Bolt class must be in class path.");
            } catch (IllegalAccessException e2) {
                throw new RuntimeException(e2 + " Bolt class must have a no-arg constructor.");
            } catch (InstantiationException e3) {
                throw new RuntimeException(e3 + " Bolt class must be concrete.");
            }
        }
        this.collector = new BoltOutputCollectorImpl(this.serializer, physicalPlanHelper, communicator2, this.boltMetrics);
    }

    @Override // org.apache.heron.instance.IInstance
    public void update(PhysicalPlanHelper physicalPlanHelper) {
        if (this.bolt instanceof IUpdatable) {
            ((IUpdatable) this.bolt).update(physicalPlanHelper.getTopologyContext());
        }
        this.collector.updatePhysicalPlanHelper(physicalPlanHelper);
        physicalPlanHelper.prepareForCustomStreamGrouping();
        physicalPlanHelper.getTopologyContext().getTopologyConfig().putAll(this.helper.getTopologyContext().getTopologyConfig());
        this.helper = physicalPlanHelper;
    }

    @Override // org.apache.heron.instance.IInstance
    public void persistState(String str) {
        LOG.info("Persisting state for checkpoint: " + str);
        if (!this.isTopologyStateful) {
            throw new RuntimeException("Could not save a non-stateful topology's state");
        }
        this.collector.lock.lock();
        try {
            if (this.bolt instanceof IStatefulComponent) {
                ((IStatefulComponent) this.bolt).preSave(str);
            }
            this.collector.sendOutState(this.instanceState, str, this.spillState, this.spillStateLocation);
            LOG.info("State persisted for checkpoint: " + str);
        } finally {
            this.collector.lock.unlock();
        }
    }

    @Override // org.apache.heron.instance.IInstance
    public void init(State<Serializable, Serializable> state) {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContext, this.systemConfig.getHeronMetricsExportInterval());
        this.boltMetrics.registerMetrics(topologyContext);
        if (this.isTopologyStateful && (this.bolt instanceof IStatefulComponent)) {
            this.instanceState = state;
            ((IStatefulComponent) this.bolt).initState(this.instanceState);
            if (this.spillState) {
                if (FileUtils.isDirectoryExists(this.spillStateLocation)) {
                    FileUtils.cleanDir(this.spillStateLocation);
                } else {
                    FileUtils.createDirectory(this.spillStateLocation);
                }
            }
        }
        this.bolt.prepare(topologyContext.getTopologyConfig(), topologyContext, new OutputCollector(this.collector));
        topologyContext.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping();
    }

    @Override // org.apache.heron.instance.IInstance
    public void start() {
        addBoltTasks();
    }

    @Override // org.apache.heron.instance.IInstance
    public void clean() {
        this.helper.getTopologyContext().invokeHookCleanup();
        this.bolt.cleanup();
        this.streamInQueue.clear();
        this.collector.clear();
    }

    @Override // org.apache.heron.instance.IInstance
    public void shutdown() {
        clean();
        this.looper.exitLoop();
    }

    private void addBoltTasks() {
        this.looper.addTasksOnWakeup(new Runnable() { // from class: org.apache.heron.instance.bolt.BoltInstance.1
            @Override // java.lang.Runnable
            public void run() {
                BoltInstance.this.boltMetrics.updateTaskRunCount();
                if (BoltInstance.this.collector.isOutQueuesAvailable()) {
                    BoltInstance.this.boltMetrics.updateExecutionCount();
                    BoltInstance.this.readTuplesAndExecute(BoltInstance.this.streamInQueue);
                    BoltInstance.this.collector.sendOutTuples();
                } else {
                    BoltInstance.this.boltMetrics.updateOutQueueFullCount();
                }
                if (!BoltInstance.this.collector.isOutQueuesAvailable() || BoltInstance.this.streamInQueue.isEmpty()) {
                    return;
                }
                BoltInstance.this.boltMetrics.updateContinueWorkCount();
                BoltInstance.this.looper.wakeUp();
            }
        });
        PrepareTickTupleTimer();
        InstanceUtils.prepareTimerEvents(this.looper, this.helper);
    }

    @Override // org.apache.heron.instance.IInstance
    public void readTuplesAndExecute(Communicator<Message> communicator) {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        Duration instanceExecuteBatchTime = this.systemConfig.getInstanceExecuteBatchTime();
        long nanoTime = System.nanoTime();
        while (!communicator.isEmpty()) {
            Message poll = communicator.poll();
            if (poll instanceof CheckpointManager.InitiateStatefulCheckpoint) {
                persistState(((CheckpointManager.InitiateStatefulCheckpoint) poll).getCheckpointId());
            }
            if (poll instanceof HeronTuples.HeronTupleSet) {
                HeronTuples.HeronTupleSet heronTupleSet = (HeronTuples.HeronTupleSet) poll;
                if (heronTupleSet.hasControl()) {
                    throw new RuntimeException("Bolt cannot get acks/fails from other components");
                }
                TopologyAPI.StreamId stream = heronTupleSet.getData().getStream();
                int size = topologyContext.getComponentOutputFields(stream.getComponentName(), stream.getId()).size();
                int srcTaskId = heronTupleSet.getSrcTaskId();
                for (HeronTuples.HeronDataTuple heronDataTuple : heronTupleSet.getData().getTuplesList()) {
                    long nanoTime2 = System.nanoTime();
                    ArrayList arrayList = new ArrayList(size);
                    for (int i = 0; i < size; i++) {
                        arrayList.add(this.serializer.deserialize(heronDataTuple.getValues(i).toByteArray()));
                    }
                    TupleImpl tupleImpl = new TupleImpl(topologyContext, stream, heronDataTuple.getKey(), heronDataTuple.getRootsList(), arrayList, System.nanoTime(), false, srcTaskId);
                    long nanoTime3 = System.nanoTime();
                    this.bolt.execute(tupleImpl);
                    long nanos = Duration.ofNanos(System.nanoTime()).minusNanos(nanoTime3).toNanos();
                    topologyContext.invokeHookBoltExecute(tupleImpl, Duration.ofNanos(nanos));
                    this.boltMetrics.deserializeDataTuple(stream.getId(), stream.getComponentName(), nanoTime3 - nanoTime2);
                    this.boltMetrics.executeTuple(stream.getId(), stream.getComponentName(), nanos);
                }
                if ((System.nanoTime() - nanoTime) - instanceExecuteBatchTime.toNanos() > 0) {
                    return;
                }
            }
        }
    }

    @Override // org.apache.heron.instance.IInstance
    public void activate() {
    }

    @Override // org.apache.heron.instance.IInstance
    public void deactivate() {
    }

    private void PrepareTickTupleTimer() {
        Object obj = this.helper.getTopologyContext().getTopologyConfig().get(Config.TOPOLOGY_TICK_TUPLE_FREQ_MS);
        if (obj != null) {
            this.looper.registerPeriodicEvent(TypeUtils.getDuration(obj, ChronoUnit.MILLIS), () -> {
                SendTickTuple();
            });
        }
    }

    private void SendTickTuple() {
        TickTuple tickTuple = new TickTuple();
        long nanoTime = System.nanoTime();
        this.bolt.execute(tickTuple);
        this.boltMetrics.executeTuple(tickTuple.getSourceStreamId(), tickTuple.getSourceComponent(), System.nanoTime() - nanoTime);
        this.collector.sendOutTuples();
    }
}
