package com.netflix.eureka2.client.channel;

import com.netflix.eureka2.channel.AbstractClientChannel;
import com.netflix.eureka2.channel.InterestChannel;
import com.netflix.eureka2.client.interest.BatchingRegistry;
import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.interests.ModifyNotification;
import com.netflix.eureka2.interests.StreamStateNotification;
import com.netflix.eureka2.metric.InterestChannelMetrics;
import com.netflix.eureka2.protocol.discovery.AddInstance;
import com.netflix.eureka2.protocol.discovery.DeleteInstance;
import com.netflix.eureka2.protocol.discovery.InterestRegistration;
import com.netflix.eureka2.protocol.discovery.InterestSetNotification;
import com.netflix.eureka2.protocol.discovery.StreamStateUpdate;
import com.netflix.eureka2.protocol.discovery.UpdateInstanceInfo;
import com.netflix.eureka2.registry.Source;
import com.netflix.eureka2.registry.Sourced;
import com.netflix.eureka2.registry.SourcedEurekaRegistry;
import com.netflix.eureka2.registry.instance.Delta;
import com.netflix.eureka2.registry.instance.InstanceInfo;
import com.netflix.eureka2.transport.MessageConnection;
import com.netflix.eureka2.transport.TransportClient;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observers.SafeSubscriber;

/**
 * Client-side {@link InterestChannel} implementation.
 *
 * <p>The channel converts discovery protocol messages read from the transport connection
 * ({@link AddInstance}, {@link UpdateInstanceInfo}, {@link DeleteInstance},
 * {@link StreamStateUpdate}) into {@link ChangeNotification}s. Once the channel is opened by the
 * first {@link #change(Interest)} subscription, those notifications are applied to the
 * {@link SourcedEurekaRegistry} passed to the constructor, using this channel's own
 * {@link Source}, and the same stream is handed to the {@link BatchingRegistry}.
 *
 * <p>NOTE(review): {@code idVsInstance} is a plain {@link HashMap}; it is written from the
 * message-mapping functions and cleared in {@link #_close()}. Whether those can run on different
 * threads is not visible from this file - confirm before relying on it.
 */
public class InterestChannelImpl extends AbstractClientChannel<InterestChannel.STATE> implements InterestChannel, Sourced {
    private static final Logger logger = LoggerFactory.getLogger(InterestChannelImpl.class);

    /** Receives the interest stream and the set of currently retained interests. */
    private final BatchingRegistry<InstanceInfo> remoteBatchingRegistry;

    /** Shared stream of notifications decoded from the connection; see {@link #createInterestStream()}. */
    protected Observable<ChangeNotification<InstanceInfo>> channelInterestStream;

    /** Subscriber that applies decoded notifications to {@link #registry}. */
    protected Subscriber<ChangeNotification<InstanceInfo>> channelInterestSubscriber;

    /** Source under which this channel registers and unregisters instances. */
    private final Source selfSource;

    protected final SourcedEurekaRegistry<InstanceInfo> registry;

    /** Last known instance per instance id; needed to apply deltas and to resolve deletes by id. */
    private final Map<String, InstanceInfo> idVsInstance;

    /**
     * Subscriber that mirrors interest notifications into a registry: {@code Add} and
     * {@code Modify} register the carried instance, {@code Delete} unregisters it, and
     * {@code BufferSentinel} is ignored. Stream termination is only logged.
     */
    protected class ChannelInterestSubscriber extends SafeSubscriber<ChangeNotification<InstanceInfo>> {
        /**
         * @param sourcedEurekaRegistry registry that receives the register/unregister calls
         */
        public ChannelInterestSubscriber(final SourcedEurekaRegistry<InstanceInfo> sourcedEurekaRegistry) {
            super(new Subscriber<ChangeNotification<InstanceInfo>>() {
                public void onCompleted() {
                    logger.debug("Channel interest completed");
                }

                public void onError(Throwable th) {
                    logger.debug("Channel interest throw error", th);
                }

                public void onNext(ChangeNotification<InstanceInfo> notification) {
                    // Switch on the enum itself rather than on a compiler-generated ordinal table.
                    switch (notification.getKind()) {
                        case Add:
                        case Modify:
                            sourcedEurekaRegistry.register(notification.getData(), InterestChannelImpl.this.selfSource);
                            break;
                        case Delete:
                            sourcedEurekaRegistry.unregister(notification.getData(), InterestChannelImpl.this.selfSource);
                            break;
                        case BufferSentinel:
                            // Intentionally not forwarded to the registry.
                            break;
                        default:
                            logger.error("Unrecognized notification kind");
                            break;
                    }
                }
            });
        }
    }

    /**
     * Creates the channel in the {@code Idle} state.
     *
     * @param sourcedEurekaRegistry  registry updated from received notifications
     * @param batchingRegistry       batching registry connected to the interest stream on first use
     * @param transportClient        transport client passed to the parent channel
     * @param interestChannelMetrics metrics passed to the parent channel
     */
    public InterestChannelImpl(SourcedEurekaRegistry<InstanceInfo> sourcedEurekaRegistry, BatchingRegistry<InstanceInfo> batchingRegistry, TransportClient transportClient, InterestChannelMetrics interestChannelMetrics) {
        super(InterestChannel.STATE.Idle, transportClient, interestChannelMetrics);
        this.idVsInstance = new HashMap<>();
        this.remoteBatchingRegistry = batchingRegistry;
        this.selfSource = new Source(Source.Origin.INTERESTED);
        this.registry = sourcedEurekaRegistry;
        this.channelInterestSubscriber = new ChannelInterestSubscriber(sourcedEurekaRegistry);
        // NOTE(review): createInterestStream() is overridable and is invoked from the constructor;
        // it is deliberately called last so that all fields above are already assigned.
        this.channelInterestStream = createInterestStream();
    }

    /**
     * @return the source this channel uses when updating the registry
     */
    public Source getSource() {
        return this.selfSource;
    }

    /**
     * Replaces the channel's interest set.
     *
     * <p>On subscription the returned observable first performs the local update: if the channel
     * moves from {@code Idle} to {@code Open}, the interest subscriber is attached to the interest
     * stream and the batching registry is connected to it; in every non-closed case the batching
     * registry is told to retain {@code interest}. It then sends an {@link InterestRegistration}
     * on the connection via {@code sendExpectAckOnConnection}.
     *
     * @param interest the new interest to register
     * @return an observable of the operation; it fails with {@code CHANNEL_CLOSED_EXCEPTION} if
     *         the channel is closed at call time or at subscription time
     */
    public Observable<Void> change(final Interest<InstanceInfo> interest) {
        if (this.state.get() == InterestChannel.STATE.Closed) {
            return Observable.error(CHANNEL_CLOSED_EXCEPTION);
        }

        Observable<Void> localUpdate = Observable.create(new Observable.OnSubscribe<Void>() {
            public void call(Subscriber<? super Void> subscriber) {
                if (InterestChannel.STATE.Closed == InterestChannelImpl.this.state.get()) {
                    subscriber.onError(InterestChannelImpl.CHANNEL_CLOSED_EXCEPTION);
                    // Stop here: a subscriber may receive only one terminal event, and a closed
                    // channel must not modify the batching registry.
                    return;
                }
                if (InterestChannelImpl.this.moveToState(InterestChannel.STATE.Idle, InterestChannel.STATE.Open)) {
                    logger.debug("First time registration: {}", interest);
                    InterestChannelImpl.this.channelInterestStream.subscribe(InterestChannelImpl.this.channelInterestSubscriber);
                    InterestChannelImpl.this.remoteBatchingRegistry.connectTo(InterestChannelImpl.this.channelInterestStream);
                } else {
                    logger.debug("Channel changes: {}", interest);
                }
                InterestChannelImpl.this.remoteBatchingRegistry.retainAll(interest);
                subscriber.onCompleted();
            }
        });

        return localUpdate.concatWith(connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
            public Observable<Void> call(MessageConnection messageConnection) {
                return InterestChannelImpl.this.sendExpectAckOnConnection(messageConnection, new InterestRegistration(interest));
            }
        }));
    }

    /**
     * Moves the channel to {@code Closed}, drops the cached instances and delegates to the parent
     * close logic. Does nothing if the channel is already closed.
     */
    protected void _close() {
        if (this.state.get() != InterestChannel.STATE.Closed) {
            moveToState(this.state.get(), InterestChannel.STATE.Closed);
            this.idVsInstance.clear();
            super._close();
        }
    }

    /**
     * Builds the shared stream of change notifications decoded from the connection's incoming
     * messages.
     *
     * <p>Messages that are not {@link InterestSetNotification}s are logged and dropped. Every
     * accepted message is acknowledged on the connection after conversion, including those whose
     * conversion yields {@code null} (update/delete for an unknown instance id); such
     * {@code null} results are filtered out of the stream.
     *
     * @return the shared notification stream
     */
    protected Observable<ChangeNotification<InstanceInfo>> createInterestStream() {
        return connect().switchMap(new Func1<MessageConnection, Observable<? extends ChangeNotification<InstanceInfo>>>() {
            public Observable<? extends ChangeNotification<InstanceInfo>> call(final MessageConnection messageConnection) {
                return messageConnection.incoming().filter(new Func1<Object, Boolean>() {
                    public Boolean call(Object obj) {
                        boolean recognized = obj instanceof InterestSetNotification;
                        if (!recognized) {
                            logger.warn("Unrecognized discovery protocol message of type {}", obj.getClass());
                        }
                        return recognized;
                    }
                }).map(new Func1<Object, ChangeNotification<InstanceInfo>>() {
                    public ChangeNotification<InstanceInfo> call(Object obj) {
                        // Safe: the preceding filter only lets InterestSetNotification through.
                        InterestSetNotification message = (InterestSetNotification) obj;
                        ChangeNotification<InstanceInfo> notification;
                        if (message instanceof AddInstance) {
                            notification = InterestChannelImpl.this.addMessageToChangeNotification((AddInstance) message);
                        } else if (message instanceof UpdateInstanceInfo) {
                            notification = InterestChannelImpl.this.updateMessageToChangeNotification((UpdateInstanceInfo) message);
                        } else if (message instanceof DeleteInstance) {
                            notification = InterestChannelImpl.this.deleteMessageToChangeNotification((DeleteInstance) message);
                        } else if (message instanceof StreamStateUpdate) {
                            notification = InterestChannelImpl.this.streamStateUpdateToStreamStateNotification((StreamStateUpdate) message);
                        } else {
                            throw new IllegalArgumentException("Unknown message received on the interest channel. Type: " + obj.getClass().getName());
                        }
                        InterestChannelImpl.this.sendAckOnConnection(messageConnection);
                        return notification;
                    }
                }).filter(new Func1<ChangeNotification<InstanceInfo>, Boolean>() {
                    public Boolean call(ChangeNotification<InstanceInfo> changeNotification) {
                        return changeNotification != null;
                    }
                });
            }
        }).share();
    }

    /**
     * Caches the added instance by id (replacing any previous entry) and wraps it in an
     * {@code Add} notification.
     *
     * @param addInstance the add message
     * @return an {@code Add} notification carrying the instance
     */
    public ChangeNotification<InstanceInfo> addMessageToChangeNotification(AddInstance addInstance) {
        InstanceInfo instanceInfo = addInstance.getInstanceInfo();
        String instanceId = instanceInfo.getId();
        if (this.idVsInstance.get(instanceId) != null) {
            logger.debug("Received newer version of an existing instanceInfo as Add");
        }
        this.idVsInstance.put(instanceId, instanceInfo);
        return new ChangeNotification<>(ChangeNotification.Kind.Add, instanceInfo);
    }

    /**
     * Applies the message's delta to the cached instance with the same id and caches the result.
     *
     * @param updateInstanceInfo the update message
     * @return a {@link ModifyNotification} carrying the updated instance and the delta, or
     *         {@code null} (after logging a warning) if no instance with that id is cached
     */
    public ChangeNotification<InstanceInfo> updateMessageToChangeNotification(UpdateInstanceInfo updateInstanceInfo) {
        ChangeNotification<InstanceInfo> changeNotification = null;
        Delta delta = updateInstanceInfo.getDelta();
        InstanceInfo instanceInfo = this.idVsInstance.get(delta.getId());
        if (instanceInfo != null) {
            InstanceInfo applyDelta = instanceInfo.applyDelta(delta);
            this.idVsInstance.put(applyDelta.getId(), applyDelta);
            changeNotification = new ModifyNotification<>(applyDelta, Collections.singleton(delta));
        } else {
            logger.warn("Update notification received for non-existent instance id {}", delta.getId());
        }
        return changeNotification;
    }

    /**
     * Removes the cached instance with the message's id.
     *
     * @param deleteInstance the delete message
     * @return a {@code Delete} notification carrying the removed instance, or {@code null}
     *         (after logging a warning) if no instance with that id was cached
     */
    public ChangeNotification<InstanceInfo> deleteMessageToChangeNotification(DeleteInstance deleteInstance) {
        ChangeNotification<InstanceInfo> changeNotification = null;
        String instanceId = deleteInstance.getInstanceId();
        InstanceInfo remove = this.idVsInstance.remove(instanceId);
        if (remove != null) {
            changeNotification = new ChangeNotification<>(ChangeNotification.Kind.Delete, remove);
        } else {
            logger.warn("Delete notification received for non-existent instance id:{}", instanceId);
        }
        return changeNotification;
    }

    /**
     * Converts a stream state update into a {@link StreamStateNotification}.
     *
     * @param streamStateUpdate the stream state message
     * @return a notification carrying the message's buffer state and interest
     * @throws IllegalStateException if the state is neither {@code BufferStart} nor {@code BufferEnd}
     */
    public ChangeNotification<InstanceInfo> streamStateUpdateToStreamStateNotification(StreamStateUpdate streamStateUpdate) {
        StreamStateNotification.BufferState state = streamStateUpdate.getState();
        if (state == StreamStateNotification.BufferState.BufferStart || state == StreamStateNotification.BufferState.BufferEnd) {
            return new StreamStateNotification(state, streamStateUpdate.getInterest());
        }
        throw new IllegalStateException("Unexpected state " + state);
    }
}
