package com.coreos.jetcd.internal.impl;

import com.coreos.jetcd.Watch;
import com.coreos.jetcd.api.WatchCancelRequest;
import com.coreos.jetcd.api.WatchCreateRequest;
import com.coreos.jetcd.api.WatchGrpc;
import com.coreos.jetcd.api.WatchRequest;
import com.coreos.jetcd.common.exception.ErrorCode;
import com.coreos.jetcd.common.exception.EtcdException;
import com.coreos.jetcd.common.exception.EtcdExceptionFactory;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchResponse;
import com.coreos.jetcd.watch.WatchResponseWithError;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/coreos/jetcd/internal/impl/WatchImpl.class */
public class WatchImpl implements Watch {
    private static final Logger logger = Logger.getLogger(WatchImpl.class.getName());
    private final ClientConnectionManager connectionManager;
    private final WatchGrpc.WatchStub stub;
    private volatile StreamObserver<WatchRequest> grpcWatchStreamObserver;
    private final ConcurrentHashMap<Long, WatcherImpl> watchers = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<WatcherImpl> pendingWatchers = new ConcurrentLinkedQueue<>();
    private final Set<Long> cancelSet = ConcurrentHashMap.newKeySet();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private boolean closed = false;

    /* loaded from: input_file:com/coreos/jetcd/internal/impl/WatchImpl$WatcherImpl.class */
    public static class WatcherImpl implements Watch.Watcher {
        final ExecutorService executor;
        private final WatchOption watchOption;
        private final ByteSequence key;
        private final Object closedLock;
        private final BlockingQueue<WatchResponseWithError> eventsQueue;
        private long watchID;
        private long revision;
        private boolean closed;
        private final WatchImpl owner;

        private WatcherImpl(ByteSequence byteSequence, WatchOption watchOption, WatchImpl watchImpl) {
            this.executor = Executors.newSingleThreadExecutor();
            this.closedLock = new Object();
            this.eventsQueue = new LinkedBlockingQueue();
            this.closed = false;
            this.key = byteSequence;
            this.watchOption = watchOption;
            this.revision = watchOption.getRevision();
            this.owner = watchImpl;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getRevision() {
            return this.revision;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setRevision(long j) {
            this.revision = j;
        }

        public boolean isClosed() {
            boolean z;
            synchronized (this.closedLock) {
                z = this.closed;
            }
            return z;
        }

        private void setClosed() {
            synchronized (this.closedLock) {
                this.closed = true;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getWatchID() {
            return this.watchID;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setWatchID(long j) {
            this.watchID = j;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public WatchOption getWatchOption() {
            return this.watchOption;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ByteSequence getKey() {
            return this.key;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void enqueue(WatchResponseWithError watchResponseWithError) {
            try {
                this.eventsQueue.put(watchResponseWithError);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                WatchImpl.logger.log(Level.WARNING, "Interrupted", (Throwable) e);
            }
        }

        @Override // com.coreos.jetcd.Watch.Watcher, java.lang.AutoCloseable
        public void close() {
            synchronized (this.closedLock) {
                if (isClosed()) {
                    return;
                }
                setClosed();
                this.owner.cancelWatcher(this.watchID);
                this.executor.shutdownNow();
            }
        }

        @Override // com.coreos.jetcd.Watch.Watcher
        public synchronized WatchResponse listen() throws InterruptedException {
            if (isClosed()) {
                throw EtcdExceptionFactory.newClosedWatcherException();
            }
            try {
                return createWatchResponseFuture().get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (ExecutionException e2) {
                synchronized (this.closedLock) {
                    if (isClosed()) {
                        throw EtcdExceptionFactory.newClosedWatcherException();
                    }
                    Throwable cause = e2.getCause();
                    if (cause instanceof EtcdException) {
                        throw ((EtcdException) cause);
                    }
                    throw EtcdExceptionFactory.toEtcdException(e2);
                }
            } catch (RejectedExecutionException e3) {
                throw EtcdExceptionFactory.newClosedWatcherException();
            }
        }

        private Future<WatchResponse> createWatchResponseFuture() {
            return this.executor.submit(() -> {
                WatchResponseWithError take = this.eventsQueue.take();
                if (take.getException() != null) {
                    throw take.getException();
                }
                return new WatchResponse(take.getWatchResponse());
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WatchImpl(ClientConnectionManager clientConnectionManager) {
        this.connectionManager = clientConnectionManager;
        this.stub = (WatchGrpc.WatchStub) clientConnectionManager.newStub((v0) -> {
            return WatchGrpc.newStub(v0);
        });
    }

    private boolean isClosed() {
        return this.closed;
    }

    private void setClosed() {
        this.closed = true;
    }

    @Override // com.coreos.jetcd.Watch
    public Watch.Watcher watch(ByteSequence byteSequence) {
        return watch(byteSequence, WatchOption.DEFAULT);
    }

    @Override // com.coreos.jetcd.Watch
    public synchronized Watch.Watcher watch(ByteSequence byteSequence, WatchOption watchOption) {
        if (isClosed()) {
            throw EtcdExceptionFactory.newClosedWatchClientException();
        }
        WatcherImpl watcherImpl = new WatcherImpl(byteSequence, watchOption, this);
        this.pendingWatchers.add(watcherImpl);
        if (this.pendingWatchers.size() == 1) {
            getGrpcWatchStreamObserver().onNext(toWatchCreateRequest(watcherImpl));
        }
        return watcherImpl;
    }

    @Override // com.coreos.jetcd.internal.impl.CloseableClient, java.lang.AutoCloseable
    public synchronized void close() {
        if (isClosed()) {
            return;
        }
        setClosed();
        notifyWatchers(EtcdExceptionFactory.newClosedWatchClientException());
        closeGrpcWatchStreamObserver();
        this.executor.shutdownNow();
        this.scheduledExecutorService.shutdownNow();
    }

    private void notifyWatchers(EtcdException etcdException) {
        WatchResponseWithError watchResponseWithError = new WatchResponseWithError(etcdException);
        this.pendingWatchers.forEach(watcherImpl -> {
            try {
                watcherImpl.enqueue(watchResponseWithError);
            } catch (Exception e) {
                logger.log(Level.WARNING, "failed to notify watcher", (Throwable) e);
            }
        });
        this.pendingWatchers.clear();
        this.watchers.values().forEach(watcherImpl2 -> {
            try {
                watcherImpl2.enqueue(watchResponseWithError);
            } catch (Exception e) {
                logger.log(Level.WARNING, "failed to notify watcher", (Throwable) e);
            }
        });
        this.watchers.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void cancelWatcher(long j) {
        if (isClosed() || this.cancelSet.contains(Long.valueOf(j))) {
            return;
        }
        this.watchers.remove(Long.valueOf(j));
        this.cancelSet.add(Long.valueOf(j));
        getGrpcWatchStreamObserver().onNext(WatchRequest.newBuilder().setCancelRequest(WatchCancelRequest.newBuilder().setWatchId(j).m3995build()).m4096build());
    }

    private synchronized StreamObserver<WatchRequest> getGrpcWatchStreamObserver() {
        if (this.grpcWatchStreamObserver == null) {
            this.grpcWatchStreamObserver = this.stub.watch(createWatchStreamObserver());
        }
        return this.grpcWatchStreamObserver;
    }

    private StreamObserver<com.coreos.jetcd.api.WatchResponse> createWatchStreamObserver() {
        return new StreamObserver<com.coreos.jetcd.api.WatchResponse>() { // from class: com.coreos.jetcd.internal.impl.WatchImpl.1
            public void onNext(com.coreos.jetcd.api.WatchResponse watchResponse) {
                WatchImpl.this.processWatchResponse(watchResponse);
            }

            public void onError(Throwable th) {
                WatchImpl.this.processError(th);
            }

            public void onCompleted() {
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void processWatchResponse(com.coreos.jetcd.api.WatchResponse watchResponse) {
        if (isClosed()) {
            return;
        }
        if (watchResponse.getCreated()) {
            processCreate(watchResponse);
        } else if (watchResponse.getCanceled()) {
            processCanceled(watchResponse);
        } else {
            processEvents(watchResponse);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void processError(Throwable th) {
        if (isClosed()) {
            return;
        }
        Status fromThrowable = Status.fromThrowable(th);
        if (!isHaltError(fromThrowable) && !isNoLeaderError(fromThrowable)) {
            this.scheduledExecutorService.schedule(this::resume, 500L, TimeUnit.MILLISECONDS);
            return;
        }
        notifyWatchers(EtcdExceptionFactory.toEtcdException(fromThrowable));
        closeGrpcWatchStreamObserver();
        this.cancelSet.clear();
    }

    private synchronized void resume() {
        closeGrpcWatchStreamObserver();
        this.cancelSet.clear();
        resumeWatchers();
    }

    private void closeGrpcWatchStreamObserver() {
        if (this.grpcWatchStreamObserver == null) {
            return;
        }
        this.grpcWatchStreamObserver.onCompleted();
        this.grpcWatchStreamObserver = null;
    }

    private boolean isNoLeaderError(Status status) {
        return status.getCode() == Status.Code.UNAVAILABLE && "etcdserver: no leader".equals(status.getDescription());
    }

    private boolean isHaltError(Status status) {
        return (status.getCode() == Status.Code.UNAVAILABLE || status.getCode() == Status.Code.INTERNAL) ? false : true;
    }

    private void processCreate(com.coreos.jetcd.api.WatchResponse watchResponse) {
        WatcherImpl poll = this.pendingWatchers.poll();
        sendNextWatchCreateRequest();
        if (poll == null) {
            logger.log(Level.WARNING, "Watch client receives watch create response but find no corresponding watcher");
            return;
        }
        if (poll.isClosed()) {
            return;
        }
        if (watchResponse.getWatchId() == -1) {
            poll.enqueue(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.INTERNAL, "etcd server failed to create watch id")));
            return;
        }
        if (poll.getRevision() == 0) {
            poll.setRevision(watchResponse.getHeader().getRevision());
        }
        poll.setWatchID(watchResponse.getWatchId());
        this.watchers.put(Long.valueOf(poll.getWatchID()), poll);
    }

    private Optional<WatchRequest> nextResume() {
        WatcherImpl peek = this.pendingWatchers.peek();
        return peek != null ? Optional.of(toWatchCreateRequest(peek)) : Optional.empty();
    }

    private void sendNextWatchCreateRequest() {
        nextResume().ifPresent(watchRequest -> {
            getGrpcWatchStreamObserver().onNext(watchRequest);
        });
    }

    private void processEvents(com.coreos.jetcd.api.WatchResponse watchResponse) {
        WatcherImpl watcherImpl = this.watchers.get(Long.valueOf(watchResponse.getWatchId()));
        if (watcherImpl == null) {
            cancelWatcher(watchResponse.getWatchId());
            return;
        }
        if (watchResponse.getCompactRevision() != 0) {
            watcherImpl.enqueue(new WatchResponseWithError(EtcdExceptionFactory.newCompactedException(watchResponse.getCompactRevision())));
        } else if (watchResponse.getEventsCount() == 0) {
            watcherImpl.setRevision(watchResponse.getHeader().getRevision());
        } else {
            watcherImpl.enqueue(new WatchResponseWithError(watchResponse));
            watcherImpl.setRevision(watchResponse.getEvents(watchResponse.getEventsCount() - 1).getKv().getModRevision() + 1);
        }
    }

    private void resumeWatchers() {
        this.watchers.values().forEach(watcherImpl -> {
            if (watcherImpl.isClosed()) {
                return;
            }
            watcherImpl.setWatchID(-1L);
            this.pendingWatchers.add(watcherImpl);
        });
        this.watchers.clear();
        sendNextWatchCreateRequest();
    }

    private void processCanceled(com.coreos.jetcd.api.WatchResponse watchResponse) {
        WatcherImpl watcherImpl = this.watchers.get(Long.valueOf(watchResponse.getWatchId()));
        this.cancelSet.remove(Long.valueOf(watchResponse.getWatchId()));
        if (watcherImpl == null) {
            return;
        }
        String cancelReason = watchResponse.getCancelReason();
        if (Strings.isNullOrEmpty(cancelReason)) {
            watcherImpl.enqueue(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.OUT_OF_RANGE, "etcdserver: mvcc: required revision is a future revision")));
        } else {
            watcherImpl.enqueue(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.FAILED_PRECONDITION, cancelReason)));
        }
    }

    private WatchRequest toWatchCreateRequest(WatcherImpl watcherImpl) {
        ByteString byteStringFromByteSequence = Util.byteStringFromByteSequence(watcherImpl.getKey());
        WatchOption watchOption = watcherImpl.getWatchOption();
        WatchCreateRequest.Builder startRevision = WatchCreateRequest.newBuilder().setKey(byteStringFromByteSequence).setPrevKv(watchOption.isPrevKV()).setProgressNotify(watchOption.isProgressNotify()).setStartRevision(watcherImpl.getRevision());
        watchOption.getEndKey().ifPresent(byteSequence -> {
            startRevision.setRangeEnd(Util.byteStringFromByteSequence(byteSequence));
        });
        if (watchOption.isNoDelete()) {
            startRevision.addFilters(WatchCreateRequest.FilterType.NODELETE);
        }
        if (watchOption.isNoPut()) {
            startRevision.addFilters(WatchCreateRequest.FilterType.NOPUT);
        }
        return WatchRequest.newBuilder().setCreateRequest(startRevision).m4096build();
    }
}
