package org.apache.heron.instance.spout;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.heron.api.Config;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.metric.GlobalMetrics;
import org.apache.heron.api.spout.ISpout;
import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.ExecutorLooper;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.FullSpoutMetrics;
import org.apache.heron.common.utils.metrics.ISpoutMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.common.utils.topology.TopologyContextImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.instance.util.InstanceUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/instance/spout/SpoutInstance.class */
/**
 * {@link IInstance} implementation that drives a user supplied {@link ISpout}.
 *
 * <p>The instance loads the spout from the physical plan, opens it in {@link #init(State)},
 * and in {@link #start()} registers a wakeup task on the {@link ExecutorLooper} that reads
 * control messages from the inbound queue, asks the spout for new tuples, and flushes them
 * through the {@link SpoutOutputCollectorImpl}.
 */
public class SpoutInstance implements IInstance {
    private static final Logger LOG = Logger.getLogger(SpoutInstance.class.getName());

    /** The user spout, either deserialized or instantiated by class name in the constructor. */
    protected final ISpout spout;

    /** Collector handed (wrapped) to the spout; also tracks in-flight tuples and immediate acks. */
    protected final SpoutOutputCollectorImpl collector;

    /** Metrics updated by the wakeup task and by the ack/fail/timeout paths. */
    protected final ISpoutMetrics spoutMetrics = new FullSpoutMetrics();

    /** Inbound queue polled by the wakeup task (checkpoint requests and ack/fail control tuples). */
    private final Communicator<Message> streamInQueue;

    /** Snapshot of {@code collector.isAckEnabled()} taken at construction time. */
    private final boolean ackEnabled;

    /** Value of {@code Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS}; enables the timeout sweep. */
    private final boolean enableMessageTimeouts;

    /** True when {@code Config.TOPOLOGY_RELIABILITY_MODE} equals {@code EFFECTIVELY_ONCE}. */
    private final boolean isTopologyStateful;

    /** Value of {@code Config.TOPOLOGY_STATEFUL_SPILL_STATE}; forwarded to the collector on checkpoint. */
    private final boolean spillState;

    /** Per-instance directory ({@code <configured location>/<instance id>/}) used when spilling state. */
    private final String spillStateLocation;

    /** Set while a two-phase stateful spout waits for {@link #onCheckpointSaved(String)}. */
    private boolean waitingForCheckpointSaved;

    /** State object received in {@link #init(State)} and sent out in {@link #persistState(String)}. */
    private State<Serializable, Serializable> instanceState;

    private final ExecutorLooper looper;
    private final SystemConfig systemConfig;
    private final Map<String, Object> config;

    /** Current physical plan helper; replaced by {@link #update(PhysicalPlanHelper)}. */
    private PhysicalPlanHelper helper;

    /**
     * Builds the instance and loads the user spout described by the physical plan.
     *
     * @param physicalPlanHelper helper describing this instance's place in the physical plan
     * @param communicator inbound queue read by the wakeup task
     * @param communicator2 queue passed to the output collector
     * @param executorLooper looper on which the spout tasks and timers are registered
     * @throws RuntimeException if the plan contains no spout for this instance, or the spout
     *     cannot be deserialized/instantiated
     */
    public SpoutInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<Message> communicator, Communicator<Message> communicator2, ExecutorLooper executorLooper) {
        this.helper = physicalPlanHelper;
        this.looper = executorLooper;
        this.streamInQueue = communicator;
        this.spoutMetrics.initMultiCountMetrics(physicalPlanHelper);
        this.config = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.enableMessageTimeouts = parseBooleanSetting(this.config.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS));
        this.isTopologyStateful = String.valueOf(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE).equals(this.config.get(Config.TOPOLOGY_RELIABILITY_MODE));
        this.spillState = parseBooleanSetting(this.config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE));
        this.spillStateLocation = String.format("%s/%s/", String.valueOf(this.config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)), physicalPlanHelper.getMyInstanceId());
        this.waitingForCheckpointSaved = false;
        LOG.info("Is this topology stateful: " + this.isTopologyStateful);

        this.spout = loadSpout(physicalPlanHelper);

        this.collector = new SpoutOutputCollectorImpl(SerializeDeSerializeHelper.getSerializer(this.config), physicalPlanHelper, communicator2, this.spoutMetrics);
        this.ackEnabled = this.collector.isAckEnabled();
        LOG.info("Enable Ack: " + this.ackEnabled);
        LOG.info("EnableMessageTimeouts: " + this.enableMessageTimeouts);
    }

    /**
     * Interprets a raw config value as a boolean.
     *
     * <p>Going through {@code String.valueOf} instead of a {@code (String)} cast means a value
     * stored as a {@code Boolean} no longer triggers a {@code ClassCastException}; strings and
     * {@code null} yield the same result as before ({@code null} is {@code false}).
     */
    private static boolean parseBooleanSetting(Object value) {
        return Boolean.parseBoolean(String.valueOf(value));
    }

    /**
     * Obtains the user spout from the physical plan: the serialized object when present,
     * otherwise a fresh instance of the configured class name.
     *
     * @throws RuntimeException if the plan has no spout, neither representation is set, or
     *     the class cannot be loaded/instantiated (the underlying exception is kept as cause)
     */
    private static ISpout loadSpout(PhysicalPlanHelper physicalPlanHelper) {
        if (physicalPlanHelper.getMySpout() == null) {
            throw new RuntimeException("HeronSpoutInstance has no spout in physical plan");
        }
        if (physicalPlanHelper.getMySpout().getComp().hasSerializedObject()) {
            return (ISpout) Utils.deserialize(physicalPlanHelper.getMySpout().getComp().getSerializedObject().toByteArray());
        }
        if (!physicalPlanHelper.getMySpout().getComp().hasClassName()) {
            throw new RuntimeException("Neither java_object nor java_class_name set for spout");
        }
        try {
            return (ISpout) Class.forName(physicalPlanHelper.getMySpout().getComp().getClassName()).newInstance();
        } catch (ClassNotFoundException e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new RuntimeException(e + " Spout class must be in class path.", e);
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e + " Spout class must have a no-arg constructor.", e);
        } catch (InstantiationException e) {
            throw new RuntimeException(e + " Spout class must be concrete.", e);
        }
    }

    /**
     * Switches this instance to a new physical plan. The spout is notified first (when it
     * implements {@link IUpdatable}), then the collector, and finally the new helper is
     * prepared for custom stream grouping and stored.
     */
    @Override // org.apache.heron.instance.IInstance
    public void update(PhysicalPlanHelper physicalPlanHelper) {
        if (this.spout instanceof IUpdatable) {
            ((IUpdatable) this.spout).update(physicalPlanHelper.getTopologyContext());
        }
        this.collector.updatePhysicalPlanHelper(physicalPlanHelper);
        physicalPlanHelper.prepareForCustomStreamGrouping();
        this.helper = physicalPlanHelper;
    }

    /**
     * Sends the instance state out for the given checkpoint while holding the collector lock.
     *
     * <p>A stateful spout gets {@code preSave} first; a two-phase stateful spout additionally
     * pauses tuple production until {@link #onCheckpointSaved(String)} is called.
     *
     * @param str the checkpoint id
     * @throws RuntimeException if the topology is not stateful
     */
    @Override // org.apache.heron.instance.IInstance
    public void persistState(String str) {
        LOG.info("Persisting state for checkpoint: " + str);
        if (!this.isTopologyStateful) {
            throw new RuntimeException("Could not save a non-stateful topology's state");
        }
        this.collector.lock.lock();
        try {
            if (this.spout instanceof IStatefulComponent) {
                ((IStatefulComponent) this.spout).preSave(str);
            }
            if (this.spout instanceof ITwoPhaseStatefulComponent) {
                this.waitingForCheckpointSaved = true;
            }
            this.collector.sendOutState(this.instanceState, str, this.spillState, this.spillStateLocation);
            LOG.info("State persisted for checkpoint: " + str);
        } finally {
            this.collector.lock.unlock();
        }
    }

    /**
     * Initializes metrics, restores state into a stateful spout (preparing the spill directory
     * when spilling is enabled), opens the spout and prepares hooks and custom groupings.
     *
     * @param state the state to hand to the spout when the topology and spout are stateful
     */
    @Override // org.apache.heron.instance.IInstance
    public void init(State<Serializable, Serializable> state) {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContext, this.systemConfig.getHeronMetricsExportInterval());
        this.spoutMetrics.registerMetrics(topologyContext);

        if (this.isTopologyStateful && (this.spout instanceof IStatefulComponent)) {
            this.instanceState = state;
            ((IStatefulComponent) this.spout).initState(this.instanceState);
            if (this.spillState) {
                // Start from an empty spill directory: clean an existing one, otherwise create it.
                if (FileUtils.isDirectoryExists(this.spillStateLocation)) {
                    FileUtils.cleanDir(this.spillStateLocation);
                } else {
                    FileUtils.createDirectory(this.spillStateLocation);
                }
            }
        }

        this.spout.open(topologyContext.getTopologyConfig(), topologyContext, new SpoutOutputCollector(this.collector));
        topologyContext.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping();
    }

    /** Registers the spout's wakeup task and timers on the looper. */
    @Override // org.apache.heron.instance.IInstance
    public void start() {
        addSpoutsTasks();
    }

    /** Forwards {@code preRestore} to the spout when it is a two-phase stateful component. */
    @Override // org.apache.heron.instance.IInstance
    public void preRestore(String str) {
        if (this.spout instanceof ITwoPhaseStatefulComponent) {
            ((ITwoPhaseStatefulComponent) this.spout).preRestore(str);
        }
    }

    /**
     * Forwards {@code postSave} to a two-phase stateful spout and lifts the production pause
     * set by {@link #persistState(String)}.
     */
    @Override // org.apache.heron.instance.IInstance
    public void onCheckpointSaved(String str) {
        if (this.spout instanceof ITwoPhaseStatefulComponent) {
            ((ITwoPhaseStatefulComponent) this.spout).postSave(str);
            this.waitingForCheckpointSaved = false;
        }
    }

    /** Runs cleanup hooks, closes the spout and clears the inbound queue and the collector. */
    @Override // org.apache.heron.instance.IInstance
    public void clean() {
        this.helper.getTopologyContext().invokeHookCleanup();
        this.spout.close();
        this.streamInQueue.clear();
        this.collector.clear();
    }

    /** Cleans up via {@link #clean()} and then asks the looper to exit its loop. */
    @Override // org.apache.heron.instance.IInstance
    public void shutdown() {
        clean();
        this.looper.exitLoop();
    }

    /** Logs and forwards activation to the spout. */
    @Override // org.apache.heron.instance.IInstance
    public void activate() {
        LOG.info("Spout is activated");
        this.spout.activate();
    }

    /** Logs and forwards deactivation to the spout. */
    @Override // org.apache.heron.instance.IInstance
    public void deactivate() {
        LOG.info("Spout is deactivated");
        this.spout.deactivate();
    }

    /**
     * Registers the per-wakeup spout cycle, starts the timeout sweep when message timeouts
     * are enabled, and lets {@link InstanceUtils} prepare the remaining timer events.
     */
    private void addSpoutsTasks() {
        this.looper.addTasksOnWakeup(this::runSpoutCycle);
        if (this.enableMessageTimeouts) {
            lookForTimeouts();
        }
        InstanceUtils.prepareTimerEvents(this.looper, this.helper);
    }

    /**
     * One iteration of the spout loop, executed on every looper wakeup: drain inbound control
     * messages, produce and flush tuples when allowed, update metrics, handle immediate acks
     * when acking is disabled, and re-arm the looper while there is more work.
     */
    private void runSpoutCycle() {
        this.spoutMetrics.updateTaskRunCount();
        readTuplesAndExecute(this.streamInQueue);

        if (isProduceTuple()) {
            this.spoutMetrics.updateProduceTupleCount();
            produceTuple();
            this.collector.sendOutTuples();
        }
        if (!this.collector.isOutQueuesAvailable()) {
            this.spoutMetrics.updateOutQueueFullCount();
        }

        if (this.ackEnabled) {
            this.spoutMetrics.updatePendingTuplesCount(this.collector.numInFlight());
        } else {
            doImmediateAcks();
        }

        if (isContinueWork()) {
            this.spoutMetrics.updateContinueWorkCount();
            this.looper.wakeUp();
        }
    }

    /**
     * Decides whether the looper should be woken up again right away.
     *
     * <p>Requires the topology to be RUNNING. Without acking, work continues while the out
     * queues have room. With acking, work continues while either the out queues have room and
     * the in-flight count is below {@code Config.TOPOLOGY_MAX_SPOUT_PENDING}, or the inbound
     * queue still holds messages.
     *
     * <p>Kept {@code public} to leave the existing signature untouched (the decompiler note
     * on the original says the modifier was changed from private).
     *
     * @return true if another spout cycle should be scheduled immediately
     */
    public boolean isContinueWork() {
        if (!this.helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)) {
            return false;
        }
        if (!this.ackEnabled) {
            return this.collector.isOutQueuesAvailable();
        }
        boolean canEmitMore = this.collector.isOutQueuesAvailable()
                && ((long) this.collector.numInFlight()) < TypeUtils.getLong(this.config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)).longValue();
        return canEmitMore || !this.streamInQueue.isEmpty();
    }

    /**
     * Tells whether the spout may be asked for tuples in this cycle: the out queues must have
     * room, the topology must be RUNNING, and no checkpoint-saved notification may be pending.
     *
     * <p>Kept {@code public} to leave the existing signature untouched (the decompiler note
     * on the original says the modifier was changed from private).
     */
    public boolean isProduceTuple() {
        return this.collector.isOutQueuesAvailable() && this.helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING) && !this.waitingForCheckpointSaved;
    }

    /**
     * Repeatedly calls {@code spout.nextTuple()} until one of the batch limits is hit.
     *
     * <p>The loop stops when acking is enabled and the in-flight count has reached
     * {@code Config.TOPOLOGY_MAX_SPOUT_PENDING}, when a call emitted no tuple, when the
     * configured emit batch time has elapsed, or when the bytes emitted in this batch are no
     * longer less than the configured emit batch size.
     */
    protected void produceTuple() {
        int maxSpoutPending = TypeUtils.getInteger(this.config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)).intValue();
        long tuplesEmittedSoFar = this.collector.getTotalTuplesEmitted();
        long bytesEmittedAtStart = this.collector.getTotalBytesEmitted();
        Duration emitBatchTime = this.systemConfig.getInstanceEmitBatchTime();
        ByteAmount emitBatchSize = this.systemConfig.getInstanceEmitBatchSize();
        long startOfCycle = System.nanoTime();
        // The timestamp taken after each nextTuple() doubles as the start time of the next call.
        long currentTime = startOfCycle;

        while (!this.ackEnabled || maxSpoutPending > this.collector.numInFlight()) {
            this.spout.nextTuple();
            long previousTime = currentTime;
            currentTime = System.nanoTime();
            this.spoutMetrics.nextTuple(currentTime - previousTime);

            long tuplesEmittedNow = this.collector.getTotalTuplesEmitted();
            long bytesEmittedNow = this.collector.getTotalBytesEmitted();
            if (tuplesEmittedNow == tuplesEmittedSoFar) {
                // The spout produced nothing in this call; stop asking for now.
                return;
            }
            tuplesEmittedSoFar = tuplesEmittedNow;

            if ((currentTime - startOfCycle) - emitBatchTime.toNanos() > 0) {
                return;
            }
            if (!ByteAmount.fromBytes(bytesEmittedNow - bytesEmittedAtStart).lessThan(emitBatchSize)) {
                return;
            }
        }
    }

    /**
     * Applies an ack or fail control tuple to every root it carries.
     *
     * <p>A root whose key has no in-flight entry (for instance because it was already retired
     * elsewhere, such as by the timeout sweep) is skipped, and processing continues with the
     * remaining roots. Previously the method returned at the first such root, silently
     * dropping the acknowledgements for all roots after it.
     *
     * @param ackTuple the control tuple holding the root ids
     * @param isSuccess true to ack the roots, false to fail them
     * @throws RuntimeException if a root belongs to a different task
     */
    private void handleAckTuple(HeronTuples.AckTuple ackTuple, boolean isSuccess) {
        for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
            if (rootId.getTaskid() != this.helper.getMyTaskId()) {
                throw new RuntimeException(String.format("Receiving tuple for task %d in task %d", Integer.valueOf(rootId.getTaskid()), Integer.valueOf(this.helper.getMyTaskId())));
            }
            RootTupleInfo tupleInfo = this.collector.retireInFlight(rootId.getKey());
            if (tupleInfo == null) {
                continue;
            }
            Object messageId = tupleInfo.getMessageId();
            if (messageId == null) {
                // Nothing to report to the spout for tuples emitted without a message id.
                continue;
            }
            Duration latency = Duration.ofNanos(System.nanoTime()).minusNanos(tupleInfo.getInsertionTime());
            if (isSuccess) {
                invokeAck(messageId, tupleInfo.getStreamId(), latency);
            } else {
                invokeFail(messageId, tupleInfo.getStreamId(), latency);
            }
        }
    }

    /**
     * Fails every tuple the collector reports as expired for the configured message timeout,
     * then re-registers itself as a timer event after {@code timeout / nbuckets}.
     *
     * <p>Kept {@code public} to leave the existing signature untouched (the decompiler note
     * on the original says the modifier was changed from private).
     */
    public void lookForTimeouts() {
        Duration timeout = TypeUtils.getDuration(this.config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), ChronoUnit.SECONDS);
        int nBuckets = this.systemConfig.getInstanceAcknowledgementNbuckets();
        for (RootTupleInfo expired : this.collector.retireExpired(timeout)) {
            this.spoutMetrics.timeoutTuple(expired.getStreamId());
            invokeFail(expired.getMessageId(), expired.getStreamId(), timeout);
        }
        this.looper.registerTimerEvent(timeout.dividedBy(nBuckets), this::lookForTimeouts);
    }

    /**
     * Drains the given queue until it is empty, a checkpoint-saved notification becomes
     * pending, or the configured ack batch time is exceeded.
     *
     * <p>Checkpoint requests trigger {@link #persistState(String)}; tuple sets must carry only
     * control information, whose acks and fails are dispatched to the spout. The batch time
     * is only checked after a tuple set has been handled.
     *
     * @param communicator the queue to read messages from
     * @throws RuntimeException if a message is neither a checkpoint request nor a tuple set,
     *     or if a tuple set contains data tuples
     */
    @Override // org.apache.heron.instance.IInstance
    public void readTuplesAndExecute(Communicator<Message> communicator) {
        long startOfCycle = System.nanoTime();
        Duration ackBatchTime = this.systemConfig.getInstanceAckBatchTime();

        while (!communicator.isEmpty() && !this.waitingForCheckpointSaved) {
            Message msg = communicator.poll();

            if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
                persistState(((CheckpointManager.InitiateStatefulCheckpoint) msg).getCheckpointId());
                continue;
            }
            if (!(msg instanceof HeronTuples.HeronTupleSet)) {
                throw new RuntimeException("Invalid data sent to spout instance");
            }

            HeronTuples.HeronTupleSet tupleSet = (HeronTuples.HeronTupleSet) msg;
            if (tupleSet.hasData()) {
                throw new RuntimeException("Spout cannot get incoming data tuples from other components");
            }
            if (tupleSet.hasControl()) {
                for (HeronTuples.AckTuple ack : tupleSet.getControl().getAcksList()) {
                    handleAckTuple(ack, true);
                }
                for (HeronTuples.AckTuple fail : tupleSet.getControl().getFailsList()) {
                    handleAckTuple(fail, false);
                }
            }

            if ((System.nanoTime() - startOfCycle) - ackBatchTime.toNanos() > 0) {
                return;
            }
        }
    }

    /**
     * Acks, with zero latency, the tuples queued in the collector's immediate-ack queue.
     *
     * <p>Only the entries present when the method starts are processed, so anything queued
     * while acking is left for the next cycle.
     *
     * <p>Kept {@code public} to leave the existing signature untouched (the decompiler note
     * on the original says the modifier was changed from private).
     */
    public void doImmediateAcks() {
        int pending = this.collector.getImmediateAcks().size();
        for (int i = 0; i < pending; i++) {
            RootTupleInfo tupleInfo = this.collector.getImmediateAcks().poll();
            invokeAck(tupleInfo.getMessageId(), tupleInfo.getStreamId(), Duration.ZERO);
        }
    }

    /** Notifies the spout, the topology hooks and the metrics that a tuple was acked. */
    private void invokeAck(Object messageId, String streamId, Duration latency) {
        this.spout.ack(messageId);
        this.helper.getTopologyContext().invokeHookSpoutAck(messageId, latency);
        this.spoutMetrics.ackedTuple(streamId, latency.toNanos());
    }

    /** Notifies the spout, the topology hooks and the metrics that a tuple failed. */
    private void invokeFail(Object messageId, String streamId, Duration latency) {
        this.spout.fail(messageId);
        this.helper.getTopologyContext().invokeHookSpoutFail(messageId, latency);
        this.spoutMetrics.failedTuple(streamId, latency.toNanos());
    }
}
