package com.netflix.eureka2.channel;

import com.netflix.eureka2.metric.StateMachineMetrics;
import com.netflix.eureka2.transport.MessageConnection;
import com.netflix.eureka2.transport.TransportClient;
import java.lang.Enum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:com/netflix/eureka2/channel/AbstractClientChannel.class */
public abstract class AbstractClientChannel<STATE extends Enum<STATE>> extends AbstractServiceChannel<STATE> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractClientChannel.class);
    protected final TransportClient client;
    private volatile MessageConnection connectionIfConnected;
    private final Observable<MessageConnection> singleConnection;

    protected AbstractClientChannel(STATE state, TransportClient transportClient, StateMachineMetrics<STATE> stateMachineMetrics) {
        super(state, stateMachineMetrics);
        this.client = transportClient;
        this.singleConnection = transportClient.connect().take(1).map(new Func1<MessageConnection, MessageConnection>() { // from class: com.netflix.eureka2.channel.AbstractClientChannel.1
            public MessageConnection call(MessageConnection messageConnection) {
                if (AbstractClientChannel.this.connectionIfConnected == null) {
                    AbstractClientChannel.this.connectionIfConnected = messageConnection;
                }
                AbstractClientChannel.this.subscribeToConnectionLifecycle(AbstractClientChannel.this.connectionIfConnected);
                return AbstractClientChannel.this.connectionIfConnected;
            }
        }).cache();
    }

    @Override // com.netflix.eureka2.channel.AbstractServiceChannel
    protected void _close() {
        if (logger.isDebugEnabled()) {
            logger.debug("Closing client interest channel with state: " + this.state.get());
        }
        if (null != this.connectionIfConnected) {
            this.connectionIfConnected.shutdown();
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("{channel=").append(this.name).append(", connection=");
        if (this.connectionIfConnected == null) {
            sb.append("<not connected>");
        } else {
            sb.append(this.connectionIfConnected);
        }
        sb.append('}');
        return sb.toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void subscribeToConnectionLifecycle(MessageConnection messageConnection) {
        messageConnection.lifecycleObservable().subscribe(new Subscriber<Void>() { // from class: com.netflix.eureka2.channel.AbstractClientChannel.2
            public void onCompleted() {
                AbstractClientChannel.this.close();
            }

            public void onError(Throwable th) {
                AbstractClientChannel.this.close(th);
            }

            public void onNext(Void r2) {
            }
        });
    }

    protected Observable<MessageConnection> connect() {
        return this.singleConnection;
    }

    protected <T> Observable<Void> sendExpectAckOnConnection(MessageConnection messageConnection, T t) {
        if (logger.isDebugEnabled()) {
            logger.debug("Sending message to the server: {}", t);
        }
        return subscribeToTransportSend(messageConnection.submitWithAck(t), t.getClass().getSimpleName());
    }

    protected <T> Observable<Void> sendOnConnection(MessageConnection messageConnection, T t) {
        if (logger.isDebugEnabled()) {
            logger.debug("Sending message to the server: {}", t);
        }
        return subscribeToTransportSend(messageConnection.submit(t), t.getClass().getSimpleName());
    }

    protected Observable<Void> sendErrorOnConnection(MessageConnection messageConnection, Throwable th) {
        if (logger.isDebugEnabled()) {
            logger.debug("Sending error to the server: {}", th);
        }
        return subscribeToTransportSend(messageConnection.onError(th), "error");
    }

    protected Observable<Void> sendAckOnConnection(MessageConnection messageConnection) {
        if (logger.isDebugEnabled()) {
            logger.debug("Sending acknowledgment to the server");
        }
        return subscribeToTransportSend(messageConnection.acknowledge(), "acknowledgment");
    }

    protected Observable<Void> subscribeToTransportSend(Observable<Void> observable, final String str) {
        observable.subscribe(new Action1<Void>() { // from class: com.netflix.eureka2.channel.AbstractClientChannel.3
            public void call(Void r2) {
            }
        }, new Action1<Throwable>() { // from class: com.netflix.eureka2.channel.AbstractClientChannel.4
            public void call(Throwable th) {
                AbstractClientChannel.logger.warn("Failed to send " + str + " to the server. Closing the channel.", th);
                AbstractClientChannel.this.close(th);
            }
        });
        return observable;
    }
}
