/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.instance;

import java.io.Serializable;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.HashMapState;
import org.apache.heron.api.state.State;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.MetricsCollector;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.instance.IInstance;
import org.apache.heron.instance.InstanceControlMsg;
import org.apache.heron.instance.bolt.BoltInstance;
import org.apache.heron.instance.spout.SpoutInstance;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.Common;
import org.apache.heron.proto.system.Metrics;
import org.apache.heron.shaded.com.google.protobuf.Message;

public class Slave
implements Runnable,
AutoCloseable {
    private static final Logger LOG = Logger.getLogger(Slave.class.getName());
    private final SlaveLooper slaveLooper;
    private MetricsCollector metricsCollector;
    private final Communicator<Message> streamInCommunicator;
    private final Communicator<Message> streamOutCommunicator;
    private final Communicator<InstanceControlMsg> inControlQueue;
    private final Communicator<Metrics.MetricPublisherPublishMessage> metricsOutCommunicator;
    private IPluggableSerializer serializer;
    private IInstance instance;
    private PhysicalPlanHelper helper;
    private SystemConfig systemConfig;
    private boolean isInstanceStarted;
    private State<Serializable, Serializable> instanceState;
    private boolean isStatefulProcessingStarted;

    public Slave(SlaveLooper slaveLooper, Communicator<Message> streamInCommunicator, Communicator<Message> streamOutCommunicator, Communicator<InstanceControlMsg> inControlQueue, Communicator<Metrics.MetricPublisherPublishMessage> metricsOutCommunicator) {
        this.slaveLooper = slaveLooper;
        this.streamInCommunicator = streamInCommunicator;
        this.streamOutCommunicator = streamOutCommunicator;
        this.inControlQueue = inControlQueue;
        this.metricsOutCommunicator = metricsOutCommunicator;
        this.isInstanceStarted = false;
        this.instanceState = null;
        this.isStatefulProcessingStarted = false;
        this.systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.metricsCollector = new MetricsCollector(slaveLooper, metricsOutCommunicator);
        this.handleControlMessage();
    }

    private void handleControlMessage() {
        Runnable handleControlMessageTask = new Runnable(){

            @Override
            public void run() {
                while (!Slave.this.inControlQueue.isEmpty()) {
                    InstanceControlMsg instanceControlMsg = (InstanceControlMsg)Slave.this.inControlQueue.poll();
                    if (instanceControlMsg.isStartInstanceStatefulProcessing()) {
                        Slave.this.handleStartInstanceStatefulProcessing(instanceControlMsg);
                    }
                    if (instanceControlMsg.isRestoreInstanceStateRequest()) {
                        Slave.this.handleRestoreInstanceStateRequest(instanceControlMsg);
                    }
                    if (!instanceControlMsg.isNewPhysicalPlanHelper()) continue;
                    Slave.this.handleNewPhysicalPlan(instanceControlMsg);
                }
            }
        };
        this.slaveLooper.addTasksOnWakeup(handleControlMessageTask);
    }

    private void resetCurrentAssignment() {
        this.helper.setTopologyContext(this.metricsCollector);
        this.instance = this.helper.getMySpout() != null ? new SpoutInstance(this.helper, this.streamInCommunicator, this.streamOutCommunicator, this.slaveLooper) : new BoltInstance(this.helper, this.streamInCommunicator, this.streamOutCommunicator, this.slaveLooper);
        this.startInstanceIfNeeded();
    }

    private void handleNewAssignment() {
        LOG.log(Level.INFO, "Incarnating ourselves as {0} with task id {1}", new Object[]{this.helper.getMyComponent(), this.helper.getMyTaskId()});
        this.serializer = SerializeDeSerializeHelper.getSerializer(this.helper.getTopologyContext().getTopologyConfig());
        if (this.helper.getMySpout() != null) {
            this.instance = new SpoutInstance(this.helper, this.streamInCommunicator, this.streamOutCommunicator, this.slaveLooper);
            this.streamInCommunicator.init(this.systemConfig.getInstanceInternalSpoutReadQueueCapacity(), this.systemConfig.getInstanceTuningExpectedSpoutReadQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
            this.streamOutCommunicator.init(this.systemConfig.getInstanceInternalSpoutWriteQueueCapacity(), this.systemConfig.getInstanceTuningExpectedSpoutWriteQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
        } else {
            this.instance = new BoltInstance(this.helper, this.streamInCommunicator, this.streamOutCommunicator, this.slaveLooper);
            this.streamInCommunicator.init(this.systemConfig.getInstanceInternalBoltReadQueueCapacity(), this.systemConfig.getInstanceTuningExpectedBoltReadQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
            this.streamOutCommunicator.init(this.systemConfig.getInstanceInternalBoltWriteQueueCapacity(), this.systemConfig.getInstanceTuningExpectedBoltWriteQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
        }
        if (!this.helper.isTopologyRunning()) {
            LOG.info("Instance is deployed in deactivated state");
        }
        this.startInstanceIfNeeded();
    }

    @Override
    public void run() {
        Thread.currentThread().setName("SlaveThread");
        this.slaveLooper.loop();
    }

    private void startInstanceIfNeeded() {
        if (this.helper == null) {
            LOG.info("No physical plan received. Instance is not started");
            return;
        }
        Map<String, Object> topoConf = this.helper.getTopologyContext().getTopologyConfig();
        if (topoConf.containsKey("topology.environment")) {
            Map envProps = (Map)topoConf.get("topology.environment");
            LOG.info("Setting topology environment: " + envProps);
            System.getProperties().putAll((Map<?, ?>)envProps);
        }
        if (this.helper.isTopologyStateful()) {
            if (this.isStatefulProcessingStarted) {
                this.instance.init(this.instanceState);
                if (TopologyAPI.TopologyState.PAUSED.equals(this.helper.getTopologyState())) {
                    this.instance.deactivate();
                }
                this.instance.start();
                this.isInstanceStarted = true;
                LOG.info("Instance is started for stateful topology");
            } else {
                LOG.info("Start signal not received. Instance is not started");
            }
        } else {
            this.instance.init(null);
            if (TopologyAPI.TopologyState.PAUSED.equals(this.helper.getTopologyState())) {
                this.instance.deactivate();
            }
            this.instance.start();
            this.isInstanceStarted = true;
            LOG.info("Instance is started for non-stateful topology");
        }
    }

    @Override
    public void close() {
        LOG.info("Closing the Slave Thread");
        this.metricsCollector.forceGatherAllMetrics();
        LOG.info("Shutting down the instance");
        if (this.instance != null) {
            this.instance.shutdown();
        }
        this.slaveLooper.exitLoop();
        this.streamInCommunicator.clear();
    }

    private void handleStartInstanceStatefulProcessing(InstanceControlMsg instanceControlMsg) {
        CheckpointManager.StartInstanceStatefulProcessing startStatefulRequest = instanceControlMsg.getStartInstanceStatefulProcessing();
        LOG.info("Starting stateful processing with checkpoint id: " + startStatefulRequest.getCheckpointId());
        this.isStatefulProcessingStarted = true;
        this.startInstanceIfNeeded();
    }

    private void cleanAndStopSlave() {
        this.streamInCommunicator.clear();
        this.streamOutCommunicator.clear();
        this.metricsCollector.forceGatherAllMetrics();
        this.slaveLooper.clearTasksOnWakeup();
        this.slaveLooper.clearTimers();
        if (this.instance != null) {
            this.instance.clean();
        }
        this.isStatefulProcessingStarted = false;
    }

    private void registerTasksWithSlave() {
        this.metricsCollector = new MetricsCollector(this.slaveLooper, this.metricsOutCommunicator);
        this.handleControlMessage();
    }

    private void handleRestoreInstanceStateRequest(InstanceControlMsg instanceControlMsg) {
        CheckpointManager.RestoreInstanceStateRequest request = instanceControlMsg.getRestoreInstanceStateRequest();
        if (this.isInstanceStarted) {
            this.cleanAndStopSlave();
        }
        LOG.info("Restoring state to checkpoint id: " + request.getState().getCheckpointId());
        if (this.instanceState != null) {
            this.instanceState.clear();
            this.instanceState = null;
        }
        if (request.getState().hasState() && !request.getState().getState().isEmpty()) {
            State stateToRestore;
            this.instanceState = stateToRestore = (State)this.serializer.deserialize(request.getState().getState().toByteArray());
        } else if (request.getState().hasStateLocation()) {
            State stateToRestore;
            String stateLocation = request.getState().getStateLocation();
            byte[] rawState = FileUtils.readFromFile(stateLocation);
            this.instanceState = stateToRestore = (State)this.serializer.deserialize(rawState);
        } else {
            LOG.info("The restore request does not have an actual state");
        }
        if (this.instanceState == null) {
            this.instanceState = new HashMapState<Serializable, Serializable>();
        }
        LOG.info("Instance state restored for checkpoint id: " + request.getState().getCheckpointId());
        if (this.isInstanceStarted && this.helper != null) {
            LOG.info("Restarting instance");
            this.resetCurrentAssignment();
        }
        this.registerTasksWithSlave();
        CheckpointManager.RestoreInstanceStateResponse response = CheckpointManager.RestoreInstanceStateResponse.newBuilder().setCheckpointId(request.getState().getCheckpointId()).setStatus(Common.Status.newBuilder().setStatus(Common.StatusCode.OK).build()).build();
        this.streamOutCommunicator.offer(response);
    }

    private void handleNewPhysicalPlan(InstanceControlMsg instanceControlMsg) {
        block6: {
            TopologyAPI.TopologyState oldTopologyState;
            block7: {
                PhysicalPlanHelper newHelper;
                block5: {
                    newHelper = instanceControlMsg.getNewPhysicalPlanHelper();
                    newHelper.setTopologyContext(this.metricsCollector);
                    if (this.helper != null) break block5;
                    this.helper = newHelper;
                    this.handleNewAssignment();
                    break block6;
                }
                oldTopologyState = this.helper.getTopologyState();
                this.helper = newHelper;
                this.instance.update(this.helper);
                if (oldTopologyState.equals(this.helper.getTopologyState())) break block7;
                switch (this.helper.getTopologyState()) {
                    case RUNNING: {
                        if (!this.isInstanceStarted) {
                            this.startInstanceIfNeeded();
                        }
                        this.instance.activate();
                        break block6;
                    }
                    case PAUSED: {
                        this.instance.deactivate();
                        break block6;
                    }
                    default: {
                        throw new RuntimeException("Unexpected TopologyState is updated for spout: " + this.helper.getTopologyState());
                    }
                }
            }
            LOG.info("Topology state remains the same in Slave: " + oldTopologyState);
        }
    }
}

