/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.common.api;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decoder base class that adds a ping/pong keep-alive on top of {@link PulsarDecoder}.
 *
 * <p>Once the channel becomes active, a periodic task is scheduled on the channel's executor.
 * On every tick the task either closes the connection (handshake never completed, or a
 * previously sent ping was not followed by any received message), sends a new ping, or does
 * nothing when the peer's protocol version predates keep-alive support.
 *
 * <p>NOTE(review): {@code channelActive}/{@code channelInactive} look like Netty inbound-handler
 * callbacks inherited through {@code PulsarDecoder}; the superclass is not visible here, so
 * they are intentionally left without {@code @Override} — confirm and annotate.
 */
public abstract class PulsarHandler extends PulsarDecoder {
    private static final Logger log = LoggerFactory.getLogger(PulsarHandler.class);

    /** Context captured in {@link #channelActive}; {@code null} until the channel is active. */
    protected ChannelHandlerContext ctx;

    /** Remote address of the channel, captured in {@link #channelActive}. */
    protected SocketAddress remoteAddress;

    /**
     * Protocol version advertised by the peer. Starts at {@code v0} and is never assigned in
     * this class — presumably subclasses update it during the handshake (verify in subclasses).
     */
    protected int remoteEndpointProtocolVersion = PulsarApi.ProtocolVersion.v0.getNumber();

    /** Keep-alive period in whole seconds; a value {@code <= 0} disables the keep-alive task. */
    private final long keepAliveIntervalSeconds;

    /** Set when a ping is sent, cleared by {@link #messageReceived()}. */
    private boolean waitingForPingResponse = false;

    /** Handle of the scheduled keep-alive task, or {@code null} when none is scheduled. */
    private ScheduledFuture<?> keepAliveTask;

    /**
     * @return the protocol version currently recorded for the remote endpoint
     */
    public int getRemoteEndpointProtocolVersion() {
        return remoteEndpointProtocolVersion;
    }

    /**
     * Creates a handler with the given keep-alive period.
     *
     * @param keepAliveInterval keep-alive period expressed in {@code unit}
     * @param unit              unit of {@code keepAliveInterval}; the value is converted to whole
     *                          seconds, so a sub-second interval truncates to 0 and disables
     *                          keep-alive
     */
    public PulsarHandler(int keepAliveInterval, TimeUnit unit) {
        this.keepAliveIntervalSeconds = unit.toSeconds(keepAliveInterval);
    }

    /**
     * Clears the pending-ping flag: any received message counts as proof the peer is alive.
     * NOTE(review): presumably invoked by {@code PulsarDecoder} for each inbound command — confirm.
     */
    @Override
    protected final void messageReceived() {
        waitingForPingResponse = false;
    }

    /**
     * Records the channel context and remote address, then schedules the periodic keep-alive
     * task on the channel's executor when the configured interval is positive.
     *
     * @param ctx context of the channel that became active
     * @throws Exception declared for compatibility with callers/overriders; not thrown here
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.remoteAddress = ctx.channel().remoteAddress();
        this.ctx = ctx;

        if (keepAliveIntervalSeconds > 0L) {
            // Only announce scheduling when a task is really scheduled; previously the message
            // was emitted even with a non-positive interval, which was misleading.
            if (log.isDebugEnabled()) {
                log.debug("[{}] Scheduling keep-alive task every {} s", ctx.channel(), keepAliveIntervalSeconds);
            }
            keepAliveTask = ctx.executor().scheduleAtFixedRate(this::handleKeepAliveTimeout,
                    keepAliveIntervalSeconds, keepAliveIntervalSeconds, TimeUnit.SECONDS);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Keep-alive is disabled (interval {} s)", ctx.channel(), keepAliveIntervalSeconds);
        }
    }

    /**
     * Stops the keep-alive task once the channel is no longer active.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception declared for compatibility with callers/overriders; not thrown here
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        cancelKeepAliveTask();
    }

    /**
     * Answers a ping from the peer by writing and flushing a pong.
     *
     * @param ping the received ping command (not inspected)
     */
    @Override
    protected final void handlePing(PulsarApi.CommandPing ping) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Replying back to ping message", ctx.channel());
        }
        ctx.writeAndFlush(Commands.newPong());
    }

    /**
     * Intentionally empty: the pending-ping flag is already cleared by
     * {@link #messageReceived()}, so a pong needs no dedicated handling here.
     *
     * @param pong the received pong command (not inspected)
     */
    @Override
    protected final void handlePong(PulsarApi.CommandPong pong) {
    }

    /**
     * Periodic keep-alive tick. Branches are evaluated in order:
     * <ol>
     *   <li>channel already closed: nothing to do;</li>
     *   <li>handshake not completed: close the connection;</li>
     *   <li>a ping is still unanswered and auto-read is enabled: close the connection;</li>
     *   <li>peer protocol version is at least {@code v1}: send a ping and mark it pending;</li>
     *   <li>otherwise: the peer does not support keep-alive, only log at debug level.</li>
     * </ol>
     */
    private void handleKeepAliveTimeout() {
        if (!ctx.channel().isOpen()) {
            return;
        }

        if (!isHandshakeCompleted()) {
            log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", ctx.channel());
            ctx.close();
            return;
        }

        // With auto-read disabled the connection is not force-closed — presumably because a
        // reply could not have been read in that state (confirm intent).
        if (waitingForPingResponse && ctx.channel().config().isAutoRead()) {
            log.warn("[{}] Forcing connection to close after keep-alive timeout", ctx.channel());
            ctx.close();
            return;
        }

        if (remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v1.getNumber()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Sending ping message", ctx.channel());
            }
            // Mark the ping as pending before writing it.
            waitingForPingResponse = true;
            ctx.writeAndFlush(Commands.newPing());
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Peer doesn't support keep-alive", ctx.channel());
        }
    }

    /**
     * Cancels the keep-alive task, if one is scheduled, without interrupting a run that is
     * already in progress, and forgets the handle. Safe to call repeatedly.
     */
    protected void cancelKeepAliveTask() {
        if (keepAliveTask != null) {
            keepAliveTask.cancel(false);
            keepAliveTask = null;
        }
    }

    /**
     * @return {@code true} once the subclass considers the connection handshake finished; until
     *         then a keep-alive tick closes the connection
     */
    protected abstract boolean isHandshakeCompleted();
}

