package com.linecorp.centraldogma.server.internal.api;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.centraldogma.common.Entry;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;
import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
import com.linecorp.centraldogma.server.storage.repository.Repository;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linecorp/centraldogma/server/internal/api/WatchService.class */
public final class WatchService {
    private static final Logger logger = LoggerFactory.getLogger(WatchService.class);
    private static final CancellationException CANCELLATION_EXCEPTION = (CancellationException) Exceptions.clearTrace(new CancellationException("watch timed out"));
    private static final double JITTER_RATE = 0.2d;
    private final Set<CompletableFuture<?>> pendingFutures = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Counter wakeupCounter;
    private final Counter timeoutCounter;
    private final Counter failureCounter;

    public WatchService(MeterRegistry meterRegistry) {
        Objects.requireNonNull(meterRegistry, "meterRegistry");
        Gauge.builder("watches.active", this, watchService -> {
            return watchService.pendingFutures.size();
        }).register(meterRegistry);
        this.wakeupCounter = Counter.builder("watches.processed").tag("result", "wakeup").register(meterRegistry);
        this.timeoutCounter = Counter.builder("watches.processed").tag("result", "timeout").register(meterRegistry);
        this.failureCounter = Counter.builder("watches.processed").tag("result", "failure").register(meterRegistry);
    }

    public CompletableFuture<Revision> watchRepository(Repository repository, Revision revision, String str, long j) {
        ServiceRequestContext serviceRequestContext = (ServiceRequestContext) RequestContext.current();
        updateRequestTimeout(serviceRequestContext, j);
        CompletableFuture<Revision> watch = repository.watch(revision, str);
        if (watch.isDone()) {
            return watch;
        }
        scheduleTimeout(serviceRequestContext, watch, j);
        return watch;
    }

    private static void updateRequestTimeout(ServiceRequestContext serviceRequestContext, long j) {
        serviceRequestContext.setRequestTimeoutMillis(TimeoutMode.EXTEND, WatchTimeout.availableTimeout(j, serviceRequestContext.requestTimeoutMillis()));
    }

    public <T> CompletableFuture<Entry<T>> watchFile(Repository repository, Revision revision, Query<T> query, long j) {
        ServiceRequestContext serviceRequestContext = (ServiceRequestContext) RequestContext.current();
        updateRequestTimeout(serviceRequestContext, j);
        CompletableFuture<Entry<T>> watch = repository.watch(revision, query);
        if (watch.isDone()) {
            return watch;
        }
        scheduleTimeout(serviceRequestContext, watch, j);
        return watch;
    }

    private <T> void scheduleTimeout(ServiceRequestContext serviceRequestContext, CompletableFuture<T> completableFuture, long j) {
        long j2;
        ScheduledFuture scheduledFuture;
        this.pendingFutures.add(completableFuture);
        if (j > 0) {
            j2 = applyJitter(WatchTimeout.availableTimeout(j));
            scheduledFuture = serviceRequestContext.eventLoop().schedule(() -> {
                return Boolean.valueOf(completableFuture.completeExceptionally(CANCELLATION_EXCEPTION));
            }, j2, TimeUnit.MILLISECONDS);
        } else {
            j2 = 0;
            scheduledFuture = null;
        }
        ScheduledFuture scheduledFuture2 = scheduledFuture;
        long j3 = j2;
        completableFuture.whenComplete((BiConsumer) (obj, th) -> {
            if (scheduledFuture2 != null) {
                if (scheduledFuture2.cancel(true)) {
                    this.wakeupCounter.increment();
                    if (th instanceof RequestAlreadyTimedOutException) {
                        logger.warn("Request has timed out before watch timeout: watchTimeoutMillis={}, log={}", Long.valueOf(j3), serviceRequestContext.log());
                    }
                } else {
                    this.timeoutCounter.increment();
                }
            } else if (th == null) {
                this.wakeupCounter.increment();
            } else {
                this.failureCounter.increment();
            }
            this.pendingFutures.remove(completableFuture);
        });
    }

    private static long applyJitter(long j) {
        double nextDouble = ThreadLocalRandom.current().nextDouble(0.8d, 1.001d);
        return nextDouble < 1.0d ? (long) (j * nextDouble) : j;
    }
}
