package org.apache.heron.network;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.network.HeronClient;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.common.network.StatusCode;
import org.apache.heron.metrics.GatewayMetrics;
import org.apache.heron.proto.system.Common;
import org.apache.heron.proto.system.Metrics;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/network/MetricsManagerClient.class */
/**
 * Client that registers an instance with the Metrics Manager and forwards the
 * metrics messages queued in the supplied {@link Communicator}s to it.
 *
 * <p>Queued messages are drained from a task that the client registers on its
 * {@link NIOLooper} to run on wake-up. When a connection attempt fails, or the
 * connection reports an error, a timer event is registered on the looper that
 * calls {@code start()} again after the configured reconnect interval.
 */
public class MetricsManagerClient extends HeronClient {
    private static final Logger LOG = Logger.getLogger(MetricsManagerClient.class.getName());

    private final PhysicalPlans.Instance instance;
    private final List<Communicator<Metrics.MetricPublisherPublishMessage>> outMetricsQueues;
    private final SystemConfig systemConfig;
    private final GatewayMetrics gatewayMetrics;
    private final String hostname;

    /**
     * Creates the client and registers its queue-draining task on the looper.
     *
     * @param nIOLooper looper handed to the {@link HeronClient} super constructor
     * @param str host argument handed to the {@link HeronClient} super constructor
     * @param i port argument handed to the {@link HeronClient} super constructor
     * @param instance instance whose identity is sent in the register request
     * @param list queues of metrics messages to forward to the Metrics Manager
     * @param heronSocketOptions socket options handed to the super constructor
     * @param gatewayMetrics sink for the sent-metrics counters
     * @throws RuntimeException if the local host name cannot be resolved
     */
    public MetricsManagerClient(NIOLooper nIOLooper, String str, int i, PhysicalPlans.Instance instance, List<Communicator<Metrics.MetricPublisherPublishMessage>> list, HeronSocketOptions heronSocketOptions, GatewayMetrics gatewayMetrics) {
        super(nIOLooper, str, i, heronSocketOptions);
        this.instance = instance;
        this.outMetricsQueues = list;
        this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.gatewayMetrics = gatewayMetrics;
        // Resolve the host name before publishing `this` to the looper, so a
        // failed lookup cannot leave a task bound to a half-built client behind.
        this.hostname = resolveLocalHostname();
        addMetricsManagerClientTasksOnWakeUp();
    }

    /**
     * Looks up the host name of the local machine.
     *
     * @return the local host name
     * @throws RuntimeException wrapping the {@link UnknownHostException} if the lookup fails
     */
    private static String resolveLocalHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // Keep the original exception as the cause so the failure stays diagnosable.
            throw new RuntimeException("GetHostName failed", e);
        }
    }

    /** Registers a wake-up task on the looper that drains the metrics queues. */
    private void addMetricsManagerClientTasksOnWakeUp() {
        Runnable drainQueues = new Runnable() {
            @Override
            public void run() {
                sendMetricsMessageIfNeeded();
            }
        };
        getNIOLooper().addTasksOnWakeup(drainQueues);
    }

    /**
     * Sends every message currently queued in the metrics queues, updating the
     * gateway counters for each one. Does nothing while the client is not connected.
     */
    public void sendMetricsMessageIfNeeded() {
        if (!isConnected()) {
            return;
        }
        for (Communicator<Metrics.MetricPublisherPublishMessage> queue : this.outMetricsQueues) {
            while (!queue.isEmpty()) {
                Metrics.MetricPublisherPublishMessage message = queue.poll();
                this.gatewayMetrics.updateSentMetricsSize(message.getSerializedSize());
                this.gatewayMetrics.updateSentMetrics(message.getMetricsCount(), message.getExceptionsCount());
                sendMessage(message);
            }
        }
    }

    /**
     * Flushes the messages that are queued at the time of the call. For each
     * queue, the size is sampled once and exactly that many messages are polled
     * and sent. Unlike {@link #sendMetricsMessageIfNeeded()}, the gateway
     * counters are not updated. Does nothing while the client is not connected.
     */
    public void sendAllMessage() {
        if (!isConnected()) {
            return;
        }
        LOG.info("Flushing all pending data in MetricsManagerClient");
        for (Communicator<Metrics.MetricPublisherPublishMessage> queue : this.outMetricsQueues) {
            for (int remaining = queue.size(); remaining > 0; remaining--) {
                sendMessage(queue.poll());
            }
        }
    }

    /** Logs the disconnect and takes the same retry path as a failed connect. */
    @Override
    public void onError() {
        LOG.severe("Disconnected from Metrics Manager.");
        onConnect(StatusCode.CONNECT_ERROR);
    }

    /**
     * Sends the register request when the connection succeeded; otherwise
     * schedules another {@code start()} after the configured reconnect interval.
     *
     * @param statusCode outcome of the connection attempt
     */
    @Override
    public void onConnect(StatusCode statusCode) {
        if (statusCode == StatusCode.OK) {
            LOG.info("Connected to Metrics Manager. Ready to send register request");
            sendRegisterRequest();
            return;
        }

        LOG.log(Level.WARNING, "Cannot connect to the metrics port with status: {0}, Will Retry..", statusCode);
        Runnable reconnect = new Runnable() {
            @Override
            public void run() {
                start();
            }
        };
        getNIOLooper().registerTimerEvent(this.systemConfig.getInstanceReconnectMetricsmgrInterval(), reconnect);
    }

    /** Builds the publisher description for this instance and sends the register request. */
    private void sendRegisterRequest() {
        // NOTE(review): the publisher's port field is filled with the task id,
        // not a network port - presumably intentional; confirm with the
        // Metrics Manager side.
        Metrics.MetricPublisher publisher = Metrics.MetricPublisher.newBuilder()
                .setHostname(this.hostname)
                .setPort(this.instance.getInfo().getTaskId())
                .setComponentName(this.instance.getInfo().getComponentName())
                .setInstanceId(this.instance.getInstanceId())
                .setInstanceIndex(this.instance.getInfo().getComponentIndex())
                .build();
        Metrics.MetricPublisherRegisterRequest request = Metrics.MetricPublisherRegisterRequest.newBuilder()
                .setPublisher(publisher)
                .build();
        sendRequest(request, null, Metrics.MetricPublisherRegisterResponse.newBuilder(), this.systemConfig.getInstanceReconnectMetricsmgrInterval());
    }

    /**
     * Handles a response from the Metrics Manager; only a successful register
     * response is accepted.
     *
     * @param statusCode transport-level status of the response
     * @param obj context object of the request (unused here)
     * @param message the response message
     * @throws RuntimeException if the status is not OK, the message is not a
     *     register response, or the register response itself reports a failure
     */
    @Override
    public void onResponse(StatusCode statusCode, Object obj, Message message) {
        if (statusCode != StatusCode.OK) {
            throw new RuntimeException("Response from Metrics Manager not ok");
        }
        if (!(message instanceof Metrics.MetricPublisherRegisterResponse)) {
            throw new RuntimeException("Unknown kind of response received from Metrics Manager");
        }
        handleRegisterResponse((Metrics.MetricPublisherRegisterResponse) message);
    }

    /**
     * Rejects any unsolicited incoming message.
     *
     * @param message the unexpected message
     * @throws RuntimeException always
     */
    @Override
    public void onIncomingMessage(Message message) {
        throw new RuntimeException("MetricsClient got an unknown message from Metrics Manager");
    }

    /** Logs that the client is exiting. */
    @Override
    public void onClose() {
        LOG.info("MetricsManagerClient exits");
    }

    /**
     * Verifies that the register response carries an OK status.
     *
     * @param metricPublisherRegisterResponse the register response to check
     * @throws RuntimeException if the response status is not OK
     */
    private void handleRegisterResponse(Metrics.MetricPublisherRegisterResponse metricPublisherRegisterResponse) {
        if (metricPublisherRegisterResponse.getStatus().getStatus() != Common.StatusCode.OK) {
            throw new RuntimeException("Metrics Manager returned a not ok response for register");
        }
        LOG.info("We registered ourselves to the Metrics Manager");
    }
}
