/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.instance.bolt;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.heron.api.Config;
import org.apache.heron.api.bolt.IBolt;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.metric.GlobalMetrics;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.FullBoltMetrics;
import org.apache.heron.common.utils.metrics.IBoltMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.common.utils.topology.TopologyContextImpl;
import org.apache.heron.common.utils.tuple.TickTuple;
import org.apache.heron.common.utils.tuple.TupleImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.instance.bolt.BoltOutputCollectorImpl;
import org.apache.heron.instance.util.InstanceUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.Message;

public class BoltInstance
implements IInstance {
    private static final Logger LOG = Logger.getLogger(BoltInstance.class.getName());
    protected PhysicalPlanHelper helper;
    protected final IBolt bolt;
    protected final BoltOutputCollectorImpl collector;
    protected final IPluggableSerializer serializer;
    protected final IBoltMetrics boltMetrics;
    private final Communicator<Message> streamInQueue;
    private final boolean isTopologyStateful;
    private final boolean spillState;
    private final String spillStateLocation;
    private State<Serializable, Serializable> instanceState;
    private final Map<String, Object> config;
    private final SlaveLooper looper;
    private final SystemConfig systemConfig;

    public BoltInstance(PhysicalPlanHelper helper, Communicator<Message> streamInQueue, Communicator<Message> streamOutQueue, SlaveLooper looper) {
        this.helper = helper;
        this.looper = looper;
        this.streamInQueue = streamInQueue;
        this.boltMetrics = new FullBoltMetrics();
        this.boltMetrics.initMultiCountMetrics(helper);
        this.serializer = SerializeDeSerializeHelper.getSerializer(helper.getTopologyContext().getTopologyConfig());
        this.systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.config = helper.getTopologyContext().getTopologyConfig();
        this.isTopologyStateful = String.valueOf((Object)Config.TopologyReliabilityMode.EFFECTIVELY_ONCE).equals(this.config.get("topology.reliability.mode"));
        LOG.info("Is this topology stateful: " + this.isTopologyStateful);
        this.spillState = Boolean.parseBoolean((String)this.config.get("topology.stateful.spill.state"));
        this.spillStateLocation = String.format("%s/%s/", String.valueOf(this.config.get("topology.stateful.spill.state.location")), helper.getMyInstanceId());
        if (helper.getMyBolt() == null) {
            throw new RuntimeException("HeronBoltInstance has no bolt in physical plan.");
        }
        if (helper.getMyBolt().getComp().hasSerializedObject()) {
            this.bolt = (IBolt)Utils.deserialize(helper.getMyBolt().getComp().getSerializedObject().toByteArray());
        } else if (helper.getMyBolt().getComp().hasClassName()) {
            try {
                String boltClassName = helper.getMyBolt().getComp().getClassName();
                this.bolt = (IBolt)Class.forName(boltClassName).newInstance();
            }
            catch (ClassNotFoundException ex) {
                throw new RuntimeException(ex + " Bolt class must be in class path.");
            }
            catch (InstantiationException ex) {
                throw new RuntimeException(ex + " Bolt class must be concrete.");
            }
            catch (IllegalAccessException ex) {
                throw new RuntimeException(ex + " Bolt class must have a no-arg constructor.");
            }
        } else {
            throw new RuntimeException("Neither java_object nor java_class_name set for bolt");
        }
        this.collector = new BoltOutputCollectorImpl(this.serializer, helper, streamOutQueue, this.boltMetrics);
    }

    @Override
    public void update(PhysicalPlanHelper physicalPlanHelper) {
        if (this.bolt instanceof IUpdatable) {
            ((IUpdatable)((Object)this.bolt)).update(physicalPlanHelper.getTopologyContext());
        }
        this.collector.updatePhysicalPlanHelper(physicalPlanHelper);
        physicalPlanHelper.prepareForCustomStreamGrouping();
        physicalPlanHelper.getTopologyContext().getTopologyConfig().putAll(this.helper.getTopologyContext().getTopologyConfig());
        this.helper = physicalPlanHelper;
    }

    @Override
    public void persistState(String checkpointId) {
        LOG.info("Persisting state for checkpoint: " + checkpointId);
        if (!this.isTopologyStateful) {
            throw new RuntimeException("Could not save a non-stateful topology's state");
        }
        this.collector.lock.lock();
        try {
            if (this.bolt instanceof IStatefulComponent) {
                ((IStatefulComponent)((Object)this.bolt)).preSave(checkpointId);
            }
            this.collector.sendOutState(this.instanceState, checkpointId, this.spillState, this.spillStateLocation);
        }
        finally {
            this.collector.lock.unlock();
        }
        LOG.info("State persisted for checkpoint: " + checkpointId);
    }

    @Override
    public void init(State<Serializable, Serializable> state) {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContext, this.systemConfig.getHeronMetricsExportInterval());
        this.boltMetrics.registerMetrics(topologyContext);
        if (this.bolt instanceof IStatefulComponent) {
            this.instanceState = state;
            ((IStatefulComponent)((Object)this.bolt)).initState(this.instanceState);
            if (this.spillState) {
                if (FileUtils.isDirectoryExists(this.spillStateLocation)) {
                    FileUtils.cleanDir(this.spillStateLocation);
                } else {
                    FileUtils.createDirectory(this.spillStateLocation);
                }
            }
        }
        this.bolt.prepare(topologyContext.getTopologyConfig(), topologyContext, new OutputCollector(this.collector));
        topologyContext.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping();
    }

    @Override
    public void start() {
        this.addBoltTasks();
    }

    @Override
    public void clean() {
        this.helper.getTopologyContext().invokeHookCleanup();
        this.bolt.cleanup();
        this.streamInQueue.clear();
        this.collector.clear();
    }

    @Override
    public void shutdown() {
        this.clean();
        this.looper.exitLoop();
    }

    private void addBoltTasks() {
        Runnable boltTasks = new Runnable(){

            @Override
            public void run() {
                BoltInstance.this.boltMetrics.updateTaskRunCount();
                if (BoltInstance.this.collector.isOutQueuesAvailable()) {
                    BoltInstance.this.boltMetrics.updateExecutionCount();
                    BoltInstance.this.readTuplesAndExecute(BoltInstance.this.streamInQueue);
                    BoltInstance.this.collector.sendOutTuples();
                } else {
                    BoltInstance.this.boltMetrics.updateOutQueueFullCount();
                }
                if (BoltInstance.this.collector.isOutQueuesAvailable() && !BoltInstance.this.streamInQueue.isEmpty()) {
                    BoltInstance.this.boltMetrics.updateContinueWorkCount();
                    BoltInstance.this.looper.wakeUp();
                }
            }
        };
        this.looper.addTasksOnWakeup(boltTasks);
        this.PrepareTickTupleTimer();
        InstanceUtils.prepareTimerEvents(this.looper, this.helper);
    }

    @Override
    public void readTuplesAndExecute(Communicator<Message> inQueue) {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        Duration instanceExecuteBatchTime = this.systemConfig.getInstanceExecuteBatchTime();
        long startOfCycle = System.nanoTime();
        while (!inQueue.isEmpty()) {
            Message msg = inQueue.poll();
            if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
                String checkpointId = ((CheckpointManager.InitiateStatefulCheckpoint)msg).getCheckpointId();
                this.persistState(checkpointId);
            }
            if (!(msg instanceof HeronTuples.HeronTupleSet)) continue;
            HeronTuples.HeronTupleSet tuples = (HeronTuples.HeronTupleSet)msg;
            if (tuples.hasControl()) {
                throw new RuntimeException("Bolt cannot get acks/fails from other components");
            }
            TopologyAPI.StreamId stream = tuples.getData().getStream();
            int nValues = topologyContext.getComponentOutputFields(stream.getComponentName(), stream.getId()).size();
            int sourceTaskId = tuples.getSrcTaskId();
            for (HeronTuples.HeronDataTuple dataTuple : tuples.getData().getTuplesList()) {
                long startTime = System.nanoTime();
                ArrayList<Object> values = new ArrayList<Object>(nValues);
                for (int i = 0; i < nValues; ++i) {
                    values.add(this.serializer.deserialize(dataTuple.getValues(i).toByteArray()));
                }
                TupleImpl t = new TupleImpl(topologyContext, stream, dataTuple.getKey(), dataTuple.getRootsList(), values, System.nanoTime(), false, sourceTaskId);
                long deserializedTime = System.nanoTime();
                this.bolt.execute(t);
                long executeLatency = Duration.ofNanos(System.nanoTime()).minusNanos(deserializedTime).toNanos();
                topologyContext.invokeHookBoltExecute(t, Duration.ofNanos(executeLatency));
                this.boltMetrics.deserializeDataTuple(stream.getId(), stream.getComponentName(), deserializedTime - startTime);
                this.boltMetrics.executeTuple(stream.getId(), stream.getComponentName(), executeLatency);
            }
            long currentTime = System.nanoTime();
            if (currentTime - startOfCycle - instanceExecuteBatchTime.toNanos() <= 0L) continue;
            break;
        }
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    private void PrepareTickTupleTimer() {
        Object tickTupleFreqMs = this.helper.getTopologyContext().getTopologyConfig().get("topology.tick.tuple.freq.ms");
        if (tickTupleFreqMs != null) {
            Duration freq = TypeUtils.getDuration(tickTupleFreqMs, ChronoUnit.MILLIS);
            Runnable r = () -> this.SendTickTuple();
            this.looper.registerPeriodicEvent(freq, r);
        }
    }

    private void SendTickTuple() {
        TickTuple t = new TickTuple();
        long startTime = System.nanoTime();
        this.bolt.execute(t);
        long latency = System.nanoTime() - startTime;
        this.boltMetrics.executeTuple(t.getSourceStreamId(), t.getSourceComponent(), latency);
        this.collector.sendOutTuples();
    }
}

