/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.channel;

import com.netflix.eureka2.channel.AbstractServiceChannel;
import com.netflix.eureka2.channel.ServiceChannel;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;

/**
 * A service channel that wraps a delegate channel of type {@code C} and replaces it
 * with a freshly established one whenever the delegate's lifecycle terminates.
 *
 * <p>The delegate's lifecycle observable is watched; when it completes, or fails with
 * an error that {@link #recoverableError(Throwable)} accepts, a retry is scheduled on a
 * {@link Scheduler.Worker}. The delay between retries doubles after every scheduled
 * attempt, is capped at {@code retryInitialDelayMs * MAX_EXP_BACK_OFF_MULTIPLIER}, and
 * is reset to the initial value once at least that cap has elapsed since the last
 * retry attempt started.
 *
 * <p>NOTE(review): {@code retryDelay} and {@code lastConnectTime} are plain fields that
 * are touched both from the worker callback and from delegate lifecycle callbacks;
 * whether those can run on different threads depends on the scheduler and the delegate
 * implementation - confirm before relying on cross-thread visibility.
 *
 * @param <C> concrete type of the delegate channel
 */
public abstract class RetryableServiceChannel<C extends ServiceChannel>
extends AbstractServiceChannel<RetryableServiceChannel.STATES> {
    private static final Logger logger = LoggerFactory.getLogger(RetryableServiceChannel.class);

    /** Upper bound of the retry delay, expressed as a multiple of the initial delay. */
    public static final int MAX_EXP_BACK_OFF_MULTIPLIER = 10;

    /** The delegate channel currently in use; swapped atomically on a successful retry. */
    private final AtomicReference<C> currentChannelRef;

    /** Subscription to the current delegate's lifecycle observable; {@code null} until first subscribed. */
    private final AtomicReference<Subscription> delegateLifecycleSubscription;

    private final long retryInitialDelayMs;
    private final long maxRetryDelayMs;

    /** Worker on which retry attempts are scheduled; unsubscribed when the channel is closed. */
    private final Scheduler.Worker worker;

    /** Worker clock reading taken when the most recent retry attempt started (0 before any retry). */
    private long lastConnectTime;

    /** Delay, in milliseconds, that the next scheduled retry will use. */
    private long retryDelay;

    /** Scheduled action: records the attempt start time and then performs the retry. */
    private final Action0 retryAction = new Action0() {
        @Override
        public void call() {
            lastConnectTime = worker.now();
            retry();
        }
    };

    /**
     * Creates the channel in the {@link STATES#Open} state around an initial delegate and
     * immediately starts watching that delegate's lifecycle.
     *
     * @param initialDelegate     delegate channel to start with; its lifecycle observable is subscribed to here
     * @param retryInitialDelayMs first retry delay in milliseconds; also the base for the maximum delay
     * @param scheduler           scheduler from which the retry worker is created
     */
    protected RetryableServiceChannel(C initialDelegate, long retryInitialDelayMs, Scheduler scheduler) {
        super(STATES.Open, null);
        this.currentChannelRef = new AtomicReference<C>(initialDelegate);
        this.delegateLifecycleSubscription = new AtomicReference<Subscription>();
        this.retryInitialDelayMs = retryInitialDelayMs;
        this.maxRetryDelayMs = retryInitialDelayMs * MAX_EXP_BACK_OFF_MULTIPLIER;
        this.worker = scheduler.createWorker();
        this.retryDelay = retryInitialDelayMs;
        // NOTE(review): this is an overridable method invoked from the constructor; a subclass
        // override would run before the subclass's own fields are initialized.
        subscribeToDelegateChannelLifecycle(initialDelegate);
    }

    /**
     * Stops retrying, stops watching the delegate's lifecycle, closes the current delegate
     * and moves the state from {@link STATES#Open} to {@link STATES#Closed}. Does nothing
     * beyond logging when the state is already {@code Closed}.
     */
    @Override
    protected void _close() {
        logger.info("Closing the retryable channel: {}", (Object) name);
        if (state.get() == STATES.Closed) {
            return;
        }
        // Cancel pending retries first so no new delegate is requested while shutting down.
        worker.unsubscribe();

        // Detach from the delegate's lifecycle before closing it, so that the close below
        // is not observed by our own lifecycle subscriber.
        Subscription currentSubscription = delegateLifecycleSubscription.get();
        if (currentSubscription != null && !currentSubscription.isUnsubscribed()) {
            currentSubscription.unsubscribe();
        }

        C delegateChannel = currentChannelRef.get();
        if (delegateChannel != null) {
            delegateChannel.close();
        }
        state.compareAndSet(STATES.Open, STATES.Closed);
    }

    /**
     * @return the delegate channel currently held, as read at the time of the call
     */
    protected C currentDelegateChannel() {
        return currentChannelRef.get();
    }

    /**
     * @return an observable that, on each subscription, emits the delegate channel held at
     *         that moment and then completes; an {@link Exception} thrown while doing so is
     *         forwarded to the subscriber's {@code onError}
     */
    protected Observable<C> currentDelegateChannelObservable() {
        return Observable.create(new Observable.OnSubscribe<C>() {
            @Override
            public void call(Subscriber<? super C> subscriber) {
                try {
                    subscriber.onNext(currentChannelRef.get());
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
    }

    /**
     * Produces a replacement delegate channel. {@link #retry()} applies {@code single()} to
     * the result, so exactly one channel is expected to be emitted.
     *
     * @return observable of the new delegate channel
     */
    protected abstract Observable<C> reestablish();

    /**
     * Performs one retry attempt: asks {@link #reestablish()} for a new delegate, installs it,
     * starts watching its lifecycle and closes the delegate it replaced. If the attempt fails,
     * another retry is scheduled.
     */
    protected void retry() {
        logger.info("Retrying ...");
        reestablish().single().subscribe(new Subscriber<C>() {
            @Override
            public void onCompleted() {
                // Nothing to do; the work happens in onNext.
            }

            @Override
            public void onError(Throwable e) {
                scheduleRetry();
            }

            @Override
            public void onNext(C newDelegateChannel) {
                C oldDelegateChannel = currentChannelRef.getAndSet(newDelegateChannel);
                // Re-subscribing also unsubscribes from the old delegate's lifecycle, so the
                // close below does not trigger yet another retry.
                subscribeToDelegateChannelLifecycle(newDelegateChannel);
                if (oldDelegateChannel != null) {
                    oldDelegateChannel.close();
                }
            }
        });
    }

    /**
     * Decides whether a delegate lifecycle error should lead to a reconnect. This
     * implementation treats every error as recoverable.
     *
     * @param error error reported by the delegate's lifecycle observable
     * @return {@code true} to schedule a retry, {@code false} to propagate the error and close
     */
    protected boolean recoverableError(Throwable error) {
        return true;
    }

    /**
     * Schedules {@link #retry()} on the worker after the current retry delay, then doubles
     * the delay (up to the maximum) for the next call. The delay is first reset to the initial
     * value when at least the maximum delay has elapsed since the last retry attempt started.
     */
    protected void scheduleRetry() {
        long closedAfter = worker.now() - lastConnectTime;
        if (closedAfter >= maxRetryDelayMs) {
            retryDelay = retryInitialDelayMs;
        }
        worker.schedule(retryAction, retryDelay, TimeUnit.MILLISECONDS);
        bumpUpRetryDelay();
    }

    /**
     * Subscribes to the given delegate's lifecycle observable and drops the subscription to
     * the previous delegate, if one is still active.
     *
     * <p>On completion, or on an error accepted by {@link #recoverableError(Throwable)}, a
     * retry is scheduled. Any other error is forwarded to this channel's lifecycle and the
     * channel is closed.
     *
     * @param newDelegateChannel delegate whose lifecycle should be watched from now on
     */
    protected void subscribeToDelegateChannelLifecycle(C newDelegateChannel) {
        Observable<Void> lifecycleObservable = newDelegateChannel.asLifecycleObservable();
        Subscription newSubscription = lifecycleObservable.subscribe(new Subscriber<Void>() {
            @Override
            public void onCompleted() {
                logger.info("Channel closed gracefully and must be reconnected");
                scheduleRetry();
            }

            @Override
            public void onError(Throwable e) {
                if (recoverableError(e)) {
                    logger.info("Channel failure; scheduling the reconnection in " + retryDelay + "ms", e);
                    scheduleRetry();
                } else {
                    logger.error("Unrecoverable error; closing the retryable channel");
                    lifecycle.onError(e);
                    close();
                }
            }

            @Override
            public void onNext(Void aVoid) {
                // Lifecycle observables carry no items of interest.
            }
        });

        Subscription oldSubscription = delegateLifecycleSubscription.getAndSet(newSubscription);
        if (oldSubscription != null && !oldSubscription.isUnsubscribed()) {
            oldSubscription.unsubscribe();
        }
    }

    /** Doubles the retry delay, never exceeding {@code maxRetryDelayMs}. */
    private void bumpUpRetryDelay() {
        retryDelay = Math.min(maxRetryDelayMs, retryDelay * 2L);
    }

    /** Lifecycle states of the retryable channel. */
    public static enum STATES {
        Open,
        Closed;

    }
}

