/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.client.channel;

import com.netflix.eureka2.channel.AbstractClientChannel;
import com.netflix.eureka2.channel.InterestChannel;
import com.netflix.eureka2.client.interest.BatchingRegistry;
import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.interests.ModifyNotification;
import com.netflix.eureka2.interests.StreamStateNotification;
import com.netflix.eureka2.metric.InterestChannelMetrics;
import com.netflix.eureka2.protocol.discovery.AddInstance;
import com.netflix.eureka2.protocol.discovery.DeleteInstance;
import com.netflix.eureka2.protocol.discovery.InterestRegistration;
import com.netflix.eureka2.protocol.discovery.InterestSetNotification;
import com.netflix.eureka2.protocol.discovery.StreamStateUpdate;
import com.netflix.eureka2.protocol.discovery.UpdateInstanceInfo;
import com.netflix.eureka2.registry.Source;
import com.netflix.eureka2.registry.Sourced;
import com.netflix.eureka2.registry.SourcedEurekaRegistry;
import com.netflix.eureka2.registry.instance.Delta;
import com.netflix.eureka2.registry.instance.InstanceInfo;
import com.netflix.eureka2.transport.MessageConnection;
import com.netflix.eureka2.transport.TransportClient;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observers.SafeSubscriber;

public class InterestChannelImpl
extends AbstractClientChannel<InterestChannel.STATE>
implements InterestChannel,
Sourced {
    private static final Logger logger = LoggerFactory.getLogger(InterestChannelImpl.class);
    private final BatchingRegistry<InstanceInfo> remoteBatchingRegistry;
    protected Observable<ChangeNotification<InstanceInfo>> channelInterestStream;
    protected Subscriber<ChangeNotification<InstanceInfo>> channelInterestSubscriber;
    private final Source selfSource;
    protected final SourcedEurekaRegistry<InstanceInfo> registry;
    private final Map<String, InstanceInfo> idVsInstance = new HashMap<String, InstanceInfo>();

    public InterestChannelImpl(SourcedEurekaRegistry<InstanceInfo> registry, BatchingRegistry<InstanceInfo> remoteBatchingRegistry, TransportClient client, InterestChannelMetrics metrics) {
        super(InterestChannel.STATE.Idle, client, metrics);
        this.remoteBatchingRegistry = remoteBatchingRegistry;
        this.selfSource = new Source(Source.Origin.INTERESTED);
        this.registry = registry;
        this.channelInterestSubscriber = new ChannelInterestSubscriber(registry);
        this.channelInterestStream = this.createInterestStream();
    }

    @Override
    public Source getSource() {
        return this.selfSource;
    }

    @Override
    public Observable<Void> change(final Interest<InstanceInfo> newInterest) {
        if (this.state.get() == InterestChannel.STATE.Closed) {
            return Observable.error((Throwable)CHANNEL_CLOSED_EXCEPTION);
        }
        Observable serverRequest = this.connect().switchMap((Func1)new Func1<MessageConnection, Observable<Void>>(){

            public Observable<Void> call(MessageConnection serverConnection) {
                return InterestChannelImpl.this.sendExpectAckOnConnection(serverConnection, new InterestRegistration(newInterest));
            }
        });
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Void>(){

            public void call(Subscriber<? super Void> subscriber) {
                if (InterestChannel.STATE.Closed == InterestChannelImpl.this.state.get()) {
                    subscriber.onError((Throwable)CHANNEL_CLOSED_EXCEPTION);
                } else if (InterestChannelImpl.this.moveToState(InterestChannel.STATE.Idle, InterestChannel.STATE.Open)) {
                    logger.debug("First time registration: {}", (Object)newInterest);
                    InterestChannelImpl.this.channelInterestStream.subscribe(InterestChannelImpl.this.channelInterestSubscriber);
                    InterestChannelImpl.this.remoteBatchingRegistry.connectTo(InterestChannelImpl.this.channelInterestStream);
                } else {
                    logger.debug("Channel changes: {}", (Object)newInterest);
                }
                InterestChannelImpl.this.remoteBatchingRegistry.retainAll(newInterest);
                subscriber.onCompleted();
            }
        }).concatWith(serverRequest);
    }

    @Override
    protected void _close() {
        if (this.state.get() != InterestChannel.STATE.Closed) {
            this.moveToState((Enum)this.state.get(), InterestChannel.STATE.Closed);
            this.idVsInstance.clear();
            super._close();
        }
    }

    protected Observable<ChangeNotification<InstanceInfo>> createInterestStream() {
        return this.connect().switchMap((Func1)new Func1<MessageConnection, Observable<? extends ChangeNotification<InstanceInfo>>>(){

            public Observable<? extends ChangeNotification<InstanceInfo>> call(final MessageConnection connection) {
                return connection.incoming().filter((Func1)new Func1<Object, Boolean>(){

                    public Boolean call(Object message) {
                        boolean isKnown = message instanceof InterestSetNotification;
                        if (!isKnown) {
                            logger.warn("Unrecognized discovery protocol message of type " + message.getClass());
                        }
                        return isKnown;
                    }
                }).map((Func1)new Func1<Object, ChangeNotification<InstanceInfo>>(){

                    public ChangeNotification<InstanceInfo> call(Object message) {
                        ChangeNotification changeNotification;
                        InterestSetNotification notification = (InterestSetNotification)message;
                        if (notification instanceof AddInstance) {
                            changeNotification = InterestChannelImpl.this.addMessageToChangeNotification((AddInstance)notification);
                        } else if (notification instanceof UpdateInstanceInfo) {
                            changeNotification = InterestChannelImpl.this.updateMessageToChangeNotification((UpdateInstanceInfo)notification);
                        } else if (notification instanceof DeleteInstance) {
                            changeNotification = InterestChannelImpl.this.deleteMessageToChangeNotification((DeleteInstance)notification);
                        } else if (notification instanceof StreamStateUpdate) {
                            changeNotification = InterestChannelImpl.this.streamStateUpdateToStreamStateNotification((StreamStateUpdate)notification);
                        } else {
                            throw new IllegalArgumentException("Unknown message received on the interest channel. Type: " + message.getClass().getName());
                        }
                        InterestChannelImpl.this.sendAckOnConnection(connection);
                        return changeNotification;
                    }
                }).filter((Func1)new Func1<ChangeNotification<InstanceInfo>, Boolean>(){

                    public Boolean call(ChangeNotification<InstanceInfo> notification) {
                        return null != notification;
                    }
                });
            }
        }).share();
    }

    private ChangeNotification<InstanceInfo> addMessageToChangeNotification(AddInstance msg) {
        InstanceInfo incoming = msg.getInstanceInfo();
        InstanceInfo cached = this.idVsInstance.get(incoming.getId());
        if (cached != null) {
            logger.debug("Received newer version of an existing instanceInfo as Add");
        }
        this.idVsInstance.put(incoming.getId(), incoming);
        ChangeNotification<InstanceInfo> notification = new ChangeNotification<InstanceInfo>(ChangeNotification.Kind.Add, incoming);
        return notification;
    }

    private ChangeNotification<InstanceInfo> updateMessageToChangeNotification(UpdateInstanceInfo msg) {
        ModifyNotification<InstanceInfo> notification = null;
        Delta<?> delta = msg.getDelta();
        InstanceInfo cached = this.idVsInstance.get(delta.getId());
        if (cached == null) {
            if (logger.isWarnEnabled()) {
                logger.warn("Update notification received for non-existent instance id " + delta.getId());
            }
        } else {
            InstanceInfo updatedInfo = cached.applyDelta(delta);
            this.idVsInstance.put(updatedInfo.getId(), updatedInfo);
            notification = new ModifyNotification<InstanceInfo>(updatedInfo, Collections.singleton(delta));
        }
        return notification;
    }

    private ChangeNotification<InstanceInfo> deleteMessageToChangeNotification(DeleteInstance msg) {
        ChangeNotification<InstanceInfo> notification = null;
        String instanceId = msg.getInstanceId();
        InstanceInfo removedInstance = this.idVsInstance.remove(instanceId);
        if (removedInstance != null) {
            notification = new ChangeNotification<InstanceInfo>(ChangeNotification.Kind.Delete, removedInstance);
        } else if (logger.isWarnEnabled()) {
            logger.warn("Delete notification received for non-existent instance id:" + instanceId);
        }
        return notification;
    }

    private ChangeNotification<InstanceInfo> streamStateUpdateToStreamStateNotification(StreamStateUpdate notification) {
        StreamStateNotification.BufferState state = notification.getState();
        if (state == StreamStateNotification.BufferState.BufferStart || state == StreamStateNotification.BufferState.BufferEnd) {
            return new StreamStateNotification<InstanceInfo>(state, notification.getInterest());
        }
        throw new IllegalStateException("Unexpected state " + (Object)((Object)state));
    }

    protected class ChannelInterestSubscriber
    extends SafeSubscriber<ChangeNotification<InstanceInfo>> {
        public ChannelInterestSubscriber(final SourcedEurekaRegistry<InstanceInfo> registry) {
            super((Subscriber)new Subscriber<ChangeNotification<InstanceInfo>>(){

                public void onCompleted() {
                    logger.debug("Channel interest completed");
                }

                public void onError(Throwable e) {
                    logger.debug("Channel interest throw error", e);
                }

                public void onNext(ChangeNotification<InstanceInfo> notification) {
                    switch (notification.getKind()) {
                        case Add: 
                        case Modify: {
                            registry.register(notification.getData(), InterestChannelImpl.this.selfSource);
                            break;
                        }
                        case Delete: {
                            registry.unregister(notification.getData(), InterestChannelImpl.this.selfSource);
                            break;
                        }
                        case BufferSentinel: {
                            break;
                        }
                        default: {
                            logger.error("Unrecognized notification kind");
                        }
                    }
                }
            });
        }
    }
}

