package com.netflix.eureka2.transport.base;

import com.netflix.eureka2.protocol.Heartbeat;
import com.netflix.eureka2.transport.MessageConnection;
import com.netflix.eureka2.utils.ExceptionUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/* loaded from: input_file:com/netflix/eureka2/transport/base/HeartBeatConnection.class */
/**
 * {@link MessageConnection} decorator that adds a heartbeat protocol on top of a delegate connection.
 *
 * <p>On every tick of an interval timer a {@link Heartbeat} is submitted through the delegate and a
 * "missed heartbeats" counter is incremented. Every {@link Heartbeat} seen on the delegate's incoming
 * stream decrements that counter and is removed from the stream exposed by {@link #incoming()}.
 * When the counter exceeds the configured tolerance the connection is shut down with
 * {@link #MISSING_HEARTBEAT_EXCEPTION}.
 */
public class HeartBeatConnection implements MessageConnection {
    private static final Logger logger = LoggerFactory.getLogger(HeartBeatConnection.class);

    /**
     * Shared exception instance (passed through {@code ExceptionUtils.trimStackTraceof}) used to shut
     * the connection down once more heartbeats than the tolerance have been missed.
     */
    protected static final IllegalStateException MISSING_HEARTBEAT_EXCEPTION = (IllegalStateException) ExceptionUtils.trimStackTraceof(new IllegalStateException("too many heartbeats missed"));

    private final MessageConnection delegate;
    private final long heartbeatIntervalMs;
    private final long tolerance;
    private final Scheduler scheduler;
    private final Subscription ackInputSubscription;
    private final PublishSubject<Object> filteredInput = PublishSubject.create();
    // Assigned in the constructor, NOT in a field initializer: HeartbeatSenderReceiver's constructor
    // reads heartbeatIntervalMs and scheduler, which must already hold their final values.
    private final HeartbeatSenderReceiver heartbeatSenderReceiver;

    /**
     * Subscriber to the interval timer. Each tick either sends a heartbeat to the peer or, if too
     * many heartbeats from the peer are outstanding, shuts the enclosing connection down.
     */
    public class HeartbeatSenderReceiver extends Subscriber<Long> {
        // Incremented on every timer tick, decremented on every heartbeat received.
        // NOTE(review): because this is decremented rather than reset, it can go negative if heartbeats
        // arrive more often than timer ticks fire - confirm this is the intended accounting.
        private final AtomicInteger missingHeartbeatsCount = new AtomicInteger();

        /**
         * Starts the interval timer on the enclosing connection's scheduler and registers the
         * resulting subscription with this subscriber.
         */
        HeartbeatSenderReceiver() {
            add(Observable.interval(heartbeatIntervalMs, TimeUnit.MILLISECONDS, scheduler).subscribe(this));
        }

        /** Records that a heartbeat arrived on the delegate connection. */
        void onHeartbeatReceived() {
            logger.debug("Received heartbeat message from {}", delegate.name());
            missingHeartbeatsCount.decrementAndGet();
        }

        /** Timer stream completed: shut the enclosing connection down without an error. */
        @Override
        public void onCompleted() {
            HeartBeatConnection.this.shutdown();
        }

        /**
         * Timer stream failed: log and shut the enclosing connection down with the error.
         *
         * @param th the error raised by the timer subscription
         */
        @Override
        public void onError(Throwable th) {
            logger.error("Heartbeat receiver subscription got an error. This will close the connection " + delegate.name(), th);
            HeartBeatConnection.this.shutdown(th);
        }

        /**
         * Handles one timer tick: counts it as a potentially missed heartbeat and either closes the
         * connection (tolerance exceeded) or submits a new heartbeat through the delegate.
         *
         * @param l the tick sequence number emitted by the interval timer (unused)
         */
        @Override
        public void onNext(Long l) {
            if (missingHeartbeatsCount.incrementAndGet() > tolerance) {
                logger.warn("More than {} heartbeat messages missed; closing the connection {}", tolerance, delegate.name());
                HeartBeatConnection.this.shutdown(MISSING_HEARTBEAT_EXCEPTION);
                return;
            }

            logger.debug("Sending heartbeat message in the connection {}", delegate.name());
            HeartBeatConnection.this.submit(Heartbeat.INSTANCE).subscribe(new Subscriber<Void>() {
                @Override
                public void onCompleted() {
                    // Heartbeat submitted successfully; nothing to do.
                }

                @Override
                public void onError(Throwable th) {
                    logger.warn("Failed to send heartbeat message; terminating the connection " + delegate.name(), th);
                    HeartBeatConnection.this.shutdown(th);
                }

                @Override
                public void onNext(Void ignored) {
                    // Void stream; no items expected.
                }
            });
        }
    }

    /**
     * Wraps {@code messageConnection} with heartbeat sending and monitoring.
     *
     * @param messageConnection   the connection to decorate
     * @param heartbeatIntervalMs period, in milliseconds, of the heartbeat timer
     * @param tolerance           number of outstanding heartbeats allowed before the connection is closed
     * @param scheduler           scheduler the heartbeat timer runs on
     */
    public HeartBeatConnection(MessageConnection messageConnection, long heartbeatIntervalMs, long tolerance, Scheduler scheduler) {
        this.delegate = messageConnection;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.tolerance = tolerance;
        this.scheduler = scheduler;

        // Must come after the assignments above (the receiver reads them while starting its timer)
        // and before the incoming filter below (which forwards heartbeats to the receiver).
        this.heartbeatSenderReceiver = new HeartbeatSenderReceiver();

        // Strip heartbeats out of the delegate's incoming stream; everything else is forwarded
        // to filteredInput, which is what incoming() exposes.
        this.ackInputSubscription = messageConnection.incoming().filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object obj) {
                if (!(obj instanceof Heartbeat)) {
                    return true;
                }
                HeartBeatConnection.this.heartbeatSenderReceiver.onHeartbeatReceived();
                return false;
            }
        }).subscribe(this.filteredInput);

        // Release our own subscriptions whenever the delegate's lifecycle terminates, either way.
        messageConnection.lifecycleObservable().subscribe(new Subscriber<Void>() {
            @Override
            public void onCompleted() {
                HeartBeatConnection.this.internalShutdown();
            }

            @Override
            public void onError(Throwable th) {
                HeartBeatConnection.this.internalShutdown();
            }

            @Override
            public void onNext(Void ignored) {
                // Void stream; no items expected.
            }
        });
    }

    /** @return the delegate connection's name */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public String name() {
        return this.delegate.name();
    }

    /** Delegates to {@code delegate.submit(obj)}. */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> submit(Object obj) {
        return this.delegate.submit(obj);
    }

    /** Delegates to {@code delegate.submitWithAck(obj)}. */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> submitWithAck(Object obj) {
        return this.delegate.submitWithAck(obj);
    }

    /** Delegates to {@code delegate.submitWithAck(obj, j)}; {@code j} is passed through unchanged. */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> submitWithAck(Object obj, long j) {
        return this.delegate.submitWithAck(obj, j);
    }

    /** Delegates to {@code delegate.acknowledge()}. */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> acknowledge() {
        return this.delegate.acknowledge();
    }

    /** @return the delegate's incoming stream with {@link Heartbeat} messages filtered out */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Object> incoming() {
        return this.filteredInput;
    }

    /**
     * Returns an observable that immediately fails with {@code th}; the delegate is not invoked.
     *
     * @param th the error to emit
     */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> onError(Throwable th) {
        return Observable.error(th);
    }

    /** Returns an empty observable; the delegate is not invoked. */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> onCompleted() {
        return Observable.empty();
    }

    /**
     * Stops the heartbeat timer and shuts the delegate down.
     *
     * <p>NOTE(review): unlike {@link #shutdown(Throwable)} this does not unsubscribe the incoming
     * filter subscription directly; that only happens via {@link #internalShutdown()} when the
     * delegate's lifecycle observable terminates. Confirm the asymmetry is intentional.
     */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public void shutdown() {
        this.heartbeatSenderReceiver.unsubscribe();
        this.delegate.shutdown();
    }

    /**
     * Releases this wrapper's subscriptions and shuts the delegate down with the given error.
     *
     * @param th the error passed on to {@code delegate.shutdown(Throwable)}
     */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public void shutdown(Throwable th) {
        internalShutdown();
        this.delegate.shutdown(th);
    }

    /** Unsubscribes from the delegate's incoming stream and stops the heartbeat timer. */
    public void internalShutdown() {
        this.ackInputSubscription.unsubscribe();
        this.heartbeatSenderReceiver.unsubscribe();
    }

    /** @return the delegate's lifecycle observable */
    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> lifecycleObservable() {
        return this.delegate.lifecycleObservable();
    }
}
