/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.UnaryOperator;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class HandlerBase {
    protected final PulsarClientImpl client;
    protected final String topic;
    private static final AtomicReferenceFieldUpdater<HandlerBase, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, State.class, "state");
    private volatile State state = null;
    private static final AtomicReferenceFieldUpdater<HandlerBase, ClientCnx> CLIENT_CNX_UPDATER = AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, ClientCnx.class, "clientCnx");
    private volatile ClientCnx clientCnx = null;
    protected final Backoff backoff;
    private static final Logger log = LoggerFactory.getLogger(HandlerBase.class);

    public HandlerBase(PulsarClientImpl client, String topic) {
        this.client = client;
        this.topic = topic;
        this.backoff = new Backoff(100L, TimeUnit.MILLISECONDS, 60L, TimeUnit.SECONDS);
        STATE_UPDATER.set(this, State.Uninitialized);
        CLIENT_CNX_UPDATER.set(this, null);
    }

    protected void grabCnx() {
        if (CLIENT_CNX_UPDATER.get(this) != null) {
            log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", (Object)this.topic, (Object)this.getHandlerName());
            return;
        }
        if (!this.isValidStateForReconnection()) {
            log.info("[{}] [{}] Ignoring reconnection request (state: {})", new Object[]{this.topic, this.getHandlerName(), STATE_UPDATER.get(this)});
            return;
        }
        try {
            ((CompletableFuture)this.client.getConnection(this.topic).thenAccept(this::connectionOpened)).exceptionally(this::handleConnectionError);
        }
        catch (Throwable t) {
            log.warn("[{}] [{}] Exception thrown while getting connection: ", new Object[]{this.topic, this.getHandlerName(), t});
            this.reconnectLater(t);
        }
    }

    private Void handleConnectionError(Throwable exception) {
        log.warn("[{}] [{}] Error connecting to broker: {}", new Object[]{this.topic, this.getHandlerName(), exception.getMessage()});
        this.connectionFailed(new PulsarClientException(exception));
        State state = STATE_UPDATER.get(this);
        if (state == State.Uninitialized || state == State.Connecting || state == State.Ready) {
            this.reconnectLater(exception);
        }
        return null;
    }

    protected void reconnectLater(Throwable exception) {
        CLIENT_CNX_UPDATER.set(this, null);
        if (!this.isValidStateForReconnection()) {
            log.info("[{}] [{}] Ignoring reconnection request (state: {})", new Object[]{this.topic, this.getHandlerName(), STATE_UPDATER.get(this)});
            return;
        }
        long delayMs = this.backoff.next();
        log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", new Object[]{this.topic, this.getHandlerName(), exception.getMessage(), (double)delayMs / 1000.0});
        STATE_UPDATER.set(this, State.Connecting);
        this.client.timer().newTimeout(timeout -> {
            log.info("[{}] [{}] Reconnecting after connection was closed", (Object)this.topic, (Object)this.getHandlerName());
            this.grabCnx();
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    protected void connectionClosed(ClientCnx cnx) {
        if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
            if (!this.isValidStateForReconnection()) {
                log.info("[{}] [{}] Ignoring reconnection request (state: {})", new Object[]{this.topic, this.getHandlerName(), STATE_UPDATER.get(this)});
                return;
            }
            long delayMs = this.backoff.next();
            STATE_UPDATER.set(this, State.Connecting);
            log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", new Object[]{this.topic, this.getHandlerName(), cnx.channel(), (double)delayMs / 1000.0});
            this.client.timer().newTimeout(timeout -> {
                log.warn("[{}] [{}] Reconnecting after timeout", (Object)this.topic, (Object)this.getHandlerName());
                this.grabCnx();
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    protected void resetBackoff() {
        this.backoff.reset();
    }

    protected ClientCnx cnx() {
        return CLIENT_CNX_UPDATER.get(this);
    }

    protected boolean isRetriableError(PulsarClientException e) {
        return e instanceof PulsarClientException.LookupException;
    }

    protected boolean changeToReadyState() {
        return STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Ready) || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Ready);
    }

    protected State getState() {
        return STATE_UPDATER.get(this);
    }

    protected void setState(State s) {
        STATE_UPDATER.set(this, s);
    }

    protected State getAndUpdateState(UnaryOperator<State> updater) {
        return STATE_UPDATER.getAndUpdate(this, updater);
    }

    protected ClientCnx getClientCnx() {
        return CLIENT_CNX_UPDATER.get(this);
    }

    protected void setClientCnx(ClientCnx clientCnx) {
        CLIENT_CNX_UPDATER.set(this, clientCnx);
    }

    private boolean isValidStateForReconnection() {
        State state = STATE_UPDATER.get(this);
        switch (state) {
            case Uninitialized: 
            case Connecting: 
            case Ready: {
                return true;
            }
            case Closing: 
            case Closed: 
            case Failed: 
            case Terminated: {
                return false;
            }
        }
        return false;
    }

    abstract void connectionFailed(PulsarClientException var1);

    abstract void connectionOpened(ClientCnx var1);

    abstract String getHandlerName();

    static enum State {
        Uninitialized,
        Connecting,
        Ready,
        Closing,
        Closed,
        Terminated,
        Failed;

    }
}

