package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.IllegalEntityException;
import com.microsoft.azure.eventhubs.TimeoutException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/microsoft/azure/eventprocessorhost/PartitionManager.class */
public class PartitionManager {
    protected final HostContext hostContext;
    protected Pump pump = null;
    protected volatile String[] partitionIds = null;
    private final Object scanFutureSynchronizer = new Object();
    private ScheduledFuture<?> scanFuture = null;
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionManager.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/azure/eventprocessorhost/PartitionManager$BoolWrapper.class */
    public class BoolWrapper {
        public boolean value;

        public BoolWrapper(boolean z) {
            this.value = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/microsoft/azure/eventprocessorhost/PartitionManager$FinalException.class */
    public class FinalException extends CompletionException {
        private static final long serialVersionUID = -4600271981700687166L;

        FinalException(CompletionException completionException) {
            super(completionException);
        }

        CompletionException getInner() {
            return (CompletionException) getCause();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionManager(HostContext hostContext) {
        this.hostContext = hostContext;
    }

    CompletableFuture<Void> cachePartitionIds() {
        CompletableFuture<Void> completableFuture;
        if (this.partitionIds != null) {
            completableFuture = CompletableFuture.completedFuture(null);
        } else {
            try {
                completableFuture = EventHubClient.create(this.hostContext.getEventHubConnectionString(), this.hostContext.getRetryPolicy(), this.hostContext.getExecutor()).thenComposeAsync(eventHubClient -> {
                    return eventHubClient.getRuntimeInformation();
                }, (Executor) this.hostContext.getExecutor()).thenAcceptAsync(eventHubRuntimeInformation -> {
                    if (eventHubRuntimeInformation == null) {
                        throw new CompletionException((Throwable) new TimeoutException("getRuntimeInformation returned null"));
                    }
                    this.partitionIds = eventHubRuntimeInformation.getPartitionIds();
                    TRACE_LOGGER.info(this.hostContext.withHost("Eventhub " + this.hostContext.getEventHubPath() + " count of partitions: " + eventHubRuntimeInformation.getPartitionCount()));
                    for (String str : this.partitionIds) {
                        TRACE_LOGGER.info(this.hostContext.withHost("Found partition with id: " + str));
                    }
                }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((r7, th) -> {
                    if (th != null) {
                        Throwable th = th;
                        if (th instanceof CompletionException) {
                            th = th.getCause();
                        }
                        throw new CompletionException((Throwable) new IllegalEntityException("Failure getting partition ids for event hub", th));
                    }
                }, (Executor) this.hostContext.getExecutor());
            } catch (EventHubException | IOException e) {
                completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(new IllegalEntityException("Failure getting partition ids for event hub", e));
            }
        }
        return completableFuture;
    }

    Pump createPumpTestHook() {
        return new Pump(this.hostContext);
    }

    void onInitializeCompleteTestHook() {
    }

    void onPartitionCheckCompleteTestHook() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> stopPartitions() {
        synchronized (this.scanFutureSynchronizer) {
            if (this.scanFuture != null) {
                this.scanFuture.cancel(true);
            }
        }
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        if (this.pump != null) {
            TRACE_LOGGER.info(this.hostContext.withHost("Shutting down all pumps"));
            completedFuture = CompletableFuture.allOf(this.pump.removeAllPumps(CloseReason.Shutdown)).whenCompleteAsync((r6, th) -> {
                if (th != null) {
                    Throwable unwrapException = LoggingUtils.unwrapException(th, null);
                    TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), unwrapException);
                    if (unwrapException instanceof Exception) {
                        this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception) unwrapException, EventProcessorHostActionStrings.PARTITION_MANAGER_CLEANUP);
                    }
                }
                TRACE_LOGGER.info(this.hostContext.withHost("Partition manager exiting"));
            }, (Executor) this.hostContext.getExecutor());
        }
        return completedFuture;
    }

    public CompletableFuture<Void> initialize() {
        this.pump = createPumpTestHook();
        return cachePartitionIds().thenComposeAsync(r3 -> {
            return initializeStores();
        }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((BiConsumer<? super U, ? super Throwable>) (obj, th) -> {
            if (th != null) {
                StringBuilder sb = new StringBuilder();
                Throwable unwrapException = LoggingUtils.unwrapException(th, sb);
                if (sb.length() > 0) {
                    TRACE_LOGGER.error(this.hostContext.withHost("Exception while initializing stores (" + sb.toString() + "), not starting partition manager"), unwrapException);
                } else {
                    TRACE_LOGGER.error(this.hostContext.withHost("Exception while initializing stores, not starting partition manager"), unwrapException);
                }
            }
        }, (Executor) this.hostContext.getExecutor()).thenRunAsync(() -> {
            synchronized (this.scanFutureSynchronizer) {
                this.scanFuture = this.hostContext.getExecutor().schedule(() -> {
                    return scan();
                }, 0L, TimeUnit.SECONDS);
            }
            onInitializeCompleteTestHook();
        }, (Executor) this.hostContext.getExecutor());
    }

    private CompletableFuture<?> initializeStores() {
        ILeaseManager leaseManager = this.hostContext.getLeaseManager();
        ICheckpointManager checkpointManager = this.hostContext.getCheckpointManager();
        CompletableFuture<?> buildRetries = buildRetries(buildRetries(CompletableFuture.completedFuture(null), () -> {
            return leaseManager.createLeaseStoreIfNotExists();
        }, null, "Failure creating lease store for this Event Hub, retrying", "Out of retries creating lease store for this Event Hub", EventProcessorHostActionStrings.CREATING_LEASE_STORE, 5), () -> {
            return checkpointManager.createCheckpointStoreIfNotExists();
        }, null, "Failure creating checkpoint store for this Event Hub, retrying", "Out of retries creating checkpoint store for this Event Hub", EventProcessorHostActionStrings.CREATING_CHECKPOINT_STORE, 5);
        for (String str : this.partitionIds) {
            buildRetries = buildRetries(buildRetries(buildRetries, () -> {
                return leaseManager.createLeaseIfNotExists(str);
            }, str, "Failure creating lease for partition, retrying", "Out of retries creating lease for partition", EventProcessorHostActionStrings.CREATING_LEASE, 5), () -> {
                return checkpointManager.createCheckpointIfNotExists(str);
            }, str, "Failure creating checkpoint for partition, retrying", "Out of retries creating checkpoint blob for partition", EventProcessorHostActionStrings.CREATING_CHECKPOINT, 5);
        }
        buildRetries.whenCompleteAsync((obj, th) -> {
            if (th != null && (th instanceof FinalException)) {
                throw ((FinalException) th).getInner();
            }
        }, (Executor) this.hostContext.getExecutor());
        return buildRetries;
    }

    private CompletableFuture<?> buildRetries(CompletableFuture<?> completableFuture, Callable<CompletableFuture<?>> callable, String str, String str2, String str3, String str4, int i) {
        CompletableFuture thenComposeAsync = completableFuture.thenComposeAsync(obj -> {
            CompletableFuture.completedFuture(null);
            try {
                return (CompletableFuture) callable.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, (Executor) this.hostContext.getExecutor());
        for (int i2 = 1; i2 < i; i2++) {
            thenComposeAsync = thenComposeAsync.handleAsync((obj2, th) -> {
                Object obj2 = obj2;
                if (th != null) {
                    if (th instanceof FinalException) {
                        throw ((FinalException) th);
                    }
                    if (str != null) {
                        TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(str, str2), LoggingUtils.unwrapException(th, null));
                    } else {
                        TRACE_LOGGER.warn(this.hostContext.withHost(str2), LoggingUtils.unwrapException(th, null));
                    }
                } else if (obj2 == null) {
                    obj2 = true;
                }
                if (th == null) {
                    return obj2;
                }
                return null;
            }, (Executor) this.hostContext.getExecutor()).thenComposeAsync(obj3 -> {
                CompletableFuture completedFuture = CompletableFuture.completedFuture(obj3);
                if (obj3 == null) {
                    try {
                        completedFuture = (CompletableFuture) callable.call();
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }
                return completedFuture;
            }, (Executor) this.hostContext.getExecutor());
        }
        return thenComposeAsync.handleAsync((obj4, th2) -> {
            if (th2 == null) {
                if (th2 == null) {
                    return obj4;
                }
                return null;
            }
            if (th2 instanceof FinalException) {
                throw ((FinalException) th2);
            }
            if (str != null) {
                TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(str, str3));
            } else {
                TRACE_LOGGER.warn(this.hostContext.withHost(str3));
            }
            throw new FinalException(LoggingUtils.wrapExceptionWithMessage(LoggingUtils.unwrapException(th2, null), str3, str4));
        }, (Executor) this.hostContext.getExecutor());
    }

    private Void scan() {
        TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
        AtomicInteger atomicInteger = new AtomicInteger();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        BoolWrapper boolWrapper = new BoolWrapper(true);
        CompletableFuture thenApplyAsync = this.hostContext.getLeaseManager().getAllLeases().thenApplyAsync(list -> {
            ArrayList arrayList = new ArrayList();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Lease lease = (Lease) it.next();
                if (lease != null) {
                    arrayList.add(lease.isExpired().thenComposeAsync(bool -> {
                        return bool.booleanValue() ? this.hostContext.getLeaseManager().acquireLease(lease) : CompletableFuture.completedFuture(false);
                    }, (Executor) this.hostContext.getExecutor()).thenApplyAsync((Function<? super U, ? extends U>) bool2 -> {
                        if (bool2.booleanValue()) {
                            this.pump.addPump(lease);
                        }
                        if (lease.isOwnedBy(this.hostContext.getHostName())) {
                            atomicInteger.getAndIncrement();
                        } else {
                            concurrentHashMap.put(lease.getPartitionId(), lease);
                        }
                        return lease;
                    }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((lease2, th) -> {
                        if (th != null) {
                            boolWrapper.value = false;
                            Exception exc = (Exception) LoggingUtils.unwrapException(th, null);
                            TRACE_LOGGER.warn(this.hostContext.withHost("Failure getting/acquiring lease, skipping"), exc);
                            this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), exc, EventProcessorHostActionStrings.CHECKING_LEASES, ExceptionReceivedEventArgs.NO_ASSOCIATED_PARTITION);
                        }
                    }, (Executor) this.hostContext.getExecutor()));
                } else {
                    TRACE_LOGGER.warn(this.hostContext.withHost("null lease during scan"));
                }
            }
            return arrayList;
        }, (Executor) this.hostContext.getExecutor()).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) arrayList -> {
            CompletableFuture<Void> completedFuture;
            if (arrayList.size() > 0) {
                completedFuture = CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[arrayList.size()]));
            } else {
                TRACE_LOGGER.warn(this.hostContext.withHost("all leases were null during scan"));
                completedFuture = CompletableFuture.completedFuture(null);
            }
            return completedFuture;
        }, (Executor) this.hostContext.getExecutor()).thenApplyAsync(r8 -> {
            TRACE_LOGGER.debug(this.hostContext.withHost("Lease scan steal check"));
            Lease lease = null;
            if (concurrentHashMap.size() > 0 && boolWrapper.value) {
                lease = whichLeaseToSteal(concurrentHashMap.values(), atomicInteger.get());
            }
            return lease;
        }, (Executor) this.hostContext.getExecutor());
        thenApplyAsync.thenComposeAsync(lease -> {
            return lease != null ? this.hostContext.getLeaseManager().acquireLease(lease) : CompletableFuture.completedFuture(false);
        }, (Executor) this.hostContext.getExecutor()).thenCombineAsync((CompletionStage) thenApplyAsync, (bool, lease2) -> {
            if (bool.booleanValue()) {
                TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease2, "Stole lease"));
                this.pump.addPump(lease2);
            }
            return lease2;
        }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((lease3, th) -> {
            if (th != null) {
                Exception exc = (Exception) LoggingUtils.unwrapException(th, null);
                if (lease3 != null) {
                    TRACE_LOGGER.warn(this.hostContext.withHost("Exception stealing lease for partition " + lease3.getPartitionId()), exc);
                    this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), exc, EventProcessorHostActionStrings.STEALING_LEASE, lease3.getPartitionId());
                } else {
                    TRACE_LOGGER.warn(this.hostContext.withHost("Exception stealing lease"), exc);
                    this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), exc, EventProcessorHostActionStrings.STEALING_LEASE, ExceptionReceivedEventArgs.NO_ASSOCIATED_PARTITION);
                }
            }
            onPartitionCheckCompleteTestHook();
            synchronized (this.scanFutureSynchronizer) {
                if (!this.scanFuture.isCancelled()) {
                    int leaseRenewIntervalInSeconds = this.hostContext.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds();
                    this.scanFuture = this.hostContext.getExecutor().schedule(() -> {
                        return scan();
                    }, leaseRenewIntervalInSeconds, TimeUnit.SECONDS);
                    TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + leaseRenewIntervalInSeconds));
                }
            }
        }, (Executor) this.hostContext.getExecutor());
        return null;
    }

    private Lease whichLeaseToSteal(Collection<Lease> collection, int i) {
        HashMap<String, Integer> countLeasesByOwner = countLeasesByOwner(collection);
        String findBiggestOwner = findBiggestOwner(countLeasesByOwner);
        Lease lease = null;
        if (countLeasesByOwner.get(findBiggestOwner).intValue() - i >= 2) {
            Iterator<Lease> it = collection.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Lease next = it.next();
                if (next.isOwnedBy(findBiggestOwner)) {
                    lease = next;
                    TRACE_LOGGER.debug(this.hostContext.withHost("Proposed to steal lease for partition " + next.getPartitionId() + " from " + findBiggestOwner));
                    break;
                }
            }
        }
        return lease;
    }

    private String findBiggestOwner(HashMap<String, Integer> hashMap) {
        int i = 0;
        String str = null;
        for (String str2 : hashMap.keySet()) {
            if (hashMap.get(str2).intValue() > i) {
                i = hashMap.get(str2).intValue();
                str = str2;
            }
        }
        return str;
    }

    private HashMap<String, Integer> countLeasesByOwner(Iterable<Lease> iterable) {
        HashMap<String, Integer> hashMap = new HashMap<>();
        for (Lease lease : iterable) {
            if (hashMap.containsKey(lease.getOwner())) {
                hashMap.put(lease.getOwner(), Integer.valueOf(hashMap.get(lease.getOwner()).intValue() + 1));
            } else {
                hashMap.put(lease.getOwner(), 1);
            }
        }
        for (String str : hashMap.keySet()) {
            TRACE_LOGGER.debug(this.hostContext.withHost("host " + str + " owns " + hashMap.get(str) + " leases"));
        }
        TRACE_LOGGER.debug(this.hostContext.withHost("total hosts in sorted list: " + hashMap.size()));
        return hashMap;
    }
}
