/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.transport.base;

import com.netflix.eureka2.protocol.Heartbeat;
import com.netflix.eureka2.transport.MessageConnection;
import com.netflix.eureka2.utils.ExceptionUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

public class HeartBeatConnection
implements MessageConnection {
    private static final Logger logger = LoggerFactory.getLogger(HeartBeatConnection.class);
    protected static final IllegalStateException MISSING_HEARTBEAT_EXCEPTION = ExceptionUtils.trimStackTraceof(new IllegalStateException("too many heartbeats missed"));
    private final MessageConnection delegate;
    private final long heartbeatIntervalMs;
    private final long tolerance;
    private final Scheduler scheduler;
    private final HeartbeatSenderReceiver heartbeatSenderReceiver;
    private final PublishSubject<Object> filteredInput;
    private final Subscription ackInputSubscription;

    public HeartBeatConnection(MessageConnection delegate, long heartbeatIntervalMs, long tolerance, Scheduler scheduler) {
        this.delegate = delegate;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.tolerance = tolerance;
        this.scheduler = scheduler;
        this.filteredInput = PublishSubject.create();
        this.heartbeatSenderReceiver = new HeartbeatSenderReceiver();
        this.ackInputSubscription = delegate.incoming().filter((Func1)new Func1<Object, Boolean>(){

            public Boolean call(Object o) {
                if (o instanceof Heartbeat) {
                    HeartBeatConnection.this.heartbeatSenderReceiver.onHeartbeatReceived();
                    return false;
                }
                return true;
            }
        }).subscribe(this.filteredInput);
        delegate.lifecycleObservable().subscribe((Subscriber)new Subscriber<Void>(){

            public void onCompleted() {
                HeartBeatConnection.this.internalShutdown();
            }

            public void onError(Throwable e) {
                HeartBeatConnection.this.internalShutdown();
            }

            public void onNext(Void aVoid) {
            }
        });
    }

    @Override
    public String name() {
        return this.delegate.name();
    }

    @Override
    public Observable<Void> submit(Object message) {
        return this.delegate.submit(message);
    }

    @Override
    public Observable<Void> submitWithAck(Object message) {
        return this.delegate.submitWithAck(message);
    }

    @Override
    public Observable<Void> submitWithAck(Object message, long timeout) {
        return this.delegate.submitWithAck(message, timeout);
    }

    @Override
    public Observable<Void> acknowledge() {
        return this.delegate.acknowledge();
    }

    @Override
    public Observable<Object> incoming() {
        return this.filteredInput;
    }

    @Override
    public Observable<Void> onError(Throwable error) {
        return Observable.error((Throwable)error);
    }

    @Override
    public Observable<Void> onCompleted() {
        return Observable.empty();
    }

    @Override
    public void shutdown() {
        this.heartbeatSenderReceiver.unsubscribe();
        this.delegate.shutdown();
    }

    @Override
    public void shutdown(Throwable e) {
        this.internalShutdown();
        this.delegate.shutdown(e);
    }

    private void internalShutdown() {
        this.ackInputSubscription.unsubscribe();
        this.heartbeatSenderReceiver.unsubscribe();
    }

    @Override
    public Observable<Void> lifecycleObservable() {
        return this.delegate.lifecycleObservable();
    }

    class HeartbeatSenderReceiver
    extends Subscriber<Long> {
        private final AtomicInteger missingHeartbeatsCount = new AtomicInteger();

        HeartbeatSenderReceiver() {
            this.add(Observable.interval((long)HeartBeatConnection.this.heartbeatIntervalMs, (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)HeartBeatConnection.this.scheduler).subscribe((Subscriber)this));
        }

        void onHeartbeatReceived() {
            logger.debug("Received heartbeat message from {}", (Object)HeartBeatConnection.this.delegate.name());
            this.missingHeartbeatsCount.decrementAndGet();
        }

        public void onCompleted() {
            HeartBeatConnection.this.shutdown();
        }

        public void onError(Throwable e) {
            logger.error("Heartbeat receiver subscription got an error. This will close the connection " + HeartBeatConnection.this.delegate.name(), e);
            HeartBeatConnection.this.shutdown(e);
        }

        public void onNext(Long aLong) {
            if ((long)this.missingHeartbeatsCount.incrementAndGet() > HeartBeatConnection.this.tolerance) {
                logger.warn("More than {} heartbeat messages missed; closing the connection {}", (Object)HeartBeatConnection.this.tolerance, (Object)HeartBeatConnection.this.delegate.name());
                HeartBeatConnection.this.shutdown(MISSING_HEARTBEAT_EXCEPTION);
            } else {
                logger.debug("Sending heartbeat message in the connection {}", (Object)HeartBeatConnection.this.delegate.name());
                HeartBeatConnection.this.submit(Heartbeat.INSTANCE).subscribe((Subscriber)new Subscriber<Void>(){

                    public void onCompleted() {
                    }

                    public void onError(Throwable e) {
                        logger.warn("Failed to send heartbeat message; terminating the connection " + HeartBeatConnection.this.delegate.name(), e);
                        HeartBeatConnection.this.shutdown(e);
                    }

                    public void onNext(Void aVoid) {
                    }
                });
            }
        }
    }
}

