/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.channel;

import com.netflix.eureka2.channel.AbstractServiceChannel;
import com.netflix.eureka2.metric.StateMachineMetrics;
import com.netflix.eureka2.transport.MessageConnection;
import com.netflix.eureka2.transport.TransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

/**
 * Base class for client-side channels that exchange messages with a server over a
 * single {@link MessageConnection}.
 *
 * <p>The constructor only assembles the connection pipeline; nothing in this class
 * subscribes to it. Subclasses obtain the connection through {@link #connect()}.
 * Only the first connection emitted by the transport is used ({@code take(1)}), and
 * the pipeline is wrapped in RxJava's {@code cache()} operator so that subscribers
 * share its result. When the connection's lifecycle observable terminates, the
 * channel is closed.
 *
 * @param <STATE> enum type describing the states of this channel
 */
public abstract class AbstractClientChannel<STATE extends Enum<STATE>>
        extends AbstractServiceChannel<STATE> {

    private static final Logger logger = LoggerFactory.getLogger(AbstractClientChannel.class);

    /** Transport used to create the server connection. */
    protected final TransportClient client;

    /** The established connection, or {@code null} while no connection has been emitted yet. */
    private volatile MessageConnection connectionIfConnected;

    /** Cached pipeline that yields the single connection used by this channel. */
    private final Observable<MessageConnection> singleConnection;

    /**
     * Creates the channel and prepares (but does not subscribe to) the connection pipeline.
     *
     * @param initState initial state, handed to the superclass
     * @param client    transport client the connection is requested from
     * @param metrics   state machine metrics, handed to the superclass
     */
    protected AbstractClientChannel(STATE initState, TransportClient client, StateMachineMetrics<STATE> metrics) {
        super(initState, metrics);
        this.client = client;
        this.singleConnection = client.connect()
                .take(1)
                .map(new Func1<MessageConnection, MessageConnection>() {
                    @Override
                    public MessageConnection call(MessageConnection serverConnection) {
                        // Remember the first connection seen; an already stored one is kept.
                        MessageConnection current = AbstractClientChannel.this.connectionIfConnected;
                        if (current == null) {
                            current = serverConnection;
                            AbstractClientChannel.this.connectionIfConnected = current;
                        }
                        AbstractClientChannel.this.subscribeToConnectionLifecycle(current);
                        return current;
                    }
                })
                .cache();
    }

    /**
     * Shuts down the underlying connection if one has been established.
     *
     * <p>NOTE(review): a connection that is emitted only after this method has run is
     * not shut down here; confirm whether the base class prevents that ordering.
     */
    @Override
    protected void _close() {
        logger.debug("Closing client interest channel with state: {}", this.state.get());
        // Single volatile read so the null check and the call see the same reference.
        MessageConnection connection = this.connectionIfConnected;
        if (connection != null) {
            connection.shutdown();
        }
    }

    /**
     * Returns a description of the form {@code {channel=<name>, connection=<connection>}},
     * using {@code <not connected>} while no connection has been established.
     */
    @Override
    public String toString() {
        MessageConnection connection = this.connectionIfConnected;
        Object connectionDescription = connection == null ? "<not connected>" : connection;
        return "{channel=" + this.name + ", connection=" + connectionDescription + '}';
    }

    /**
     * Ties the channel's lifetime to the connection: completion of the lifecycle
     * observable closes the channel, an error closes it with that error.
     */
    private void subscribeToConnectionLifecycle(MessageConnection connection) {
        connection.lifecycleObservable().subscribe(new Subscriber<Void>() {
            @Override
            public void onCompleted() {
                AbstractClientChannel.this.close();
            }

            @Override
            public void onError(Throwable e) {
                AbstractClientChannel.this.close(e);
            }

            @Override
            public void onNext(Void aVoid) {
                // Lifecycle items carry no information; only termination matters.
            }
        });
    }

    /**
     * Returns the cached observable that yields this channel's connection.
     *
     * @return observable emitting the single {@link MessageConnection} of this channel
     */
    protected Observable<MessageConnection> connect() {
        return this.singleConnection;
    }

    /**
     * Submits a message that expects an acknowledgment and closes the channel if the
     * send fails.
     *
     * @param connection connection to send on
     * @param message    message to send; must not be {@code null} because its class name
     *                   is used to describe the send in failure logs
     * @param <T>        message type
     * @return the observable returned by {@code connection.submitWithAck(message)}
     */
    protected <T> Observable<Void> sendExpectAckOnConnection(MessageConnection connection, T message) {
        logger.debug("Sending message to the server: {}", message);
        return this.subscribeToTransportSend(connection.submitWithAck(message), message.getClass().getSimpleName());
    }

    /**
     * Submits a message and closes the channel if the send fails.
     *
     * @param connection connection to send on
     * @param message    message to send; must not be {@code null} because its class name
     *                   is used to describe the send in failure logs
     * @param <T>        message type
     * @return the observable returned by {@code connection.submit(message)}
     */
    protected <T> Observable<Void> sendOnConnection(MessageConnection connection, T message) {
        logger.debug("Sending message to the server: {}", message);
        return this.subscribeToTransportSend(connection.submit(message), message.getClass().getSimpleName());
    }

    /**
     * Propagates an error to the server and closes the channel if the send fails.
     *
     * @param connection connection to send on
     * @param throwable  error to propagate
     * @return the observable returned by {@code connection.onError(throwable)}
     */
    protected Observable<Void> sendErrorOnConnection(MessageConnection connection, Throwable throwable) {
        // A Throwable argument binds to debug(String, Throwable), where the message is not a
        // format string; a "{}" placeholder would be printed literally, so none is used.
        logger.debug("Sending error to the server", throwable);
        return this.subscribeToTransportSend(connection.onError(throwable), "error");
    }

    /**
     * Sends an acknowledgment to the server and closes the channel if the send fails.
     *
     * @param connection connection to send on
     * @return the observable returned by {@code connection.acknowledge()}
     */
    protected Observable<Void> sendAckOnConnection(MessageConnection connection) {
        logger.debug("Sending acknowledgment to the server");
        return this.subscribeToTransportSend(connection.acknowledge(), "acknowledgment");
    }

    /**
     * Subscribes to the result of a transport send so that a failure is logged and
     * closes the channel, then hands the same observable back to the caller.
     *
     * <p>NOTE(review): the returned observable is the instance that was just subscribed
     * to; whether a further subscription by the caller repeats the send depends on the
     * transport implementation - confirm.
     *
     * @param sendResult observable returned by the transport for the send
     * @param sendType   short description of what was sent, used in the failure log
     * @return {@code sendResult}
     */
    protected Observable<Void> subscribeToTransportSend(Observable<Void> sendResult, final String sendType) {
        sendResult.subscribe(
                new Action1<Void>() {
                    @Override
                    public void call(Void aVoid) {
                        // Nothing to do for emitted items.
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        logger.warn("Failed to send " + sendType + " to the server. Closing the channel.", throwable);
                        AbstractClientChannel.this.close(throwable);
                    }
                });
        return sendResult;
    }
}

