/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.simulator.instance;

import java.time.Duration;
import java.util.Map;
import org.apache.heron.api.Config;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.ExecutorLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.instance.IInstance;
import org.apache.heron.shaded.com.google.protobuf.Message;

/**
 * Simulator variant of the spout instance. It reads its emit limits from the
 * topology config and the registered {@link SystemConfig}, and overrides
 * {@link #produceTuple()} to bound each emit cycle by time, by emitted bytes
 * and, when acking is enabled, by the number of in-flight tuples.
 */
public class SpoutInstance
extends org.apache.heron.instance.spout.SpoutInstance
implements IInstance {
    /** Topology config key holding the max number of pending (un-acked) tuples. */
    private static final String MAX_SPOUT_PENDING_KEY = "topology.max.spout.pending";
    /** Topology config key holding the reliability mode. */
    private static final String RELIABILITY_MODE_KEY = "topology.reliability.mode";
    /** Legacy topology config key holding the acking flag. */
    private static final String LEGACY_ACKING_KEY = "topology.acking";

    private final boolean ackEnabled;
    private final int maxSpoutPending;
    private final Duration instanceEmitBatchTime;
    private final ByteAmount instanceEmitBatchSize;

    /**
     * Creates the instance and caches the limits used by {@link #produceTuple()}.
     *
     * @param helper         physical plan helper; its topology context supplies the topology config
     * @param streamInQueue  passed through to the parent constructor
     * @param streamOutQueue passed through to the parent constructor
     * @param looper         passed through to the parent constructor
     */
    public SpoutInstance(PhysicalPlanHelper helper, Communicator<Message> streamInQueue, Communicator<Message> streamOutQueue, ExecutorLooper looper) {
        super(helper, streamInQueue, streamOutQueue, looper);
        Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
        SystemConfig systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.maxSpoutPending = TypeUtils.getInteger(config.get(MAX_SPOUT_PENDING_KEY));
        this.instanceEmitBatchTime = systemConfig.getInstanceEmitBatchTime();
        this.instanceEmitBatchSize = systemConfig.getInstanceEmitBatchSize();
        this.ackEnabled = SpoutInstance.isAckEnabled(config);
    }

    /**
     * Decides whether acking is enabled for the topology.
     *
     * <p>If the reliability mode key is present, acking is enabled only when its
     * value's string form equals the name of
     * {@code Config.TopologyReliabilityMode.ATLEAST_ONCE}. Otherwise the legacy
     * acking flag is parsed as a boolean.
     *
     * @param config topology configuration map
     * @return {@code true} when acking is enabled
     */
    private static boolean isAckEnabled(Map<String, Object> config) {
        if (config.containsKey(RELIABILITY_MODE_KEY)) {
            // Compare by constant name: Enum.equals(String) is always false, which
            // used to disable acking whenever a reliability mode was configured.
            String configuredMode = String.valueOf(config.get(RELIABILITY_MODE_KEY));
            return Config.TopologyReliabilityMode.ATLEAST_ONCE.name().equals(configuredMode);
        }
        // String.valueOf accepts both String and Boolean values (a plain cast to
        // String would throw on a Boolean); a missing key still parses as false.
        return Boolean.parseBoolean(String.valueOf(config.get(LEGACY_ACKING_KEY)));
    }

    /**
     * Repeatedly calls {@code spout.nextTuple()} until one of these holds:
     * <ul>
     *   <li>acking is enabled and the in-flight count has reached the max spout pending value;</li>
     *   <li>a call to {@code nextTuple()} did not increase the collector's emitted-tuple count;</li>
     *   <li>the cycle has run longer than the configured emit batch time;</li>
     *   <li>the bytes emitted during the cycle exceed the configured emit batch size.</li>
     * </ul>
     * The latency of every {@code nextTuple()} call is reported to the spout metrics.
     */
    @Override
    protected void produceTuple() {
        long emittedSoFar = this.collector.getTotalTuplesEmitted();
        long bytesBeforeCycle = this.collector.getTotalDataEmittedInBytes();
        long batchTimeNanos = this.instanceEmitBatchTime.toNanos();
        long batchSizeBytes = this.instanceEmitBatchSize.asBytes();
        long cycleStart = System.nanoTime();

        while (!this.ackEnabled || this.maxSpoutPending > this.collector.numInFlight()) {
            long callStart = System.nanoTime();
            this.spout.nextTuple();
            this.spoutMetrics.nextTuple(System.nanoTime() - callStart);

            long emittedNow = this.collector.getTotalTuplesEmitted();
            if (emittedNow == emittedSoFar) {
                // The spout emitted nothing on this call; stop the cycle.
                break;
            }
            emittedSoFar = emittedNow;

            // Subtraction form keeps the comparison valid if nanoTime wraps around.
            if (System.nanoTime() - cycleStart - batchTimeNanos > 0L) {
                break;
            }
            if (this.collector.getTotalDataEmittedInBytes() - bytesBeforeCycle > batchSizeBytes) {
                break;
            }
        }
    }
}

