package org.apache.dubbo.registry.xds.util.protocol;

import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.xds.util.XdsChannel;
import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;

/* loaded from: input_file:org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.class */
/**
 * Base implementation of {@link XdsProtocol} that exchanges {@link DiscoveryRequest} /
 * {@link DiscoveryResponse} messages over streams obtained from an {@link XdsChannel}.
 *
 * <p>Every call to {@link #getResource(Set)} or {@link #observeResource(Set, Consumer)} is assigned
 * an id taken from the shared {@link #requestId} counter. That id is the key of all per-request
 * bookkeeping maps held by this class.
 *
 * @param <T> decoded resource type produced by {@link #decodeDiscoveryResponse(DiscoveryResponse)}
 * @param <S> delta resource type; not referenced by the code in this class
 */
public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T> {

    private static final Logger logger = LoggerFactory.getLogger(AbstractProtocol.class);

    /** Source of request ids; shared by every protocol instance. */
    protected static final AtomicLong requestId = new AtomicLong(0);

    protected final XdsChannel xdsChannel;

    protected final Node node;

    /** Resource names currently associated with a request. K - request id, V - resource names. */
    protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>();

    /** Request-side stream observers. K - request id, V - observer used to send requests. */
    private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>();

    /** Polling tasks created by {@link #observeResource(Set, Consumer)}. K - request id. */
    private final Map<Long, ScheduledFuture<?>> observeScheduledMap = new ConcurrentHashMap<>();

    /** Futures completed by {@link ResponseObserver} when a response arrives. K - request id. */
    private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>();

    private final ScheduledExecutorService pollingExecutor;

    /** Initial delay and period of the polling tasks, in seconds. */
    private final int pollingTimeout;

    /**
     * Response-side observer bound to a single request id. It decodes incoming responses, answers
     * them on the request stream and hands the decoded value to whoever is waiting on the future
     * registered for that id.
     */
    public class ResponseObserver implements StreamObserver<DiscoveryResponse> {
        private final long requestId;

        public ResponseObserver(long requestId) {
            this.requestId = requestId;
        }

        @Override
        public void onNext(DiscoveryResponse response) {
            logger.info("receive notification from xds server, type: " + getTypeUrl() + " requestId: " + this.requestId);
            T decoded = decodeDiscoveryResponse(response);

            StreamObserver<DiscoveryRequest> requestObserver = requestObserverMap.get(this.requestId);
            if (requestObserver == null) {
                // The request has already been cleaned up; there is nobody to answer or notify.
                return;
            }
            // Reply with a request that echoes the version and nonce of the response.
            requestObserver.onNext(buildDiscoveryRequest(Collections.emptySet(), response));

            CompletableFuture<T> pending = streamResult.get(this.requestId);
            if (pending == null) {
                return;
            }
            pending.complete(decoded);
        }

        @Override
        public void onError(Throwable t) {
            logger.error("xDS Client received error message! detail:", t);
            // Release any thread blocked on the result of this request; without this a failed
            // stream would leave the waiter blocked indefinitely.
            CompletableFuture<T> pending = streamResult.get(this.requestId);
            if (pending != null) {
                pending.completeExceptionally(t);
            }
        }

        @Override
        public void onCompleted() {
            // Nothing to do when the server closes the stream.
        }
    }

    /**
     * @param xdsChannel      channel used to open discovery streams
     * @param node            node identity attached to every request
     * @param pollingPoolSize core pool size of the polling executor
     * @param pollingTimeout  initial delay and period, in seconds, of observe polling
     */
    public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) {
        this.xdsChannel = xdsChannel;
        this.node = node;
        this.pollingExecutor = new ScheduledThreadPoolExecutor(pollingPoolSize, new NamedThreadFactory("Dubbo-registry-xds"));
        this.pollingTimeout = pollingTimeout;
    }

    /**
     * @return the type URL placed in every request built by {@link #buildDiscoveryRequest(Set)}
     */
    public abstract String getTypeUrl();

    /**
     * Sends one request for the given resources and blocks until a response has been decoded.
     *
     * @param resourceNames names to request; {@code null} is treated as an empty set
     * @return the decoded response, or {@code null} if waiting was interrupted or the request failed
     */
    @Override // org.apache.dubbo.registry.xds.util.protocol.XdsProtocol
    public T getResource(Set<String> resourceNames) {
        final long request = requestId.getAndIncrement();
        final Set<String> names = resourceNames == null ? Collections.<String>emptySet() : resourceNames;

        requestParam.put(request, names);
        try {
            StreamObserver<DiscoveryRequest> requestObserver =
                xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));

            // Register the future before sending so that the response observer can always find it.
            CompletableFuture<T> future = new CompletableFuture<>();
            requestObserverMap.put(request, requestObserver);
            streamResult.put(request, future);

            requestObserver.onNext(buildDiscoveryRequest(names));

            return future.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            logger.error("Error occur when request control panel.", e);
            return null;
        } catch (ExecutionException e) {
            logger.error("Error occur when request control panel.", e);
            return null;
        } finally {
            streamResult.remove(request);
            requestObserverMap.remove(request);
            requestParam.remove(request);
        }
    }

    /**
     * Delivers the current value of the given resources to {@code consumer} once, then schedules a
     * task that re-requests them every {@code pollingTimeout} seconds and delivers each result.
     *
     * @param resourceNames names to observe; {@code null} is treated as an empty set
     * @param consumer      callback receiving every decoded result (the first may be {@code null},
     *                      see {@link #getResource(Set)})
     * @return the id of this observation, usable with {@link #updateObserve(long, Set)}
     */
    @Override // org.apache.dubbo.registry.xds.util.protocol.XdsProtocol
    public long observeResource(Set<String> resourceNames, Consumer<T> consumer) {
        final long request = requestId.getAndIncrement();
        final Set<String> names = resourceNames == null ? Collections.<String>emptySet() : resourceNames;

        requestParam.put(request, names);

        // Initial synchronous fetch; it runs under its own request id.
        consumer.accept(getResource(names));

        // One stream is opened for this observation and reused by every polling round.
        requestObserverMap.put(request, xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request)));

        ScheduledFuture<?> pollingTask = pollingExecutor.scheduleAtFixedRate(() -> {
            try {
                // Read on every round: the names may have been replaced through updateObserve.
                Set<String> currentNames = requestParam.get(request);

                CompletableFuture<T> future = new CompletableFuture<>();
                streamResult.put(request, future);

                requestObserverMap.get(request).onNext(buildDiscoveryRequest(currentNames));

                try {
                    consumer.accept(future.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.error("Error occur when request control panel.", e);
                } catch (ExecutionException e) {
                    logger.error("Error occur when request control panel.", e);
                } finally {
                    streamResult.remove(request);
                }
            } catch (Throwable t) {
                // An exception escaping the task would cancel all further polling rounds.
                logger.error("Error when requesting observe data. Type: " + getTypeUrl(), t);
            }
        }, pollingTimeout, pollingTimeout, TimeUnit.SECONDS);
        observeScheduledMap.put(request, pollingTask);

        return request;
    }

    /**
     * Replaces the resource names used by subsequent polling rounds of an observation.
     *
     * @param request       id returned by {@link #observeResource(Set, Consumer)}
     * @param resourceNames new names; {@code null} is treated as an empty set, matching
     *                      {@link #getResource(Set)} and {@link #observeResource(Set, Consumer)}
     */
    @Override // org.apache.dubbo.registry.xds.util.protocol.XdsProtocol
    public void updateObserve(long request, Set<String> resourceNames) {
        // ConcurrentHashMap rejects null values, so normalise before storing.
        requestParam.put(request, resourceNames == null ? Collections.<String>emptySet() : resourceNames);
    }

    /**
     * Builds a request for the given resource names using this protocol's node and type URL.
     *
     * @param resourceNames names to add to the request; must not be {@code null}
     */
    protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames) {
        return DiscoveryRequest.newBuilder()
            .setNode(node)
            .setTypeUrl(getTypeUrl())
            .addAllResourceNames(resourceNames)
            .build();
    }

    /**
     * Builds a request that echoes the type URL, version info and nonce of {@code response}.
     *
     * @param resourceNames not used by this implementation; no resource names are added
     * @param response      response whose type URL, version and nonce are copied
     */
    protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames, DiscoveryResponse response) {
        return DiscoveryRequest.newBuilder()
            .setNode(node)
            .setTypeUrl(response.getTypeUrl())
            .setVersionInfo(response.getVersionInfo())
            .setResponseNonce(response.getNonce())
            .build();
    }

    /**
     * Converts a raw response into the resource type exposed by this protocol.
     *
     * @param response response received from the server
     * @return the decoded value handed to waiting callers
     */
    protected abstract T decodeDiscoveryResponse(DiscoveryResponse response);
}
