/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.instance.spout;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.heron.api.Config;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.metric.GlobalMetrics;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.spout.ISpout;
import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.ExecutorLooper;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.FullSpoutMetrics;
import org.apache.heron.common.utils.metrics.ISpoutMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.common.utils.topology.TopologyContextImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.instance.spout.RootTupleInfo;
import org.apache.heron.instance.spout.SpoutOutputCollectorImpl;
import org.apache.heron.instance.util.InstanceUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.Message;

/**
 * Runs a single {@link ISpout} inside a Heron instance.
 *
 * <p>The instance owns the user spout, an output collector that forwards emitted tuples to the
 * outgoing queue, and the spout metrics. Work is driven by an {@link ExecutorLooper}: on every
 * wake-up the instance drains control messages (acks, fails, checkpoint requests) from the
 * incoming queue and, when allowed, asks the spout for more tuples.
 */
public class SpoutInstance
implements IInstance {
    private static final Logger LOG = Logger.getLogger(SpoutInstance.class.getName());
    /** The user spout, either deserialized or instantiated by class name in the constructor. */
    protected final ISpout spout;
    /** Collector handed to the spout; also tracks in-flight tuples and immediate acks. */
    protected final SpoutOutputCollectorImpl collector;
    protected final ISpoutMetrics spoutMetrics;
    /** Incoming queue carrying ack/fail control tuples and checkpoint requests. */
    private final Communicator<Message> streamInQueue;
    private final boolean ackEnabled;
    private final boolean enableMessageTimeouts;
    /** True when "topology.reliability.mode" equals the EFFECTIVELY_ONCE mode name. */
    private final boolean isTopologyStateful;
    private final boolean spillState;
    private final String spillStateLocation;
    /**
     * Set while a two-phase stateful spout has sent out its state and has not yet been told the
     * checkpoint was saved; tuple production and queue draining are paused while it is true.
     */
    private boolean waitingForCheckpointSaved;
    private State<Serializable, Serializable> instanceState;
    private final ExecutorLooper looper;
    private final SystemConfig systemConfig;
    private final Map<String, Object> config;
    /** Current physical plan view; replaced by {@link #update(PhysicalPlanHelper)}. */
    private PhysicalPlanHelper helper;

    /**
     * Builds the instance: reads the topology configuration, creates the user spout and wires up
     * the output collector.
     *
     * @param helper physical plan helper describing this instance; must contain a spout
     * @param streamInQueue queue of incoming control messages
     * @param streamOutQueue queue the collector writes outgoing messages to
     * @param looper looper that will execute this instance's tasks
     * @throws RuntimeException if the physical plan has no spout for this instance, if the spout
     *     is specified neither as a serialized object nor as a class name, or if the named class
     *     cannot be loaded or instantiated
     */
    public SpoutInstance(PhysicalPlanHelper helper, Communicator<Message> streamInQueue, Communicator<Message> streamOutQueue, ExecutorLooper looper) {
        this.helper = helper;
        this.looper = looper;
        this.streamInQueue = streamInQueue;
        this.spoutMetrics = new FullSpoutMetrics();
        this.spoutMetrics.initMultiCountMetrics(helper);
        this.config = helper.getTopologyContext().getTopologyConfig();
        this.systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        // String.valueOf tolerates both String and Boolean config values; a missing key becomes
        // "null", which parseBoolean treats as false (same result as the previous null handling).
        this.enableMessageTimeouts = Boolean.parseBoolean(String.valueOf(this.config.get("topology.enable.message.timeouts")));
        this.isTopologyStateful = String.valueOf(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE).equals(this.config.get("topology.reliability.mode"));
        this.spillState = Boolean.parseBoolean(String.valueOf(this.config.get("topology.stateful.spill.state")));
        this.spillStateLocation = String.format("%s/%s/", String.valueOf(this.config.get("topology.stateful.spill.state.location")), helper.getMyInstanceId());
        this.waitingForCheckpointSaved = false;
        LOG.info("Is this topology stateful: " + this.isTopologyStateful);
        if (helper.getMySpout() == null) {
            throw new RuntimeException("HeronSpoutInstance has no spout in physical plan");
        }
        if (helper.getMySpout().getComp().hasSerializedObject()) {
            this.spout = (ISpout)Utils.deserialize(helper.getMySpout().getComp().getSerializedObject().toByteArray());
        } else if (helper.getMySpout().getComp().hasClassName()) {
            String spoutClassName = helper.getMySpout().getComp().getClassName();
            // Each failure keeps its original message and now also carries the cause, so the
            // underlying stack trace is not lost.
            try {
                this.spout = (ISpout)Class.forName(spoutClassName).newInstance();
            }
            catch (ClassNotFoundException ex) {
                throw new RuntimeException(ex + " Spout class must be in class path.", ex);
            }
            catch (InstantiationException ex) {
                throw new RuntimeException(ex + " Spout class must be concrete.", ex);
            }
            catch (IllegalAccessException ex) {
                throw new RuntimeException(ex + " Spout class must have a no-arg constructor.", ex);
            }
        } else {
            throw new RuntimeException("Neither java_object nor java_class_name set for spout");
        }
        IPluggableSerializer serializer = SerializeDeSerializeHelper.getSerializer(this.config);
        this.collector = new SpoutOutputCollectorImpl(serializer, helper, streamOutQueue, this.spoutMetrics);
        this.ackEnabled = this.collector.isAckEnabled();
        LOG.info("Enable Ack: " + this.ackEnabled);
        LOG.info("EnableMessageTimeouts: " + this.enableMessageTimeouts);
    }

    /**
     * Switches this instance to a new physical plan.
     *
     * <p>If the spout implements {@link IUpdatable} it is notified first; then the collector is
     * updated, custom stream grouping is prepared on the new helper, and the helper is stored.
     *
     * @param physicalPlanHelper helper for the new physical plan
     */
    @Override
    public void update(PhysicalPlanHelper physicalPlanHelper) {
        if (this.spout instanceof IUpdatable) {
            ((IUpdatable)this.spout).update(physicalPlanHelper.getTopologyContext());
        }
        this.collector.updatePhysicalPlanHelper(physicalPlanHelper);
        physicalPlanHelper.prepareForCustomStreamGrouping();
        this.helper = physicalPlanHelper;
    }

    /**
     * Sends the current instance state out through the collector for the given checkpoint.
     *
     * <p>Runs under the collector's lock. A spout implementing {@link IStatefulComponent} gets
     * {@code preSave} called first; for an {@link ITwoPhaseStatefulComponent} the instance then
     * pauses until {@link #onCheckpointSaved(String)} is invoked.
     *
     * @param checkpointId identifier of the checkpoint being taken
     * @throws RuntimeException if the topology is not stateful
     */
    @Override
    public void persistState(String checkpointId) {
        LOG.info("Persisting state for checkpoint: " + checkpointId);
        if (!this.isTopologyStateful) {
            throw new RuntimeException("Could not save a non-stateful topology's state");
        }
        this.collector.lock.lock();
        try {
            if (this.spout instanceof IStatefulComponent) {
                ((IStatefulComponent)this.spout).preSave(checkpointId);
            }
            if (this.spout instanceof ITwoPhaseStatefulComponent) {
                this.waitingForCheckpointSaved = true;
            }
            this.collector.sendOutState(this.instanceState, checkpointId, this.spillState, this.spillStateLocation);
        }
        finally {
            this.collector.lock.unlock();
        }
        LOG.info("State persisted for checkpoint: " + checkpointId);
    }

    /**
     * Initializes metrics, restores state into a stateful spout and opens the spout.
     *
     * <p>State is only handed to the spout when the topology is stateful and the spout implements
     * {@link IStatefulComponent}. In that case, when state spilling is enabled, the spill
     * directory is cleaned if it already exists and created otherwise.
     *
     * @param state state to initialize the spout with
     */
    @Override
    public void init(State<Serializable, Serializable> state) {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContext, this.systemConfig.getHeronMetricsExportInterval());
        this.spoutMetrics.registerMetrics(topologyContext);
        if (this.isTopologyStateful && this.spout instanceof IStatefulComponent) {
            this.instanceState = state;
            ((IStatefulComponent)this.spout).initState(this.instanceState);
            if (this.spillState) {
                if (FileUtils.isDirectoryExists(this.spillStateLocation)) {
                    FileUtils.cleanDir(this.spillStateLocation);
                } else {
                    FileUtils.createDirectory(this.spillStateLocation);
                }
            }
        }
        this.spout.open(topologyContext.getTopologyConfig(), topologyContext, new SpoutOutputCollector(this.collector));
        topologyContext.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping();
    }

    /** Registers the spout's tasks with the looper. */
    @Override
    public void start() {
        this.addSpoutsTasks();
    }

    /**
     * Forwards {@code preRestore} to the spout when it is an {@link ITwoPhaseStatefulComponent}.
     *
     * @param checkpointId identifier of the checkpoint about to be restored
     */
    @Override
    public void preRestore(String checkpointId) {
        if (this.spout instanceof ITwoPhaseStatefulComponent) {
            ((ITwoPhaseStatefulComponent)this.spout).preRestore(checkpointId);
        }
    }

    /**
     * Forwards {@code postSave} to a two-phase stateful spout and resumes normal processing.
     *
     * @param checkpointId identifier of the checkpoint that was saved
     */
    @Override
    public void onCheckpointSaved(String checkpointId) {
        if (this.spout instanceof ITwoPhaseStatefulComponent) {
            ((ITwoPhaseStatefulComponent)this.spout).postSave(checkpointId);
            this.waitingForCheckpointSaved = false;
        }
    }

    /** Invokes cleanup hooks, closes the spout and clears the incoming queue and the collector. */
    @Override
    public void clean() {
        this.helper.getTopologyContext().invokeHookCleanup();
        this.spout.close();
        this.streamInQueue.clear();
        this.collector.clear();
    }

    /** Cleans up via {@link #clean()} and then asks the looper to exit. */
    @Override
    public void shutdown() {
        this.clean();
        this.looper.exitLoop();
    }

    /** Logs the activation and forwards it to the spout. */
    @Override
    public void activate() {
        LOG.info("Spout is activated");
        this.spout.activate();
    }

    /** Logs the deactivation and forwards it to the spout. */
    @Override
    public void deactivate() {
        LOG.info("Spout is deactivated");
        this.spout.deactivate();
    }

    /**
     * Registers the per-wake-up spout task, starts the timeout scan when message timeouts are
     * enabled, and prepares timer events through {@link InstanceUtils}.
     */
    private void addSpoutsTasks() {
        Runnable spoutTasks = this::runSpoutTasks;
        this.looper.addTasksOnWakeup(spoutTasks);
        if (this.enableMessageTimeouts) {
            this.lookForTimeouts();
        }
        InstanceUtils.prepareTimerEvents(this.looper, this.helper);
    }

    /**
     * One iteration of spout work, executed on every looper wake-up: drain incoming control
     * messages, produce and send tuples if allowed, update metrics, and re-arm the looper when
     * there is more work to do.
     */
    private void runSpoutTasks() {
        this.spoutMetrics.updateTaskRunCount();
        this.readTuplesAndExecute(this.streamInQueue);
        if (this.isProduceTuple()) {
            this.spoutMetrics.updateProduceTupleCount();
            this.produceTuple();
            this.collector.sendOutTuples();
        }
        if (!this.collector.isOutQueuesAvailable()) {
            this.spoutMetrics.updateOutQueueFullCount();
        }
        if (this.ackEnabled) {
            this.spoutMetrics.updatePendingTuplesCount(this.collector.numInFlight());
        } else {
            // Without acking, emitted tuples are acknowledged back to the spout right away.
            this.doImmediateAcks();
        }
        if (this.isContinueWork()) {
            this.spoutMetrics.updateContinueWorkCount();
            this.looper.wakeUp();
        }
    }

    /**
     * Decides whether the looper should be woken up again immediately.
     *
     * @return true when the topology is RUNNING and either (acking disabled) the out queues have
     *     room, or (acking enabled) the out queues have room and fewer than
     *     "topology.max.spout.pending" tuples are in flight, or (acking enabled) the incoming
     *     queue still has messages
     */
    private boolean isContinueWork() {
        long maxSpoutPending = TypeUtils.getLong(this.config.get("topology.max.spout.pending"));
        if (!this.helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)) {
            return false;
        }
        if (!this.ackEnabled) {
            return this.collector.isOutQueuesAvailable();
        }
        boolean canEmitMore = this.collector.isOutQueuesAvailable() && (long)this.collector.numInFlight() < maxSpoutPending;
        return canEmitMore || !this.streamInQueue.isEmpty();
    }

    /**
     * @return true when the out queues have room, the topology is RUNNING and the instance is not
     *     waiting for a checkpoint to be saved
     */
    private boolean isProduceTuple() {
        return this.collector.isOutQueuesAvailable() && this.helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING) && !this.waitingForCheckpointSaved;
    }

    /**
     * Repeatedly calls {@code nextTuple} on the spout until one of the batch limits is hit.
     *
     * <p>The loop stops when acking is enabled and the in-flight count reaches
     * "topology.max.spout.pending", when a {@code nextTuple} call emits nothing, when the elapsed
     * time exceeds the configured emit batch time, or when the bytes emitted in this cycle are no
     * longer less than the configured emit batch size.
     */
    protected void produceTuple() {
        int maxSpoutPending = TypeUtils.getInteger(this.config.get("topology.max.spout.pending"));
        long totalTuplesEmitted = this.collector.getTotalTuplesEmitted();
        // Baseline for the whole cycle; intentionally not advanced inside the loop so the size
        // check below measures bytes emitted since the cycle started.
        long totalBytesEmitted = this.collector.getTotalBytesEmitted();
        Duration instanceEmitBatchTime = this.systemConfig.getInstanceEmitBatchTime();
        ByteAmount instanceEmitBatchSize = this.systemConfig.getInstanceEmitBatchSize();
        long startOfCycle = System.nanoTime();
        long currentTime = startOfCycle;
        while (!this.ackEnabled || maxSpoutPending > this.collector.numInFlight()) {
            this.spout.nextTuple();
            long startTime = currentTime;
            currentTime = System.nanoTime();
            long latency = currentTime - startTime;
            this.spoutMetrics.nextTuple(latency);
            long newTotalTuplesEmitted = this.collector.getTotalTuplesEmitted();
            long newTotalBytesEmitted = this.collector.getTotalBytesEmitted();
            if (newTotalTuplesEmitted == totalTuplesEmitted) {
                // The spout emitted nothing on this call; stop asking for now.
                break;
            }
            totalTuplesEmitted = newTotalTuplesEmitted;
            if (currentTime - startOfCycle - instanceEmitBatchTime.toNanos() > 0L) {
                break;
            }
            if (!ByteAmount.fromBytes(newTotalBytesEmitted - totalBytesEmitted).lessThan(instanceEmitBatchSize)) {
                break;
            }
        }
    }

    /**
     * Retires every root listed in an ack/fail tuple and notifies the spout.
     *
     * @param ackTuple control tuple whose roots are to be retired
     * @param isSuccess true to ack the roots, false to fail them
     * @throws RuntimeException if a root belongs to a different task than this instance
     */
    private void handleAckTuple(HeronTuples.AckTuple ackTuple, boolean isSuccess) {
        for (HeronTuples.RootId rt : ackTuple.getRootsList()) {
            if (rt.getTaskid() != this.helper.getMyTaskId()) {
                throw new RuntimeException(String.format("Receiving tuple for task %d in task %d", rt.getTaskid(), this.helper.getMyTaskId()));
            }
            long rootId = rt.getKey();
            RootTupleInfo rootTupleInfo = this.collector.retireInFlight(rootId);
            if (rootTupleInfo == null) {
                // Nothing in flight for this root (presumably already retired, e.g. by the
                // timeout scan). Skip only this root: returning here would silently drop the
                // remaining roots of the same tuple and leave them in flight.
                continue;
            }
            Object messageId = rootTupleInfo.getMessageId();
            if (messageId == null) {
                continue;
            }
            Duration latency = Duration.ofNanos(System.nanoTime()).minusNanos(rootTupleInfo.getInsertionTime());
            if (isSuccess) {
                this.invokeAck(messageId, rootTupleInfo.getStreamId(), latency);
            } else {
                this.invokeFail(messageId, rootTupleInfo.getStreamId(), latency);
            }
        }
    }

    /**
     * Fails every tuple the collector reports as expired and re-registers itself as a timer event.
     *
     * <p>The timeout comes from "topology.message.timeout.secs"; the scan interval is that
     * timeout divided by the configured number of acknowledgement buckets.
     */
    private void lookForTimeouts() {
        Duration timeout = TypeUtils.getDuration(this.config.get("topology.message.timeout.secs"), ChronoUnit.SECONDS);
        int nBucket = this.systemConfig.getInstanceAcknowledgementNbuckets();
        List<RootTupleInfo> expiredObjects = this.collector.retireExpired(timeout);
        for (RootTupleInfo rootTupleInfo : expiredObjects) {
            this.spoutMetrics.timeoutTuple(rootTupleInfo.getStreamId());
            this.invokeFail(rootTupleInfo.getMessageId(), rootTupleInfo.getStreamId(), timeout);
        }
        Runnable lookForTimeoutsTask = this::lookForTimeouts;
        this.looper.registerTimerEvent(timeout.dividedBy(nBucket), lookForTimeoutsTask);
    }

    /**
     * Drains messages from the given queue until it is empty, a checkpoint wait begins, or the
     * configured ack batch time has elapsed.
     *
     * <p>Checkpoint requests trigger {@link #persistState(String)}; tuple sets may only carry
     * control data, whose acks and fails are dispatched to the spout.
     *
     * @param inQueue queue to read messages from
     * @throws RuntimeException if a tuple set carries data tuples or the message is of an
     *     unexpected type
     */
    @Override
    public void readTuplesAndExecute(Communicator<Message> inQueue) {
        long startOfCycle = System.nanoTime();
        Duration spoutAckBatchTime = this.systemConfig.getInstanceAckBatchTime();
        while (!inQueue.isEmpty() && !this.waitingForCheckpointSaved) {
            Message msg = inQueue.poll();
            if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
                String checkpointId = ((CheckpointManager.InitiateStatefulCheckpoint)msg).getCheckpointId();
                this.persistState(checkpointId);
            } else if (msg instanceof HeronTuples.HeronTupleSet) {
                HeronTuples.HeronTupleSet tuples = (HeronTuples.HeronTupleSet)msg;
                if (tuples.hasData()) {
                    throw new RuntimeException("Spout cannot get incoming data tuples from other components");
                }
                if (tuples.hasControl()) {
                    for (HeronTuples.AckTuple aT : tuples.getControl().getAcksList()) {
                        this.handleAckTuple(aT, true);
                    }
                    for (HeronTuples.AckTuple aT : tuples.getControl().getFailsList()) {
                        this.handleAckTuple(aT, false);
                    }
                }
                // The time budget is only checked after a tuple set, not after a checkpoint.
                if (System.nanoTime() - startOfCycle - spoutAckBatchTime.toNanos() > 0L) {
                    break;
                }
            } else {
                throw new RuntimeException("Invalid data sent to spout instance");
            }
        }
    }

    /**
     * Acks every tuple currently queued in the collector's immediate-ack queue with zero latency.
     * The queue size is sampled once so entries added during the loop wait for the next call.
     */
    private void doImmediateAcks() {
        int s = this.collector.getImmediateAcks().size();
        for (int i = 0; i < s; ++i) {
            RootTupleInfo tupleInfo = this.collector.getImmediateAcks().poll();
            this.invokeAck(tupleInfo.getMessageId(), tupleInfo.getStreamId(), Duration.ZERO);
        }
    }

    /**
     * Acks a message on the spout, then runs the ack hooks and records the ack metric.
     *
     * @param messageId message id passed to the spout's {@code ack}
     * @param streamId stream the tuple was emitted on
     * @param completeLatency latency reported to hooks and metrics
     */
    private void invokeAck(Object messageId, String streamId, Duration completeLatency) {
        this.spout.ack(messageId);
        this.helper.getTopologyContext().invokeHookSpoutAck(messageId, completeLatency);
        this.spoutMetrics.ackedTuple(streamId, completeLatency.toNanos());
    }

    /**
     * Fails a message on the spout, then runs the fail hooks and records the fail metric.
     *
     * @param messageId message id passed to the spout's {@code fail}
     * @param streamId stream the tuple was emitted on
     * @param failLatency latency reported to hooks and metrics
     */
    private void invokeFail(Object messageId, String streamId, Duration failLatency) {
        this.spout.fail(messageId);
        this.helper.getTopologyContext().invokeHookSpoutFail(messageId, failLatency);
        this.spoutMetrics.failedTuple(streamId, failLatency.toNanos());
    }
}

