/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.network;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.network.HeronClient;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.common.network.StatusCode;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.instance.InstanceControlMsg;
import org.apache.heron.metrics.GatewayMetrics;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.stmgr.StreamManager;
import org.apache.heron.proto.system.Common;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.heron.shaded.com.google.protobuf.Message;

/**
 * Client used by an instance to exchange messages with the Stream Manager.
 *
 * <p>On a successful connect it registers the message types it can receive and sends a
 * {@code RegisterInstanceRequest}. Incoming tuple sets and checkpoint requests are pushed to
 * {@code inStreamQueue}; physical-plan updates and stateful-processing control requests are
 * pushed to {@code inControlQueue}; messages found in {@code outStreamQueue} are sent to the
 * Stream Manager.
 *
 * <p>A task registered with the {@link NIOLooper} in the constructor drives the send/read
 * decisions on every looper wake-up.
 */
public class StreamManagerClient
extends HeronClient {
    private static final Logger LOG = Logger.getLogger(StreamManagerClient.class.getName());

    /** Minimum gap, in seconds, between two "not yet connected" log lines of the same kind. */
    private static final long NOT_CONNECTED_LOG_THROTTLE_SECONDS = 5L;

    private final String topologyName;
    private final String topologyId;
    private final PhysicalPlans.Instance instance;
    private final Communicator<Message> inStreamQueue;
    private final Communicator<Message> outStreamQueue;
    private final Communicator<InstanceControlMsg> inControlQueue;
    private final GatewayMetrics gatewayMetrics;
    private final SystemConfig systemConfig;

    /** Helper built from the latest physical plan; {@code null} until one arrives and after an error. */
    private PhysicalPlanHelper helper;

    /** Wall-clock millis of the last throttled "Stop reading" log line. */
    private long lastNotConnectedLogTime = 0L;

    /** Wall-clock millis of the last throttled "Stop writing" log line. */
    private long lastNotReadyToWriteLogTime = 0L;

    /**
     * Creates the client and registers its wake-up task with the looper.
     *
     * @param s looper handed to the {@link HeronClient} super constructor and used for wake-up tasks and timers
     * @param streamManagerHost host of the Stream Manager
     * @param streamManagerPort port of the Stream Manager
     * @param topologyName topology name sent in the register request
     * @param topologyId topology id sent in the register request
     * @param instance this instance's descriptor, sent in the register request
     * @param inStreamQueue queue receiving tuple sets and checkpoint requests from the Stream Manager
     * @param outStreamQueue queue of messages to send to the Stream Manager
     * @param inControlQueue queue receiving control messages (new plan, restore, start stateful processing)
     * @param options socket options handed to the super constructor
     * @param gatewayMetrics metrics sink for packet counts/sizes and in-queue-full events
     */
    public StreamManagerClient(NIOLooper s, String streamManagerHost, int streamManagerPort, String topologyName, String topologyId, PhysicalPlans.Instance instance, Communicator<Message> inStreamQueue, Communicator<Message> outStreamQueue, Communicator<InstanceControlMsg> inControlQueue, HeronSocketOptions options, GatewayMetrics gatewayMetrics) {
        super(s, streamManagerHost, streamManagerPort, options);
        this.topologyName = topologyName;
        this.topologyId = topologyId;
        this.instance = instance;
        this.inStreamQueue = inStreamQueue;
        this.outStreamQueue = outStreamQueue;
        this.inControlQueue = inControlQueue;
        this.systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.gatewayMetrics = gatewayMetrics;
        this.addStreamManagerClientTasksOnWakeUp();
    }

    /** Registers a looper wake-up task that first tries to send, then decides whether to read. */
    private void addStreamManagerClientTasksOnWakeUp() {
        Runnable task = new Runnable(){

            @Override
            public void run() {
                StreamManagerClient.this.sendStreamMessageIfNeeded();
                StreamManagerClient.this.readStreamMessageIfNeeded();
            }
        };
        this.getNIOLooper().addTasksOnWakeup(task);
    }

    /** Registers a builder for every message type handled by {@link #onIncomingMessage(Message)}. */
    private void registerMessagesToHandle() {
        this.registerOnMessage(StreamManager.NewInstanceAssignmentMessage.newBuilder());
        this.registerOnMessage(HeronTuples.HeronTupleSet2.newBuilder());
        this.registerOnMessage(CheckpointManager.InitiateStatefulCheckpoint.newBuilder());
        this.registerOnMessage(CheckpointManager.RestoreInstanceStateRequest.newBuilder());
        this.registerOnMessage(CheckpointManager.StartInstanceStatefulProcessing.newBuilder());
    }

    /**
     * Drops the current {@link PhysicalPlanHelper} and goes through the failed-connect path of
     * {@link #onConnect(StatusCode)}, which schedules a reconnect attempt.
     */
    @Override
    public void onError() {
        LOG.severe("Disconnected from Stream Manager.");
        LOG.info("Clean the old PhysicalPlanHelper in StreamManagerClient.");
        this.helper = null;
        this.onConnect(StatusCode.CONNECT_ERROR);
    }

    /**
     * Handles the outcome of a connect attempt.
     *
     * <p>On any status other than {@code OK} a timer is registered that calls {@code start()}
     * again after the configured reconnect interval. On {@code OK} the message handlers are
     * registered and the register request is sent.
     *
     * @param status result of the connect attempt
     */
    @Override
    public void onConnect(StatusCode status) {
        if (status != StatusCode.OK) {
            LOG.log(Level.WARNING, "Error connecting to Stream Manager with status: {0}, Retrying...", (Object)status);
            Runnable r = new Runnable(){

                @Override
                public void run() {
                    StreamManagerClient.this.start();
                }
            };
            this.getNIOLooper().registerTimerEvent(this.systemConfig.getInstanceReconnectStreammgrInterval(), r);
            return;
        }
        this.registerMessagesToHandle();
        LOG.info("Connected to Stream Manager. Ready to send register request");
        this.sendRegisterRequest();
    }

    /**
     * Builds and sends the {@code RegisterInstanceRequest} for this instance.
     *
     * <p>The reconnect interval is passed as the last argument of {@code sendRequest}
     * (presumably the request timeout; confirm against {@code HeronClient}).
     */
    private void sendRegisterRequest() {
        StreamManager.RegisterInstanceRequest request = StreamManager.RegisterInstanceRequest.newBuilder().setInstance(this.instance).setTopologyName(this.topologyName).setTopologyId(this.topologyId).build();
        this.sendRequest(request, null, StreamManager.RegisterInstanceResponse.newBuilder(), this.systemConfig.getInstanceReconnectStreammgrInterval());
    }

    /**
     * Handles a response to a request sent by this client; only register responses are expected.
     *
     * @param status status of the response
     * @param ctx request context (unused here)
     * @param response the response message
     * @throws RuntimeException if {@code status} is not {@code OK}, if the response is not a
     *     {@code RegisterInstanceResponse}, or if the register response itself is not OK
     */
    @Override
    public void onResponse(StatusCode status, Object ctx, Message response) {
        if (status != StatusCode.OK) {
            throw new RuntimeException("Response from Stream Manager not ok");
        }
        if (!(response instanceof StreamManager.RegisterInstanceResponse)) {
            throw new RuntimeException("Unknown kind of response received from Stream Manager");
        }
        this.handleRegisterResponse((StreamManager.RegisterInstanceResponse)response);
    }

    /**
     * Records receive metrics for the message and dispatches it by concrete type.
     *
     * @param message message received from the Stream Manager
     * @throws RuntimeException if the message is of none of the registered types
     */
    @Override
    public void onIncomingMessage(Message message) {
        this.gatewayMetrics.updateReceivedPacketsCount(1L);
        this.gatewayMetrics.updateReceivedPacketsSize(message.getSerializedSize());
        if (message instanceof StreamManager.NewInstanceAssignmentMessage) {
            StreamManager.NewInstanceAssignmentMessage m = (StreamManager.NewInstanceAssignmentMessage)message;
            LOG.info("Handling assignment message from direct NewInstanceAssignmentMessage");
            this.handleAssignmentMessage(m.getPplan());
        } else if (message instanceof HeronTuples.HeronTupleSet2) {
            this.handleNewTuples2((HeronTuples.HeronTupleSet2)message);
        } else if (message instanceof CheckpointManager.InitiateStatefulCheckpoint) {
            this.handleCheckpointRequest((CheckpointManager.InitiateStatefulCheckpoint)message);
        } else if (message instanceof CheckpointManager.RestoreInstanceStateRequest) {
            this.handleRestoreInstanceStateRequest((CheckpointManager.RestoreInstanceStateRequest)message);
        } else if (message instanceof CheckpointManager.StartInstanceStatefulProcessing) {
            this.handleStartStatefulRequest((CheckpointManager.StartInstanceStatefulProcessing)message);
        } else {
            throw new RuntimeException("Unknown kind of message received from Stream Manager");
        }
    }

    /** Logs that the client is exiting. */
    @Override
    public void onClose() {
        LOG.info("StreamManagerClient exits.");
    }

    /**
     * Sends every message that is in {@code outStreamQueue} at the time of the call.
     * Does nothing when the client is not connected.
     */
    public void sendAllMessage() {
        if (!this.isConnected()) {
            return;
        }
        LOG.info("Flushing all pending data in StreamManagerClient");
        // Only flush what was queued when the flush started.
        int size = this.outStreamQueue.size();
        for (int i = 0; i < size; ++i) {
            Message streamMessage = this.outStreamQueue.poll();
            if (streamMessage == null) {
                // NOTE(review): assumes poll() returns null once the queue is empty (e.g. it was
                // drained after the size snapshot) -- confirm against Communicator.
                break;
            }
            this.sendMessage(streamMessage);
        }
    }

    /**
     * Sends queued outgoing messages when the Stream Manager is ready to receive tuples.
     *
     * <p>The queue is drained only when there are no outstanding packets; if messages remain
     * afterwards, writing is (re)started. When not ready, a throttled INFO line is logged.
     */
    private void sendStreamMessageIfNeeded() {
        if (!this.isStreamMgrReadyReceiveTuples()) {
            // This runs on every looper wake-up, so throttle the log the same way the read path does.
            long now = System.currentTimeMillis();
            if (this.isLogThrottleElapsed(now, this.lastNotReadyToWriteLogTime)) {
                LOG.info("Stop writing due to not yet connected to Stream Manager.");
                this.lastNotReadyToWriteLogTime = now;
            }
            return;
        }
        if (this.getOutstandingPackets() <= 0) {
            while (!this.outStreamQueue.isEmpty()) {
                Message tupleSet = this.outStreamQueue.poll();
                this.gatewayMetrics.updateSentPacketsCount(1L);
                this.gatewayMetrics.updateSentPacketsSize(tupleSet.getSerializedSize());
                this.sendMessage(tupleSet);
            }
        }
        if (!this.outStreamQueue.isEmpty()) {
            this.startWriting();
        }
    }

    /**
     * Decides whether to keep reading from the Stream Manager.
     *
     * <p>While connected, reading continues if the in-queue has room or no physical plan has
     * been received yet; otherwise the in-queue-full metric is bumped and reading is stopped.
     * While not connected, a throttled INFO line is logged.
     */
    private void readStreamMessageIfNeeded() {
        if (this.isConnected()) {
            if (this.isInQueuesAvailable() || this.helper == null) {
                this.startReading();
            } else {
                this.gatewayMetrics.updateInQueueFullCount();
                this.stopReading();
            }
            return;
        }
        long now = System.currentTimeMillis();
        if (this.isLogThrottleElapsed(now, this.lastNotConnectedLogTime)) {
            LOG.info(String.format("Stop reading due to not yet connected to Stream Manager. This message is throttled to emit no more than once every %d seconds.", NOT_CONNECTED_LOG_THROTTLE_SECONDS));
            this.lastNotConnectedLogTime = now;
        }
    }

    /**
     * Tells whether more than the throttle interval has passed since {@code lastLogTime}.
     *
     * @param now current wall-clock time in milliseconds
     * @param lastLogTime wall-clock time in milliseconds of the previous log line
     * @return {@code true} if a new throttled log line may be emitted
     */
    private boolean isLogThrottleElapsed(long now, long lastLogTime) {
        return now - lastLogTime > NOT_CONNECTED_LOG_THROTTLE_SECONDS * 1000L;
    }

    /** Wraps the request in an {@link InstanceControlMsg} and offers it to the control queue. */
    private void handleStartStatefulRequest(CheckpointManager.StartInstanceStatefulProcessing request) {
        LOG.info("Received a StartInstanceStatefulProcessing request: " + request);
        InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder().setStartInstanceStatefulProcessing(request).build();
        this.inControlQueue.offer(instanceControlMsg);
    }

    /** Wraps the restore request in an {@link InstanceControlMsg} and offers it to the control queue. */
    private void handleRestoreInstanceStateRequest(CheckpointManager.RestoreInstanceStateRequest request) {
        LOG.info("Received a RestoreInstanceState request with checkpoint id: " + request.getState().getCheckpointId());
        InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder().setRestoreInstanceStateRequest(request).build();
        this.inControlQueue.offer(instanceControlMsg);
    }

    /** Offers the checkpoint request to the in-stream queue (not the control queue). */
    private void handleCheckpointRequest(CheckpointManager.InitiateStatefulCheckpoint request) {
        LOG.info("Handling instance checkpoint request: " + request);
        this.inStreamQueue.offer(request);
    }

    /**
     * Validates the register response and applies the physical plan it carries, if any.
     *
     * @param response register response from the Stream Manager
     * @throws RuntimeException if the response status is not OK
     */
    private void handleRegisterResponse(StreamManager.RegisterInstanceResponse response) {
        if (response.getStatus().getStatus() != Common.StatusCode.OK) {
            throw new RuntimeException("Stream Manager returned a not ok response for register");
        }
        LOG.info("We registered ourselves to the Stream Manager");
        if (response.hasPplan()) {
            LOG.info("Handling assignment message from response");
            this.handleAssignmentMessage(response.getPplan());
        }
    }

    /**
     * Converts a {@code HeronTupleSet2} into a {@code HeronTupleSet} and offers it to the
     * in-stream queue.
     *
     * <p>A control payload is copied as is. Otherwise each serialized tuple is parsed into a
     * {@code HeronDataTuple}. If parsing fails, the error is logged and the tuples parsed before
     * the failure are still forwarded.
     *
     * @param set tuple set received from the Stream Manager
     */
    private void handleNewTuples2(HeronTuples.HeronTupleSet2 set) {
        HeronTuples.HeronTupleSet.Builder toFeed = HeronTuples.HeronTupleSet.newBuilder();
        toFeed.setSrcTaskId(set.getSrcTaskId());
        if (set.hasControl()) {
            toFeed.setControl(set.getControl());
        } else {
            HeronTuples.HeronDataTupleSet.Builder builder = HeronTuples.HeronDataTupleSet.newBuilder();
            builder.setStream(set.getData().getStream());
            try {
                for (ByteString bs : set.getData().getTuplesList()) {
                    builder.addTuples(HeronTuples.HeronDataTuple.parseFrom(bs));
                }
            }
            catch (InvalidProtocolBufferException e) {
                // Best effort: keep whatever was parsed so far.
                LOG.log(Level.SEVERE, "Failed to parse protobuf", e);
            }
            toFeed.setData(builder);
        }
        HeronTuples.HeronTupleSet s = toFeed.build();
        this.inStreamQueue.offer(s);
    }

    /**
     * Applies a physical plan: builds a new {@link PhysicalPlanHelper}, stores it and offers it
     * to the control queue.
     *
     * @param pplan physical plan received from the Stream Manager
     * @throws RuntimeException if a helper already exists and the new plan assigns this instance
     *     a different component or task id
     */
    private void handleAssignmentMessage(PhysicalPlans.PhysicalPlan pplan) {
        // Avoid stringifying the whole plan unless FINE logging is actually enabled.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Physical Plan: " + pplan);
        }
        PhysicalPlanHelper newHelper = new PhysicalPlanHelper(pplan, this.instance.getInstanceId());
        if (this.helper != null) {
            boolean sameAssignment = this.helper.getMyComponent().equals(newHelper.getMyComponent())
                && this.helper.getMyTaskId() == newHelper.getMyTaskId();
            if (!sameAssignment) {
                throw new RuntimeException("Our Assignment has changed. We will die to pick it");
            }
        }
        if (this.helper == null) {
            LOG.info("We received a new Physical Plan.");
        } else {
            LOG.info("We received a new Physical Plan with same assignment. Should be state changes.");
            LOG.info(String.format("Old state: %s; new state: %s.", this.helper.getTopologyState(), newHelper.getTopologyState()));
        }
        this.helper = newHelper;
        LOG.info("Push to Slave");
        InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder().setNewPhysicalPlanHelper(this.helper).build();
        this.inControlQueue.offer(instanceControlMsg);
    }

    /** @return {@code true} when connected and a physical plan has been received */
    private boolean isStreamMgrReadyReceiveTuples() {
        return this.isConnected() && this.helper != null;
    }

    /** @return {@code true} when the in-stream queue holds fewer items than its expected available capacity */
    private boolean isInQueuesAvailable() {
        return this.inStreamQueue.size() < this.inStreamQueue.getExpectedAvailableCapacity();
    }
}

