package com.netflix.eureka2.transport.base;

import com.netflix.eureka2.metric.MessageConnectionMetrics;
import com.netflix.eureka2.transport.Acknowledgement;
import com.netflix.eureka2.transport.MessageConnection;
import io.reactivex.netty.channel.ObservableConnection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/* loaded from: input_file:com/netflix/eureka2/transport/base/BaseMessageConnection.class */
public class BaseMessageConnection implements MessageConnection {
    private static final Logger logger = LoggerFactory.getLogger(BaseMessageConnection.class);
    private static final Pattern NETTY_CHANNEL_NAME_RE = Pattern.compile("\\[.*=>\\s*(.*)\\]");
    private final String name;
    private final ObservableConnection<Object, Object> connection;
    private final MessageConnectionMetrics metrics;
    private final Scheduler.Worker schedulerWorker;
    private final long startTime;
    private final AtomicBoolean closed;
    private final Subject<Void, Void> lifecycleSubject;
    private final Queue<PendingAck> pendingAckQueue;
    private final Action0 ackTimeoutTask;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/netflix/eureka2/transport/base/BaseMessageConnection$PendingAck.class */
    public static class PendingAck extends Subject<Void, Void> {
        private final long expiryTime;
        private final Subject<Void, Void> ackSubject;
        private final AtomicBoolean isCompleted;

        PendingAck(Observable.OnSubscribe<Void> onSubscribe, Subject<Void, Void> subject, long j) {
            super(onSubscribe);
            this.isCompleted = new AtomicBoolean();
            this.ackSubject = subject;
            this.expiryTime = j;
        }

        public long getExpiryTime() {
            return this.expiryTime;
        }

        public boolean hasObservers() {
            return this.ackSubject.hasObservers();
        }

        public void onCompleted() {
            if (this.isCompleted.compareAndSet(false, true)) {
                this.ackSubject.onCompleted();
            }
        }

        public void onError(Throwable th) {
            if (this.isCompleted.compareAndSet(false, true)) {
                this.ackSubject.onError(th);
            }
        }

        public void onNext(Void r2) {
        }

        static PendingAck create(long j) {
            final AsyncSubject create = AsyncSubject.create();
            return new PendingAck(new Observable.OnSubscribe<Void>() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.PendingAck.1
                public void call(Subscriber<? super Void> subscriber) {
                    create.subscribe(subscriber);
                }
            }, create, j);
        }
    }

    public BaseMessageConnection(String str, ObservableConnection<Object, Object> observableConnection, MessageConnectionMetrics messageConnectionMetrics) {
        this(str, observableConnection, messageConnectionMetrics, Schedulers.computation());
    }

    public BaseMessageConnection(String str, ObservableConnection<Object, Object> observableConnection, MessageConnectionMetrics messageConnectionMetrics, Scheduler scheduler) {
        this.closed = new AtomicBoolean();
        this.lifecycleSubject = new SerializedSubject(AsyncSubject.create());
        this.pendingAckQueue = new ConcurrentLinkedQueue();
        this.ackTimeoutTask = new Action0() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.1
            public void call() {
                try {
                    long now = BaseMessageConnection.this.schedulerWorker.now();
                    PendingAck pendingAck = (PendingAck) BaseMessageConnection.this.pendingAckQueue.peek();
                    if (pendingAck == null || pendingAck.getExpiryTime() > now) {
                        BaseMessageConnection.this.schedulerWorker.schedule(BaseMessageConnection.this.ackTimeoutTask, 1L, TimeUnit.SECONDS);
                    } else {
                        PendingAck pendingAck2 = (PendingAck) BaseMessageConnection.this.pendingAckQueue.peek();
                        TimeoutException timeoutException = new TimeoutException("{connection=" + BaseMessageConnection.this.name + "}: acknowledgement timeout");
                        if (pendingAck2 != null) {
                            pendingAck2.onError(timeoutException);
                        }
                        BaseMessageConnection.this.doShutdown(timeoutException);
                    }
                } catch (Exception e) {
                    BaseMessageConnection.logger.error("Acknowledgement cleanup task failed with an exception", e);
                    BaseMessageConnection.this.doShutdown(e);
                }
            }
        };
        this.connection = observableConnection;
        this.metrics = messageConnectionMetrics;
        this.name = descriptiveName(str);
        this.schedulerWorker = scheduler.createWorker();
        installAcknowledgementHandler();
        this.startTime = scheduler.now();
        messageConnectionMetrics.incrementConnectionCounter();
    }

    private String descriptiveName(String str) {
        String obj = this.connection.getChannel().toString();
        Matcher matcher = NETTY_CHANNEL_NAME_RE.matcher(obj);
        if (matcher.matches()) {
            obj = matcher.group(1);
        }
        return str + "=>" + obj;
    }

    private void installAcknowledgementHandler() {
        this.connection.getInput().ofType(Acknowledgement.class).subscribe(new Action1<Acknowledgement>() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.2
            public void call(Acknowledgement acknowledgement) {
                PendingAck pendingAck = (PendingAck) BaseMessageConnection.this.pendingAckQueue.poll();
                BaseMessageConnection.this.metrics.decrementPendingAckCounter();
                if (pendingAck == null) {
                    BaseMessageConnection.this.shutdown(new IllegalStateException("{connection=" + BaseMessageConnection.this.name + "}: unexpected acknowledgment"));
                } else {
                    pendingAck.ackSubject.onCompleted();
                }
            }
        });
        this.schedulerWorker.schedule(this.ackTimeoutTask, 1L, TimeUnit.SECONDS);
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public String name() {
        return this.name;
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> submit(Object obj) {
        return this.closed.get() ? Observable.error(new IllegalStateException("{connection=" + this.name + "}: connection closed")) : writeWhenSubscribed(obj);
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> submitWithAck(Object obj) {
        return submitWithAck(obj, 0L);
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> submitWithAck(Object obj, long j) {
        if (this.closed.get()) {
            return Observable.error(new IllegalStateException("{connection=" + this.name + "}: connection closed"));
        }
        return writeWhenSubscribed(obj, PendingAck.create(j <= 0 ? Long.MAX_VALUE : this.schedulerWorker.now() + j));
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> acknowledge() {
        return this.closed.get() ? Observable.error(new IllegalStateException("{connection=" + this.name + "}: connection closed")) : writeWhenSubscribed(Acknowledgement.INSTANCE);
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Object> incoming() {
        return this.connection.getInput().filter(new Func1<Object, Boolean>() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.5
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public Boolean m67call(Object obj) {
                return Boolean.valueOf(!(obj instanceof Acknowledgement));
            }
        }).doOnNext(new Action1<Object>() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.4
            public void call(Object obj) {
                BaseMessageConnection.this.metrics.incrementIncomingMessageCounter(obj.getClass(), 1);
            }
        }).doOnTerminate(new Action0() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.3
            public void call() {
                BaseMessageConnection.this.shutdown(new IllegalStateException("{connection=" + BaseMessageConnection.this.name + "}: connection input onCompleted"));
            }
        });
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> onError(Throwable th) {
        return Observable.error(th);
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> onCompleted() {
        return Observable.empty();
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public void shutdown() {
        doShutdown(null);
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public void shutdown(Throwable th) {
        doShutdown(th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doShutdown(Throwable th) {
        if (this.closed.getAndSet(true)) {
            return;
        }
        if (th == null) {
            logger.info("Shutting down connection {}", this.name);
        } else {
            logger.info("Shutting down connection {} because of an exception {}:{}", new Object[]{this.name, th.getClass().getName(), th.getMessage()});
        }
        drainPendingAckQueue();
        this.metrics.decrementConnectionCounter();
        this.metrics.connectionDuration(this.startTime);
        terminateLifecycle(th);
        this.connection.close().subscribe(new Subscriber<Void>() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.6
            public void onCompleted() {
            }

            public void onError(Throwable th2) {
                BaseMessageConnection.logger.debug("Error during closing the connection", th2);
            }

            public void onNext(Void r2) {
            }
        });
        this.schedulerWorker.unsubscribe();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void drainPendingAckQueue() {
        while (true) {
            PendingAck poll = this.pendingAckQueue.poll();
            if (poll == null) {
                return;
            }
            this.metrics.decrementPendingAckCounter();
            try {
                poll.onCompleted();
            } catch (Exception e) {
                logger.warn("Acknowledgement subscriber hasn't handled properly onError", e);
            }
        }
    }

    private void terminateLifecycle(Throwable th) {
        if (th == null) {
            this.lifecycleSubject.onCompleted();
        } else {
            this.lifecycleSubject.onError(th);
        }
    }

    @Override // com.netflix.eureka2.transport.MessageConnection
    public Observable<Void> lifecycleObservable() {
        return this.lifecycleSubject;
    }

    private Observable<Void> writeWhenSubscribed(final Object obj) {
        return this.connection.writeAndFlush(obj).doOnCompleted(new Action0() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.7
            public void call() {
                BaseMessageConnection.this.metrics.incrementOutgoingMessageCounter(obj.getClass(), 1);
            }
        }).cache();
    }

    private Observable<Void> writeWhenSubscribed(final Object obj, final PendingAck pendingAck) {
        return this.connection.writeAndFlush(obj).doOnCompleted(new Action0() { // from class: com.netflix.eureka2.transport.base.BaseMessageConnection.8
            public void call() {
                BaseMessageConnection.this.pendingAckQueue.add(pendingAck);
                BaseMessageConnection.this.metrics.incrementPendingAckCounter();
                BaseMessageConnection.this.metrics.incrementOutgoingMessageCounter(obj.getClass(), 1);
                if (BaseMessageConnection.this.closed.get()) {
                    BaseMessageConnection.this.drainPendingAckQueue();
                }
            }
        }).concatWith(pendingAck).cache();
    }
}
