/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions.manager;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnloadManager
implements StateChangeListener {
    private static final Logger log = LoggerFactory.getLogger(UnloadManager.class);
    private final UnloadCounter counter;
    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;

    public UnloadManager(UnloadCounter counter) {
        this.counter = counter;
        this.inFlightUnloadRequest = new ConcurrentHashMap<String, CompletableFuture<Void>>();
    }

    private void complete(String serviceUnit, Throwable ex) {
        this.inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
            if (!future.isDone()) {
                if (ex != null) {
                    future.completeExceptionally(ex);
                } else {
                    future.complete(null);
                }
            }
            return null;
        });
    }

    public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture, String bundle, UnloadDecision decision, long timeout, TimeUnit timeoutUnit) {
        return ((CompletableFuture)eventPubFuture.thenCompose(__ -> this.inFlightUnloadRequest.computeIfAbsent(bundle, ignore -> {
            if (log.isDebugEnabled()) {
                log.debug("Handle unload bundle: {}, timeout: {} {}", new Object[]{bundle, timeout, timeoutUnit});
            }
            CompletableFuture future = new CompletableFuture();
            future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
                if (ex != null) {
                    this.inFlightUnloadRequest.remove(bundle);
                    log.warn("Failed to wait unload for serviceUnit: {}", (Object)bundle, ex);
                }
            });
            return future;
        }))).whenComplete((__, ex) -> {
            if (ex != null) {
                this.counter.update(UnloadDecision.Label.Failure, UnloadDecision.Reason.Unknown);
                log.warn("Failed to unload bundle: {}", (Object)bundle, ex);
                return;
            }
            log.info("Complete unload bundle: {}", (Object)bundle);
            this.counter.update(decision);
        });
    }

    @Override
    public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
        ServiceUnitState state = ServiceUnitStateData.state(data);
        switch (state) {
            case Free: 
            case Owned: {
                this.complete(serviceUnit, t);
                break;
            }
            default: {
                if (!log.isDebugEnabled()) break;
                log.debug("Handling {} for service unit {}", (Object)data, (Object)serviceUnit);
            }
        }
    }

    public void close() {
        this.inFlightUnloadRequest.forEach((bundle, future) -> {
            if (!future.isDone()) {
                String msg = String.format("Unloading bundle: %s, but the unload manager already closed.", bundle);
                log.warn(msg);
                future.completeExceptionally(new IllegalStateException(msg));
            }
        });
        this.inFlightUnloadRequest.clear();
    }
}

