package org.apache.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIOConfig;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaPartitions;
import org.apache.druid.indexing.kafka.KafkaTuningConfig;
import org.apache.druid.indexing.kafka.supervisor.TaskReportData;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTime;
import org.joda.time.ReadableDuration;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.class */
public class KafkaSupervisor implements Supervisor {
    // ---- Timing and retry constants (units are given by each name's suffix) ----
    // NOTE(review): presumably the throttle applied by RunNotice.handle(), whose bytecode compares against 1000 — confirm.
    private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
    private static final long NOT_SET = -1;
    // Lower bound used by the constructor when computing futureTimeoutInSeconds.
    private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
    private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
    private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
    private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
    // Retry budget passed to RetryUtils.retry() by start() when the first tryInit() fails.
    private static final int MAX_INITIALIZATION_RETRIES = 20;
    public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";

    // ---- Collaborators and configuration; all assigned once in the constructor ----
    private final TaskStorage taskStorage;
    private final TaskMaster taskMaster;
    private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private final KafkaIndexTaskClient taskClient;
    // Copy of the injected ObjectMapper with SORT_PROPERTIES_ALPHABETICALLY enabled.
    private final ObjectMapper sortingMapper;
    private final KafkaSupervisorSpec spec;
    private final ServiceEmitter emitter;
    private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
    private final String dataSource;
    private final KafkaSupervisorIOConfig ioConfig;
    private final KafkaSupervisorTuningConfig tuningConfig;
    private final KafkaTuningConfig taskTuningConfig;
    // "KafkaSupervisor-<dataSource>"; also used as the TaskRunner listener id and as the executor thread-name prefix.
    private final String supervisorId;
    private final TaskInfoProvider taskInfoProvider;
    // max(MINIMUM_FUTURE_TIMEOUT_IN_SECONDS, chatRetries * (httpTimeout seconds + 10)), computed in the constructor.
    private final long futureTimeoutInSeconds;
    private final RowIngestionMetersFactory rowIngestionMetersFactory;

    // ---- Executors; all are shut down by stop() ----
    private final ExecutorService exec;
    private final ScheduledExecutorService scheduledExec;
    private final ScheduledExecutorService reportingExec;
    private final ListeningExecutorService workerExec;

    // Timestamp (System.currentTimeMillis()) read and written by RunNotice.handle().
    private long lastRunTime;
    private volatile DateTime firstRunTime;
    // Closed by ShutdownNotice.handle().
    private volatile KafkaConsumer consumer;
    private volatile Map<Integer, Long> latestOffsetsFromKafka;
    private volatile DateTime offsetsLastUpdated;
    private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
    private static final CopyOnWriteArrayList EMPTY_LIST = Lists.newCopyOnWriteArrayList();

    // ---- Supervisor state ----
    // Task groups keyed by task group id.
    private final ConcurrentHashMap<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();
    // Keyed by task group id; CheckpointNotice treats a group id found here as "stopped indexing, waiting to publish".
    private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
    // Keyed by task group id; NOTE(review): inner map is presumably partition -> offset — confirm against its writers.
    private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();
    // Queue of pending notices; reset(), checkpoint(), stop() and the TaskRunner listener all add to it.
    private final BlockingQueue<Notice> notices = new LinkedBlockingDeque();
    // Monitor on which stop() waits and ShutdownNotice.handle() notifies once 'stopped' is set.
    private final Object stopLock = new Object();
    // Held for the full duration of start() and stop().
    private final Object stateChangeLock = new Object();
    private final Object consumerLock = new Object();
    // Set by possiblyRegisterListener() after the TaskRunner listener has been registered.
    private boolean listenerRegistered = false;
    private int initRetryCounter = 0;
    // Set at the end of start(); stop() requires it to be true.
    private volatile boolean lifecycleStarted = false;
    // Checked by start() to decide whether initialization must be retried; cleared by stop().
    private volatile boolean started = false;
    // Set by ShutdownNotice.handle(), or by stop() itself when the shutdown wait times out.
    private volatile boolean stopped = false;

    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$CheckpointNotice.class */
    private class CheckpointNotice implements Notice {

        @Nullable
        private final Integer nullableTaskGroupId;

        @Deprecated
        private final String baseSequenceName;
        private final KafkaDataSourceMetadata previousCheckpoint;
        private final KafkaDataSourceMetadata currentCheckpoint;

        CheckpointNotice(@Nullable Integer num, @Deprecated String str, KafkaDataSourceMetadata kafkaDataSourceMetadata, KafkaDataSourceMetadata kafkaDataSourceMetadata2) {
            this.baseSequenceName = str;
            this.nullableTaskGroupId = num;
            this.previousCheckpoint = kafkaDataSourceMetadata;
            this.currentCheckpoint = kafkaDataSourceMetadata2;
        }

        @Override // org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.Notice
        public void handle() throws ExecutionException, InterruptedException {
            int intValue;
            if (this.nullableTaskGroupId == null) {
                Optional map = KafkaSupervisor.this.taskGroups.entrySet().stream().filter(entry -> {
                    return ((TaskGroup) entry.getValue()).baseSequenceName.equals(this.baseSequenceName);
                }).findAny().map((v0) -> {
                    return v0.getKey();
                });
                intValue = map.isPresent() ? ((Integer) map.get()).intValue() : ((Integer) ((Map.Entry) KafkaSupervisor.this.pendingCompletionTaskGroups.entrySet().stream().filter(entry2 -> {
                    return ((List) entry2.getValue()).stream().anyMatch(taskGroup -> {
                        return taskGroup.baseSequenceName.equals(this.baseSequenceName);
                    });
                }).findAny().orElseThrow(() -> {
                    return new ISE("Cannot find taskGroup for baseSequenceName[%s]", new Object[]{this.baseSequenceName});
                })).getKey()).intValue();
            } else {
                intValue = this.nullableTaskGroupId.intValue();
            }
            TaskGroup taskGroup = (TaskGroup) KafkaSupervisor.this.taskGroups.get(Integer.valueOf(intValue));
            if (isValidTaskGroup(intValue, taskGroup)) {
                TreeMap<Integer, Map<Integer, Long>> treeMap = taskGroup.sequenceOffsets;
                int size = treeMap.size();
                Iterator<Integer> it = treeMap.descendingKeySet().iterator();
                while (it.hasNext() && !treeMap.get(Integer.valueOf(it.next().intValue())).equals(this.previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
                    size--;
                }
                if (size == 0) {
                    throw new ISE("No such previous checkpoint [%s] found", new Object[]{this.previousCheckpoint});
                }
                if (size < treeMap.size()) {
                    Preconditions.checkState(size == treeMap.size() - 1, "checkpoint consistency failure");
                    KafkaSupervisor.log.info("Already checkpointed with offsets [%s]", new Object[]{treeMap.lastEntry().getValue()});
                } else {
                    Map<Integer, Long> map2 = (Map) KafkaSupervisor.this.checkpointTaskGroup(taskGroup, false).get();
                    taskGroup.addNewCheckpoint(map2);
                    KafkaSupervisor.log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", new Object[]{map2, Integer.valueOf(intValue)});
                }
            }
        }

        private boolean isValidTaskGroup(int i, @Nullable TaskGroup taskGroup) {
            if (taskGroup != null) {
                return true;
            }
            if (KafkaSupervisor.this.pendingCompletionTaskGroups.containsKey(Integer.valueOf(i))) {
                KafkaSupervisor.log.warn("Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for publishing segments", new Object[]{Integer.valueOf(i)});
                return false;
            }
            if (!KafkaSupervisor.this.partitionGroups.containsKey(Integer.valueOf(i))) {
                throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", new Object[]{Integer.valueOf(i), KafkaSupervisor.this.taskGroups});
            }
            KafkaSupervisor.log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", new Object[]{Integer.valueOf(i)});
            return false;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$GracefulShutdownNotice.class */
    /**
     * Shutdown notice that first runs {@code gracefulShutdownInternal()} and then performs the
     * regular {@link ShutdownNotice} handling.
     */
    private class GracefulShutdownNotice extends ShutdownNotice {
        private GracefulShutdownNotice() {
        }

        /** Runs the graceful shutdown step, then delegates to the parent notice. */
        @Override
        public void handle() throws InterruptedException, ExecutionException, TimeoutException {
            gracefulShutdownInternal();
            super.handle();
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$Notice.class */
    /**
     * A unit of work placed on the supervisor's {@code notices} queue.
     * Implementations in this class cover run, checkpoint, reset and shutdown requests.
     */
    private interface Notice {
        /** Performs the action this notice represents. */
        void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException;
    }

    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$ResetNotice.class */
    /** Notice that forwards a reset request, with its metadata, to {@code resetInternal}. */
    private class ResetNotice implements Notice {
        /** Metadata handed to {@code resetInternal}; that method also accepts {@code null}. */
        final DataSourceMetadata dataSourceMetadata;

        ResetNotice(DataSourceMetadata dataSourceMetadata) {
            this.dataSourceMetadata = dataSourceMetadata;
        }

        /** Delegates to the supervisor's {@code resetInternal}. */
        @Override
        public void handle() {
            resetInternal(dataSourceMetadata);
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$RunNotice.class */
    /**
     * Notice that triggers one supervisor run, throttled so that runs are at least
     * {@code MAX_RUN_FREQUENCY_MILLIS} apart.
     */
    private class RunNotice implements Notice {
        private RunNotice() {
        }

        /**
         * Runs the supervisor loop body unless a run already started within the last
         * {@code MAX_RUN_FREQUENCY_MILLIS} milliseconds, in which case the notice is dropped.
         *
         * <p>Reconstructed from the method's bytecode, which the decompiler could not translate: it
         * reads the clock, returns early when {@code now - lastRunTime < 1000}, otherwise stores
         * {@code now} into {@code lastRunTime} and invokes {@code runInternal()}.
         */
        @Override
        public void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
            final long nowTime = System.currentTimeMillis();
            if (nowTime - KafkaSupervisor.this.lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
                // Too soon after the previous run; skip this notice.
                return;
            }
            // Record the start time before running so notices queued meanwhile are throttled against it.
            KafkaSupervisor.this.lastRunTime = nowTime;
            KafkaSupervisor.this.runInternal();
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$ShutdownNotice.class */
    /** Notice that closes the Kafka consumer and wakes any thread waiting in {@code stop()}. */
    private class ShutdownNotice implements Notice {
        private ShutdownNotice() {
        }

        /**
         * Closes the consumer, then marks the supervisor as stopped and notifies every waiter on
         * {@code stopLock}.
         */
        @Override
        public void handle() throws InterruptedException, ExecutionException, TimeoutException {
            consumer.close();

            final Object lock = stopLock;
            synchronized (lock) {
                stopped = true;
                lock.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$StatsFromTaskResult.class */
    /** Immutable holder pairing one task's stats map with its task id and its group id. */
    public static class StatsFromTaskResult {
        private final String groupId;
        private final String taskId;
        private final Map<String, Object> stats;

        /**
         * @param i   numeric task group id; stored in its decimal string form
         * @param str the task id
         * @param map the stats reported for that task (stored by reference, not copied)
         */
        public StatsFromTaskResult(int i, String str, Map<String, Object> map) {
            groupId = Integer.toString(i);
            taskId = str;
            stats = map;
        }

        /** @return the group id as a decimal string */
        public String getGroupId() {
            return groupId;
        }

        /** @return the task id */
        public String getTaskId() {
            return taskId;
        }

        /** @return the same map instance that was passed to the constructor */
        public Map<String, Object> getStats() {
            return stats;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$TaskData.class */
    /** Mutable per-task record: last known status, start time and current offsets. */
    public static class TaskData {

        @Nullable
        volatile TaskStatus status;

        @Nullable
        volatile DateTime startTime;

        /** Keyed by Integer with Long values; starts out as an empty map. */
        volatile Map<Integer, Long> currentOffsets = new HashMap<>();

        private TaskData() {
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("TaskData{status=");
            text.append(status);
            text.append(", startTime=").append(startTime);
            text.append(", currentOffsets=").append(currentOffsets);
            text.append('}');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor$TaskGroup.class */
    /**
     * A set of tasks sharing one group id, starting offsets and base sequence name, plus the
     * checkpoints recorded for them.
     */
    public class TaskGroup {
        final int groupId;
        final ImmutableMap<Integer, Long> partitionOffsets;
        final com.google.common.base.Optional<DateTime> minimumMessageTime;
        final com.google.common.base.Optional<DateTime> maximumMessageTime;
        DateTime completionTimeout;
        final String baseSequenceName;
        /** Per-task data keyed by task id. */
        final ConcurrentHashMap<String, TaskData> tasks = new ConcurrentHashMap<>();
        /** Checkpoints keyed by sequence id; entry 0 holds the group's starting offsets. */
        final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();

        /**
         * @param i            the task group id
         * @param immutableMap starting offsets; also recorded as checkpoint 0
         * @param optional     minimum message time, if any
         * @param optional2    maximum message time, if any
         */
        TaskGroup(int i, ImmutableMap<Integer, Long> immutableMap, com.google.common.base.Optional<DateTime> optional, com.google.common.base.Optional<DateTime> optional2) {
            groupId = i;
            partitionOffsets = immutableMap;
            minimumMessageTime = optional;
            maximumMessageTime = optional2;
            sequenceOffsets.put(0, immutableMap);
            baseSequenceName = generateSequenceName(immutableMap, optional, optional2);
        }

        /**
         * Appends {@code map} under the next sequence id (current highest id plus one).
         *
         * @return the highest sequence id after the insertion
         */
        int addNewCheckpoint(Map<Integer, Long> map) {
            final int nextSequenceId = sequenceOffsets.lastKey() + 1;
            sequenceOffsets.put(nextSequenceId, map);
            return sequenceOffsets.lastKey();
        }

        /** @return the key-set view of {@code tasks}, i.e. the ids of the tasks in this group */
        Set<String> taskIds() {
            return tasks.keySet();
        }
    }

    /**
     * Wires up the supervisor: stores collaborators, derives configuration from the spec, creates
     * the executors, and builds the task client.
     *
     * @param taskStorage                       used by the task info provider to look up task status
     * @param taskMaster                        source of the task runner
     * @param indexerMetadataStorageCoordinator metadata store access
     * @param kafkaIndexTaskClientFactory       factory used to build the task client
     * @param objectMapper                      copied and configured to sort properties alphabetically
     * @param kafkaSupervisorSpec               supplies emitter, monitor config, data schema, IO and tuning config
     * @param rowIngestionMetersFactory         stored for later use
     */
    public KafkaSupervisor(final TaskStorage taskStorage, final TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory, ObjectMapper objectMapper, KafkaSupervisorSpec kafkaSupervisorSpec, RowIngestionMetersFactory rowIngestionMetersFactory) {
        this.taskStorage = taskStorage;
        this.taskMaster = taskMaster;
        this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
        this.sortingMapper = objectMapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
        this.spec = kafkaSupervisorSpec;
        this.emitter = kafkaSupervisorSpec.getEmitter();
        this.monitorSchedulerConfig = kafkaSupervisorSpec.getMonitorSchedulerConfig();
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;
        this.dataSource = kafkaSupervisorSpec.getDataSchema().getDataSource();
        this.ioConfig = kafkaSupervisorSpec.getIoConfig();
        this.tuningConfig = kafkaSupervisorSpec.getTuningConfig();
        this.taskTuningConfig = KafkaTuningConfig.copyOf(this.tuningConfig);
        this.supervisorId = StringUtils.format("KafkaSupervisor-%s", this.dataSource);

        // Executors are named after the supervisor id.
        this.exec = Execs.singleThreaded(this.supervisorId);
        this.scheduledExec = Execs.scheduledSingleThreaded(this.supervisorId + "-Scheduler-%d");
        this.reportingExec = Execs.scheduledSingleThreaded(this.supervisorId + "-Reporting-%d");

        // Worker pool size: the configured value, else min(10, taskCount).
        final int workerThreads;
        if (this.tuningConfig.getWorkerThreads() == null) {
            workerThreads = Math.min(10, this.ioConfig.getTaskCount().intValue());
        } else {
            workerThreads = this.tuningConfig.getWorkerThreads().intValue();
        }
        this.workerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads, this.supervisorId + "-Worker-%d"));
        log.info("Created worker pool with [%d] threads for dataSource [%s]", workerThreads, this.dataSource);

        this.taskInfoProvider = new TaskInfoProvider() {
            /** Returns the location of the running task with the given id, or the unknown location. */
            public TaskLocation getTaskLocation(String str) {
                Preconditions.checkNotNull(str, "id");
                com.google.common.base.Optional taskRunner = taskMaster.getTaskRunner();
                if (!taskRunner.isPresent()) {
                    KafkaSupervisor.log.error("Failed to get task runner because I'm not the leader!");
                    return TaskLocation.unknown();
                }
                com.google.common.base.Optional runningTask = Iterables.tryFind(
                    ((TaskRunner) taskRunner.get()).getRunningTasks(),
                    workItem -> str.equals(workItem.getTaskId())
                );
                if (runningTask.isPresent()) {
                    return ((TaskRunnerWorkItem) runningTask.get()).getLocation();
                }
                return TaskLocation.unknown();
            }

            /** Looks the task's status up in task storage. */
            public com.google.common.base.Optional<TaskStatus> getTaskStatus(String str) {
                return taskStorage.getStatus(str);
            }
        };

        // Future timeout: at least the minimum, otherwise chatRetries * (httpTimeout seconds + 10).
        final long retryBasedTimeout = this.tuningConfig.getChatRetries().longValue() * (this.tuningConfig.getHttpTimeout().getStandardSeconds() + 10);
        this.futureTimeoutInSeconds = Math.max(MINIMUM_FUTURE_TIMEOUT_IN_SECONDS, retryBasedTimeout);

        // Chat thread count: the configured value, else min(10, taskCount * replicas).
        final int chatThreads;
        if (this.tuningConfig.getChatThreads() == null) {
            chatThreads = Math.min(10, this.ioConfig.getTaskCount().intValue() * this.ioConfig.getReplicas().intValue());
        } else {
            chatThreads = this.tuningConfig.getChatThreads().intValue();
        }
        this.taskClient = kafkaIndexTaskClientFactory.m4build(this.taskInfoProvider, this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries().longValue());
        log.info("Created taskClient with dataSource[%s] chatThreads[%d] httpTimeout[%s] chatRetries[%d]", this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries());
    }

    /**
     * Starts the supervisor lifecycle. Initialization is attempted once inline; if that attempt
     * throws and the supervisor is still not started, further attempts are submitted to
     * {@code exec} with up to {@code MAX_INITIALIZATION_RETRIES} tries.
     *
     * @throws IllegalStateException if the lifecycle was already started or {@code exec} is shut down
     */
    public void start() {
        synchronized (this.stateChangeLock) {
            Preconditions.checkState(!this.lifecycleStarted, "already started");
            Preconditions.checkState(!this.exec.isShutdown(), "already stopped");
            try {
                tryInit();
            } catch (Exception firstAttemptFailure) {
                if (!this.started) {
                    log.warn("First initialization attempt failed for KafkaSupervisor[%s], starting retries...", this.dataSource);
                    this.exec.submit(() -> {
                        try {
                            RetryUtils.retry(
                                () -> {
                                    tryInit();
                                    return 0;
                                },
                                // Keep retrying only while the supervisor has not come up.
                                ignoredFailure -> !this.started,
                                0,
                                MAX_INITIALIZATION_RETRIES,
                                (RetryUtils.CleanupAfterFailure) null,
                                (String) null
                            );
                        } catch (Exception retriesExhausted) {
                            log.makeAlert("Failed to initialize after %s retries, aborting. Please resubmit the supervisor spec to restart this supervisor [%s]", MAX_INITIALIZATION_RETRIES, this.supervisorId).emit();
                            throw new RuntimeException(retriesExhausted);
                        }
                    });
                }
            }
            // The lifecycle counts as started even when initialization is still being retried.
            this.lifecycleStarted = true;
        }
    }

    /**
     * Stops the supervisor: shuts down the schedulers, posts a shutdown notice (when the supervisor
     * had started) and waits for it to be handled, then closes the task client and the remaining
     * executors.
     *
     * @param z {@code true} posts a {@link GracefulShutdownNotice}; {@code false} posts a plain
     *          {@link ShutdownNotice}
     * @throws IllegalStateException if the lifecycle was never started
     */
    public void stop(boolean z) {
        synchronized (this.stateChangeLock) {
            Preconditions.checkState(this.lifecycleStarted, "lifecycle not started");
            log.info("Beginning shutdown of KafkaSupervisor[%s]", new Object[]{this.dataSource});
            try {
                this.scheduledExec.shutdownNow();
                this.reportingExec.shutdownNow();
                // The notice handshake only happens when initialization had completed.
                if (this.started) {
                    com.google.common.base.Optional taskRunner = this.taskMaster.getTaskRunner();
                    if (taskRunner.isPresent()) {
                        ((TaskRunner) taskRunner.get()).unregisterListener(this.supervisorId);
                    }
                    // stopLock is taken before the notice is posted, so the handler's notifyAll()
                    // cannot run until this thread is inside wait() (or has already timed out).
                    synchronized (this.stopLock) {
                        if (z) {
                            log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish", new Object[0]);
                            this.notices.add(new GracefulShutdownNotice());
                        } else {
                            log.info("Posting ShutdownNotice", new Object[0]);
                            this.notices.add(new ShutdownNotice());
                        }
                        long millis = this.tuningConfig.getShutdownTimeout().getMillis();
                        // Absolute deadline, so each wakeup waits only for the time still remaining.
                        long currentTimeMillis = System.currentTimeMillis() + millis;
                        while (true) {
                            if (this.stopped) {
                                break;
                            }
                            long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                            if (currentTimeMillis2 <= 0) {
                                log.info("Timed out while waiting for shutdown (timeout [%,dms])", new Object[]{Long.valueOf(millis)});
                                // Give up waiting and mark the supervisor stopped anyway.
                                this.stopped = true;
                                break;
                            }
                            this.stopLock.wait(currentTimeMillis2);
                        }
                    }
                    log.info("Shutdown notice handled", new Object[0]);
                }
                this.taskClient.close();
                this.workerExec.shutdownNow();
                this.exec.shutdownNow();
                this.started = false;
                log.info("KafkaSupervisor[%s] has stopped", new Object[]{this.dataSource});
            } catch (Exception e) {
                // NOTE(review): this also absorbs an InterruptedException thrown by wait() above, and the
                // thread's interrupt flag is not restored — confirm whether that is intended.
                log.makeAlert(e, "Exception stopping KafkaSupervisor[%s]", new Object[]{this.dataSource}).emit();
            }
        }
    }

    /** Returns the report produced by {@code generateReport(true)}. */
    public SupervisorReport getStatus() {
        final SupervisorReport report = generateReport(true);
        return report;
    }

    /**
     * Returns the result of {@code getCurrentTotalStats()}.
     *
     * @throws RuntimeException wrapping any interruption, execution failure or timeout; on
     *                          interruption the thread's interrupt flag is restored first
     */
    public Map<String, Map<String, Object>> getStats() {
        try {
            return getCurrentTotalStats();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            log.error(interrupted, "getStats() interrupted.");
            throw new RuntimeException(interrupted);
        } catch (ExecutionException | TimeoutException failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Queues a reset request; the work itself happens when the resulting {@link ResetNotice} is handled.
     *
     * @param dataSourceMetadata passed through to the notice unchanged
     */
    public void reset(DataSourceMetadata dataSourceMetadata) {
        log.info("Posting ResetNotice");
        final Notice resetNotice = new ResetNotice(dataSourceMetadata);
        this.notices.add(resetNotice);
    }

    /**
     * Validates a checkpoint request and queues a {@link CheckpointNotice} for it.
     *
     * @param num                 task group id, or {@code null}
     * @param str                 base sequence name, used when {@code num} is {@code null}
     * @param dataSourceMetadata  previous checkpoint; must not be {@code null}
     * @param dataSourceMetadata2 current checkpoint; must not be {@code null} and must carry the
     *                            supervisor's topic
     * @throws NullPointerException     if either checkpoint is {@code null}
     * @throws IllegalArgumentException if the current checkpoint's topic differs from the supervisor's
     */
    public void checkpoint(@Nullable Integer num, @Deprecated String str, DataSourceMetadata dataSourceMetadata, DataSourceMetadata dataSourceMetadata2) {
        Preconditions.checkNotNull(dataSourceMetadata, "previousCheckpoint");
        Preconditions.checkNotNull(dataSourceMetadata2, "current checkpoint cannot be null");

        final KafkaDataSourceMetadata current = (KafkaDataSourceMetadata) dataSourceMetadata2;
        Preconditions.checkArgument(
            this.ioConfig.getTopic().equals(current.getKafkaPartitions().getTopic()),
            "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
            this.ioConfig.getTopic(),
            current.getKafkaPartitions().getTopic()
        );

        log.info("Checkpointing [%s] for taskGroup [%s]", dataSourceMetadata2, num);
        final KafkaDataSourceMetadata previous = (KafkaDataSourceMetadata) dataSourceMetadata;
        this.notices.add(new CheckpointNotice(num, str, previous, current));
    }

    /**
     * Registers a task-runner listener that posts a {@link RunNotice} on every task status change.
     * Does nothing when a listener is already registered or no task runner is available.
     */
    public void possiblyRegisterListener() {
        if (this.listenerRegistered) {
            return;
        }
        com.google.common.base.Optional taskRunner = this.taskMaster.getTaskRunner();
        if (!taskRunner.isPresent()) {
            return;
        }

        final TaskRunnerListener statusListener = new TaskRunnerListener() {
            public String getListenerId() {
                return KafkaSupervisor.this.supervisorId;
            }

            public void locationChanged(String str, TaskLocation taskLocation) {
                // Location changes are ignored.
            }

            public void statusChanged(String str, TaskStatus taskStatus) {
                KafkaSupervisor.this.notices.add(new RunNotice());
            }
        };
        ((TaskRunner) taskRunner.get()).registerListener(statusListener, MoreExecutors.sameThreadExecutor());
        this.listenerRegistered = true;
    }

    /**
     * Resets the persisted ingestion offsets for this supervisor's dataSource.
     *
     * <p>When {@code dataSourceMetadata} is {@code null} the whole metadata entry is deleted, every task in
     * every active task group is killed and the in-memory task/partition group maps are cleared.
     *
     * <p>Otherwise only the partitions named in {@code dataSourceMetadata} are reset: the stored metadata is
     * replaced with {@code stored.minus(reset)}, the task groups owning those partitions are killed and
     * removed, and the offsets of the affected partition groups are set back to {@code NOT_SET}.
     * A request is ignored (with a log line) when its topic differs from the supervisor's topic, or when none
     * of its partitions has a stored offset or matches the offset an active task group is tracking.
     *
     * @param dataSourceMetadata metadata describing what to reset, or {@code null} to reset everything
     * @throws IAE if the argument or the stored metadata is not a {@link KafkaDataSourceMetadata}
     * @throws ISE if the metadata store reports that the reset did not succeed
     */
    @VisibleForTesting
    void resetInternal(DataSourceMetadata dataSourceMetadata) {
        if (dataSourceMetadata == null) {
            // Full reset: drop the stored entry and forget all in-memory group state.
            final boolean deleted = this.indexerMetadataStorageCoordinator.deleteDataSourceMetadata(this.dataSource);
            log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", this.dataSource, deleted);
            this.taskGroups.values().forEach(this::killTasksInGroup);
            this.taskGroups.clear();
            this.partitionGroups.clear();
            return;
        }
        if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
            throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
        }

        final KafkaDataSourceMetadata resetMetadata = (KafkaDataSourceMetadata) dataSourceMetadata;
        final String resetTopic = resetMetadata.getKafkaPartitions().getTopic();
        if (!resetTopic.equals(this.ioConfig.getTopic())) {
            log.warn("Reset metadata topic [%s] and supervisor's topic [%s] do not match", resetTopic, this.ioConfig.getTopic());
            return;
        }

        // The metadata store may legitimately have no entry for this dataSource.
        final DataSourceMetadata stored = this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
        if (stored != null && !(stored instanceof KafkaDataSourceMetadata)) {
            throw new IAE("Expected KafkaDataSourceMetadata from metadata store but found instance of [%s]", stored.getClass());
        }
        final KafkaDataSourceMetadata storedMetadata = (KafkaDataSourceMetadata) stored;
        final Map<Integer, Long> resetOffsets = resetMetadata.getKafkaPartitions().getPartitionOffsetMap();

        // Proceed only if at least one requested partition still has a stored offset, or an active task group
        // is tracking exactly the requested offset; otherwise the request is treated as a duplicate.
        boolean doReset = false;
        for (Map.Entry<Integer, Long> resetOffset : resetOffsets.entrySet()) {
            final Integer partition = resetOffset.getKey();
            final Long storedOffset = storedMetadata == null
                    ? null
                    : storedMetadata.getKafkaPartitions().getPartitionOffsetMap().get(partition);
            final TaskGroup group = this.taskGroups.get(Integer.valueOf(getTaskGroupIdForPartition(partition.intValue())));
            // The group may not track this partition at all; treat a missing offset as "not the same".
            final Long groupOffset = group == null ? null : (Long) group.partitionOffsets.get(partition);
            final boolean sameOffset = groupOffset != null && groupOffset.equals(resetOffset.getValue());
            if (storedOffset != null || sameOffset) {
                doReset = true;
                break;
            }
        }
        if (!doReset) {
            log.info("Ignoring duplicate reset request [%s]", dataSourceMetadata);
            return;
        }

        final boolean metadataUpdated;
        if (storedMetadata == null) {
            // Nothing stored, so there is nothing to rewrite.
            metadataUpdated = true;
        } else {
            try {
                metadataUpdated = this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(this.dataSource, storedMetadata.minus(resetMetadata));
            } catch (IOException e) {
                log.error("Resetting DataSourceMetadata failed [%s]", e.getMessage());
                throw new RuntimeException(e);
            }
        }
        if (!metadataUpdated) {
            throw new ISE("Unable to reset metadata");
        }

        for (Integer partition : resetOffsets.keySet()) {
            final Integer groupId = Integer.valueOf(getTaskGroupIdForPartition(partition.intValue()));
            killTaskGroupForPartitions(ImmutableSet.of(partition));
            this.taskGroups.remove(groupId);
            // No partition map for this group means there are no offsets to forget.
            final Map<Integer, Long> groupPartitions = this.partitionGroups.get(groupId);
            if (groupPartitions != null) {
                groupPartitions.replaceAll((partitionId, offset) -> Long.valueOf(NOT_SET));
            }
        }
    }

    /**
     * Kills the tasks of every active task group that owns one of the given partitions.
     * Partitions whose group is not currently in {@code taskGroups} are effectively skipped, because
     * {@link #killTasksInGroup} ignores a {@code null} group.
     *
     * @param set Kafka partition ids whose owning task groups should be killed
     */
    private void killTaskGroupForPartitions(Set<Integer> set) {
        for (Integer partition : set) {
            final Integer groupId = Integer.valueOf(getTaskGroupIdForPartition(partition.intValue()));
            killTasksInGroup(this.taskGroups.get(groupId));
        }
    }

    /**
     * Kills every task registered in the given task group, logging each task id first.
     *
     * @param taskGroup group whose tasks should be killed; {@code null} is a no-op
     */
    private void killTasksInGroup(TaskGroup taskGroup) {
        if (taskGroup == null) {
            return;
        }
        for (String taskId : taskGroup.tasks.keySet()) {
            log.info("Killing task [%s] in the task group", taskId);
            killTask(taskId);
        }
    }

    /**
     * Prepares the active task groups for shutdown.
     *
     * <p>Tasks whose location is still {@link TaskLocation#unknown()} are killed outright. Every other task
     * has its start time moved to {@code DateTimes.EPOCH}, so that the subsequent {@link #checkTaskDuration()}
     * call sees its group as having exceeded the configured task duration.
     *
     * @throws ExecutionException   propagated from {@link #checkTaskDuration()}
     * @throws InterruptedException propagated from {@link #checkTaskDuration()}
     * @throws TimeoutException     propagated from {@link #checkTaskDuration()}
     */
    @VisibleForTesting
    void gracefulShutdownInternal() throws ExecutionException, InterruptedException, TimeoutException {
        for (TaskGroup group : this.taskGroups.values()) {
            for (Map.Entry<String, TaskData> task : group.tasks.entrySet()) {
                final boolean unassigned = this.taskInfoProvider.getTaskLocation(task.getKey()).equals(TaskLocation.unknown());
                if (unassigned) {
                    killTask(task.getKey());
                } else {
                    // Backdate the start time so the duration check fires for this task's group.
                    task.getValue().startTime = DateTimes.EPOCH;
                }
            }
        }
        checkTaskDuration();
    }

    /**
     * Executes one supervisor pass: refreshes partition and task state, then either shuts tasks down
     * gracefully (when the spec is suspended) or creates new tasks, and finally logs a status report.
     *
     * <p>The report is generated with {@code generateReport(true)} and logged at debug level when debug
     * logging is enabled; otherwise {@code generateReport(false)} is logged at info level.
     *
     * @throws ExecutionException      propagated from the individual steps
     * @throws InterruptedException    propagated from the individual steps
     * @throws TimeoutException        propagated from the individual steps
     * @throws JsonProcessingException propagated from the individual steps
     */
    @VisibleForTesting
    void runInternal() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
        // The order of these steps matters: each one works on state refreshed by the previous ones.
        possiblyRegisterListener();
        updatePartitionDataFromKafka();
        discoverTasks();
        updateTaskStatus();
        checkTaskDuration();
        checkPendingCompletionTasks();
        checkCurrentTaskState();

        if (!this.spec.isSuspended()) {
            log.info("[%s] supervisor is running.", this.dataSource);
            createNewTasks();
        } else {
            log.info("[%s] supervisor is suspended.", this.dataSource);
            gracefulShutdownInternal();
        }

        final boolean debugEnabled = log.isDebugEnabled();
        final String report = generateReport(debugEnabled).toString();
        if (debugEnabled) {
            log.debug(report);
        } else {
            log.info(report);
        }
    }

    /**
     * Builds a sequence name of the form {@code index_kafka_<dataSource>_<hash>}.
     *
     * <p>The hash is the first 15 hex characters of the SHA-1 of, in order: the serialized data schema, the
     * serialized task tuning config, the partition/offset pairs rendered as {@code p(o)+p(o)...}, and the
     * epoch millis of each optional timestamp that is present.
     *
     * @param map       partition id to starting offset; must not be empty (the leading {@code +} is stripped
     *                  with {@code substring(1)})
     * @param optional  first optional timestamp folded into the hash
     * @param optional2 second optional timestamp folded into the hash
     * @return the generated sequence name
     */
    String generateSequenceName(Map<Integer, Long> map, com.google.common.base.Optional<DateTime> optional, com.google.common.base.Optional<DateTime> optional2) {
        final StringBuilder offsets = new StringBuilder();
        map.forEach((partition, offset) -> offsets.append(StringUtils.format("+%d(%d)", partition, offset)));
        try {
            // Evaluated in this order on purpose: it is the concatenation order of the hash input.
            final String dataSchemaJson = this.sortingMapper.writeValueAsString(this.spec.getDataSchema());
            final String tuningConfigJson = this.sortingMapper.writeValueAsString(this.taskTuningConfig);
            final String partitionOffsetStr = offsets.toString().substring(1);
            final String firstMillis = optional.isPresent() ? String.valueOf(optional.get().getMillis()) : "";
            final String secondMillis = optional2.isPresent() ? String.valueOf(optional2.get().getMillis()) : "";
            final String hashInput = dataSchemaJson + tuningConfigJson + partitionOffsetStr + firstMillis + secondMillis;
            final String hash = DigestUtils.sha1Hex(hashInput).substring(0, 15);
            return Joiner.on("_").join("index_kafka", this.dataSource, hash);
        } catch (JsonProcessingException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Starts the supervisor's background machinery, unless it has already been started or stopped.
     *
     * <p>Under {@code stateChangeLock} this creates the Kafka consumer, submits the notice-handling loop to
     * {@code exec}, and schedules three periodic jobs: the run task on {@code scheduledExec}, and the
     * offset-update and lag-emission tasks on {@code reportingExec}. On success {@code started} is set.
     *
     * <p>On any failure the consumer (if one was created) is closed, {@code initRetryCounter} is incremented,
     * an alert is emitted and the failure is rethrown.
     *
     * @throws RuntimeException wrapping whatever exception aborted the initialization
     */
    @VisibleForTesting
    protected void tryInit() {
        synchronized (this.stateChangeLock) {
            if (this.started) {
                log.warn("Supervisor was already started, skipping init");
                return;
            }
            if (this.stopped) {
                log.warn("Supervisor was already stopped, skipping init.");
                return;
            }
            try {
                this.consumer = getKafkaConsumer();
                this.exec.submit(() -> {
                    try {
                        // The poll timeout bounds how long the loop waits before re-checking the stop flag.
                        final long pollTimeoutMillis = Math.max(this.ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
                        while (!Thread.currentThread().isInterrupted() && !this.stopped) {
                            final Notice notice = this.notices.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                            if (notice == null) {
                                continue;
                            }
                            try {
                                notice.handle();
                            } catch (Throwable th) {
                                // A failing notice must not terminate the loop; alert and keep going.
                                log.makeAlert(th, "KafkaSupervisor[%s] failed to handle notice", this.dataSource)
                                        .addData("noticeClass", notice.getClass().getSimpleName())
                                        .emit();
                            }
                        }
                    } catch (InterruptedException e) {
                        // Log before restoring the flag so the logging call itself runs un-interrupted.
                        log.info("KafkaSupervisor[%s] interrupted, exiting", this.dataSource);
                        Thread.currentThread().interrupt();
                    }
                });
                this.firstRunTime = DateTimes.nowUtc().plus(this.ioConfig.getStartDelay());
                final long startDelayMillis = this.ioConfig.getStartDelay().getMillis();
                this.scheduledExec.scheduleAtFixedRate(
                        buildRunTask(),
                        startDelayMillis,
                        Math.max(this.ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
                        TimeUnit.MILLISECONDS);
                this.reportingExec.scheduleAtFixedRate(
                        updateCurrentAndLatestOffsets(),
                        startDelayMillis + INITIAL_GET_OFFSET_DELAY_MILLIS,
                        Math.max(this.tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS),
                        TimeUnit.MILLISECONDS);
                this.reportingExec.scheduleAtFixedRate(
                        emitLag(),
                        startDelayMillis + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS,
                        this.monitorSchedulerConfig.getEmitterPeriod().getMillis(),
                        TimeUnit.MILLISECONDS);
                this.started = true;
                log.info("Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]", this.dataSource, this.ioConfig.getStartDelay(), this.spec.toString());
            } catch (Exception e) {
                // Only close the consumer if one was actually created.
                if (this.consumer != null) {
                    this.consumer.close();
                }
                this.initRetryCounter++;
                log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", this.dataSource).emit();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Creates a byte-array Kafka consumer for the supervisor's own use.
     *
     * <p>The consumer gets a randomly generated {@code group.id} and a 10 second metadata max age.
     * {@code enable.auto.commit} is set to {@code false} after the configured consumer properties are
     * applied, so presumably it overrides any value supplied through the IO config.
     *
     * <p>The thread's context class loader is swapped to this class's loader while the consumer is
     * constructed and is always restored afterwards, whether or not construction succeeds.
     *
     * @return a newly constructed consumer
     */
    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
        final Properties props = new Properties();
        props.setProperty("metadata.max.age.ms", "10000");
        props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", RealtimeIndexTask.makeRandomId()));
        KafkaIndexTask.addConsumerPropertiesFromConfig(props, this.sortingMapper, this.ioConfig.getConsumerProperties());
        props.setProperty("enable.auto.commit", "false");

        final Thread current = Thread.currentThread();
        final ClassLoader previousLoader = current.getContextClassLoader();
        try {
            current.setContextClassLoader(getClass().getClassLoader());
            return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        } finally {
            current.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Refreshes the supervisor's view of the topic's partitions from Kafka.
     *
     * <p>Lists topics through the shared consumer (under {@code consumerLock}) and, for every partition
     * index of the configured topic, registers it in the partition group it maps to with an initial offset
     * of {@code NOT_SET} unless it is already registered. A missing topic is logged and treated as having
     * zero partitions. Any exception is logged as a warning and otherwise ignored.
     */
    private void updatePartitionDataFromKafka() {
        try {
            final Map<String, ? extends List<?>> topics;
            synchronized (this.consumerLock) {
                topics = this.consumer.listTopics();
            }

            final List<?> partitions = topics.get(this.ioConfig.getTopic());
            final int partitionCount;
            if (partitions == null) {
                log.warn("No such topic [%s] found, list of discovered topics [%s]", this.ioConfig.getTopic(), topics.keySet());
                partitionCount = 0;
            } else {
                partitionCount = partitions.size();
            }
            log.debug("Found [%d] Kafka partitions for topic [%s]", Integer.valueOf(partitionCount), this.ioConfig.getTopic());

            for (int partition = 0; partition < partitionCount; partition++) {
                final int groupId = getTaskGroupIdForPartition(partition);
                final Map<Integer, Long> groupPartitions =
                        this.partitionGroups.computeIfAbsent(Integer.valueOf(groupId), id -> new ConcurrentHashMap<>());
                // putIfAbsent returns null only when the partition was not known before.
                final boolean newlyDiscovered =
                        groupPartitions.putIfAbsent(Integer.valueOf(partition), Long.valueOf(NOT_SET)) == null;
                if (newlyDiscovered) {
                    log.info("New partition [%d] discovered for topic [%s], added to task group [%d]", Integer.valueOf(partition), this.ioConfig.getTopic(), Integer.valueOf(groupId));
                }
            }
        } catch (Exception e) {
            log.warn(e, "Unable to get partition data from Kafka for brokers [%s], are the brokers up?", this.ioConfig.getConsumerProperties().get(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY));
        }
    }

    /**
     * Looks through the active tasks in task storage for Kafka indexing tasks of this dataSource that the
     * supervisor is not tracking yet, and asks each of them for its status.
     *
     * <p>A task's group id is derived from the first partition in its start-partition map; tasks with no
     * partitions are ignored. A task is considered untracked when it is neither in a pending-completion
     * group nor in the active task group for its group id. The status of each untracked task is fetched
     * asynchronously and handed to a handler running on {@code workerExec}.
     *
     * <p>NOTE(review): the handler body ({@code apply}) could not be decompiled; only the bytecode summary
     * in the JADX comments below is available. As written here it throws
     * {@link UnsupportedOperationException}, so every discovered task would be killed by the null-result
     * loop further down. Restore the handler from the original sources before relying on this method.
     *
     * <p>After waiting up to {@code futureTimeoutInSeconds} for all status futures, tasks whose result is
     * {@code null} are killed. Finally the task groups collected in {@code hashMap} are passed to
     * {@link #verifyAndMergeCheckpoints(Collection)}.
     *
     * @throws ExecutionException   if waiting on the combined status future fails
     * @throws InterruptedException if the wait is interrupted
     * @throws TimeoutException     if the statuses do not arrive within {@code futureTimeoutInSeconds}
     */
    private void discoverTasks() throws ExecutionException, InterruptedException, TimeoutException {
        // Number of Kafka indexing tasks found for this dataSource (only used for the debug log below).
        int i = 0;
        // newArrayList holds task ids, newArrayList2 the matching status futures; the two are index-aligned.
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        List<KafkaIndexTask> activeTasks = this.taskStorage.getActiveTasks();
        // Presumably filled by the undecompiled handler (the bytecode summary shows a put of group id to
        // task group); nothing in the visible code adds to it.
        final HashMap hashMap = new HashMap();
        for (KafkaIndexTask kafkaIndexTask : activeTasks) {
            if ((kafkaIndexTask instanceof KafkaIndexTask) && this.dataSource.equals(kafkaIndexTask.getDataSource())) {
                i++;
                final KafkaIndexTask kafkaIndexTask2 = kafkaIndexTask;
                final String id = kafkaIndexTask.getId();
                // The task group id is determined from the first start partition only.
                Iterator<Integer> it = kafkaIndexTask2.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().iterator();
                final Integer valueOf = it.hasNext() ? Integer.valueOf(getTaskGroupIdForPartition(it.next().intValue())) : null;
                if (valueOf != null) {
                    TaskGroup taskGroup = this.taskGroups.get(valueOf);
                    // Only query tasks that are not already tracked in a pending-completion or active group.
                    if (!isTaskInPendingCompletionGroups(id) && (taskGroup == null || !taskGroup.tasks.containsKey(id))) {
                        newArrayList.add(id);
                        newArrayList2.add(Futures.transform(this.taskClient.getStatusAsync(id), new Function<KafkaIndexTask.Status, Boolean>() { // from class: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.3
                            /* JADX WARN: Code restructure failed: missing block: B:30:0x0127, code lost:
                            
                                org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.log.warn("Stopping task [%s] which does not match the expected partition allocation", new java.lang.Object[]{r5});
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:32:0x013a, code lost:
                            
                                r8.this$0.stopTask(r5, false).get(r8.this$0.futureTimeoutInSeconds, java.util.concurrent.TimeUnit.SECONDS);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:36:0x0159, code lost:
                            
                                r12 = move-exception;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:37:0x015b, code lost:
                            
                                org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.log.warn(r12, "Exception while stopping task", new java.lang.Object[0]);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:40:0x0183, code lost:
                            
                                if (r8.this$0.isTaskCurrent(r7.intValue(), r5) != false) goto L37;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:41:0x0186, code lost:
                            
                                org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", new java.lang.Object[]{r5});
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:43:0x0199, code lost:
                            
                                r8.this$0.stopTask(r5, false).get(r8.this$0.futureTimeoutInSeconds, java.util.concurrent.TimeUnit.SECONDS);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:47:0x01b8, code lost:
                            
                                r10 = move-exception;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:48:0x01b9, code lost:
                            
                                org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.log.warn(r10, "Exception while stopping task", new java.lang.Object[0]);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:49:0x01cb, code lost:
                            
                                r0 = r8.this$0.taskGroups;
                                r1 = r7;
                                r3 = r7;
                                r4 = r6;
                                r0 = (org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.TaskGroup) r0.computeIfAbsent(r1, (v3) -> { // java.util.function.Function.apply(java.lang.Object):java.lang.Object
                                    return lambda$apply$1(r3, r4, v3);
                                });
                                r8.put(r7, r0);
                                r0 = r0.tasks.putIfAbsent(r5, new org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.TaskData(null));
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:50:0x0212, code lost:
                            
                                if (r0 == null) goto L41;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:52:0x022d, code lost:
                            
                                throw new org.apache.druid.java.util.common.ISE("WTH? a taskData[%s] already exists for new task[%s]", new java.lang.Object[]{r0, r5});
                             */
                            /*
                                Code decompiled incorrectly, please refer to instructions dump.
                                To view partially-correct add '--show-bad-code' argument
                            */
                            public java.lang.Boolean apply(org.apache.druid.indexing.kafka.KafkaIndexTask.Status r9) {
                                /*
                                    Method dump skipped, instructions count: 586
                                    To view this dump add '--comments-level debug' option
                                */
                                throw new UnsupportedOperationException("Method not decompiled: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.AnonymousClass3.apply(org.apache.druid.indexing.kafka.KafkaIndexTask$Status):java.lang.Boolean");
                            }
                        }, this.workerExec));
                    }
                }
            }
        }
        // successfulAsList substitutes null for every future that failed, so a null entry marks a task whose
        // status could not be obtained (or whose handler returned null).
        List list = (List) Futures.successfulAsList(newArrayList2).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int i2 = 0; i2 < list.size(); i2++) {
            if (list.get(i2) == null) {
                String str = (String) newArrayList.get(i2);
                log.warn("Task [%s] failed to return status, killing task", new Object[]{str});
                killTask(str);
            }
        }
        log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", new Object[]{Integer.valueOf(i), this.dataSource});
        verifyAndMergeCheckpoints(hashMap.values());
    }

    /**
     * Runs {@link #verifyAndMergeCheckpoints(TaskGroup)} for each given group on {@code workerExec} and
     * waits up to {@code futureTimeoutInSeconds} for all of them to finish.
     *
     * @param collection task groups whose checkpoints should be verified
     * @throws RuntimeException wrapping an interruption, a failure of any verification, or a timeout
     */
    private void verifyAndMergeCheckpoints(Collection<TaskGroup> collection) {
        final List<ListenableFuture<?>> verifications = new ArrayList<>();
        for (TaskGroup group : collection) {
            verifications.add(this.workerExec.submit(() -> {
                verifyAndMergeCheckpoints(group);
            }));
        }
        try {
            Futures.allAsList(verifications).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the checkpoints of every task in the group, decides which checkpoint sequence the group should
     * adopt, and kills tasks whose checkpoints disagree with it.
     *
     * <p>Steps:
     * <ol>
     *   <li>Request checkpoints from all tasks. A task whose request failed is killed and removed from the
     *       group; a task that returned no checkpoints is ignored for this pass.</li>
     *   <li>Load the offsets stored in the metadata store (used only when they belong to this topic).</li>
     *   <li>Visit tasks in descending order of their first sequence id. Until a reference is found, a task
     *       becomes the reference if one of its checkpoints matches the stored offsets, or, failing that, if
     *       the group has pending-completion groups (then its first sequence id is used). The group's
     *       {@code sequenceOffsets} are replaced by that task's checkpoints from the chosen id onwards.
     *       Tasks visited before a reference exists and not qualifying, and later tasks that disagree with
     *       the group's sequences, are marked to be killed.</li>
     *   <li>If every task is to be killed, or the group has no tasks and no pending-completion groups, the
     *       group is removed and its partition offsets are reset to {@code NOT_SET}.</li>
     *   <li>Marked tasks are killed and removed from the group.</li>
     * </ol>
     *
     * @param taskGroup the group to verify; its {@code tasks} and {@code sequenceOffsets} may be modified
     * @throws RuntimeException wrapping any exception raised while verifying
     */
    private void verifyAndMergeCheckpoints(TaskGroup taskGroup) {
        final int groupId = taskGroup.groupId;
        final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new ArrayList<>();
        final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
        final List<String> taskIds = new ArrayList<>();
        for (String taskId : taskGroup.taskIds()) {
            final ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> checkpointsFuture = this.taskClient.getCheckpointsAsync(taskId, true);
            taskIds.add(taskId);
            futures.add(checkpointsFuture);
        }
        try {
            final List<TreeMap<Integer, Map<Integer, Long>>> results =
                    Futures.successfulAsList(futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
            for (int i = 0; i < results.size(); i++) {
                final TreeMap<Integer, Map<Integer, Long>> checkpoints = results.get(i);
                final String taskId = taskIds.get(i);
                if (checkpoints == null) {
                    try {
                        // Re-query the individual future to surface the failure that produced the null.
                        futures.get(i).get();
                    } catch (Exception e) {
                        log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
                        killTask(taskId);
                        taskGroup.tasks.remove(taskId);
                    }
                } else if (checkpoints.isEmpty()) {
                    log.warn("Ignoring task [%s], as probably it is not started running yet", taskId);
                } else {
                    taskSequences.add(new Pair<>(taskId, checkpoints));
                }
            }

            final KafkaDataSourceMetadata latestMetadata =
                    (KafkaDataSourceMetadata) this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
            final boolean hasValidOffsetsFromDb = latestMetadata != null
                    && latestMetadata.getKafkaPartitions() != null
                    && this.ioConfig.getTopic().equals(latestMetadata.getKafkaPartitions().getTopic());
            final Map<Integer, Long> latestOffsetsFromDb =
                    hasValidOffsetsFromDb ? latestMetadata.getKafkaPartitions().getPartitionOffsetMap() : null;

            // Descending by first sequence id.
            taskSequences.sort((left, right) -> right.rhs.firstKey().compareTo(left.rhs.firstKey()));

            final Set<String> tasksToKill = new HashSet<>();
            // -1 means "no reference sequence chosen yet"; set at most once via compareAndSet.
            final AtomicInteger earliestConsistentSequenceId = new AtomicInteger(-1);
            for (Pair<String, TreeMap<Integer, Map<Integer, Long>>> taskSequence : taskSequences) {
                final String taskId = taskSequence.lhs;
                final TreeMap<Integer, Map<Integer, Long>> taskCheckpoints = taskSequence.rhs;
                if (earliestConsistentSequenceId.get() == -1) {
                    final boolean consistentWithStore = taskCheckpoints.entrySet().stream().anyMatch(
                            sequenceCheckpoint -> offsetsMatchStored(sequenceCheckpoint.getValue(), latestOffsetsFromDb)
                                    && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey().intValue()));
                    if (consistentWithStore
                            || (this.pendingCompletionTaskGroups.getOrDefault(Integer.valueOf(groupId), EMPTY_LIST).size() > 0
                            && earliestConsistentSequenceId.compareAndSet(-1, taskCheckpoints.firstKey().intValue()))) {
                        final TreeMap<Integer, Map<Integer, Long>> latestCheckpoints =
                                new TreeMap<>(taskCheckpoints.tailMap(Integer.valueOf(earliestConsistentSequenceId.get())));
                        log.info("Setting taskGroup sequences to [%s] for group [%d]", latestCheckpoints, Integer.valueOf(groupId));
                        taskGroup.sequenceOffsets.clear();
                        taskGroup.sequenceOffsets.putAll(latestCheckpoints);
                    } else {
                        log.debug("Adding task [%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]", taskId, taskCheckpoints, latestOffsetsFromDb);
                        tasksToKill.add(taskId);
                    }
                } else {
                    // A reference exists: the task must agree with the group's sequences from the first id on.
                    final Integer firstSequenceId = taskGroup.sequenceOffsets.firstKey();
                    final Map<Integer, Long> taskOffsetsAtFirst = taskCheckpoints.get(firstSequenceId);
                    final boolean inconsistent = taskOffsetsAtFirst == null
                            || !taskOffsetsAtFirst.equals(taskGroup.sequenceOffsets.firstEntry().getValue())
                            || taskCheckpoints.tailMap(firstSequenceId).size() != taskGroup.sequenceOffsets.size();
                    if (inconsistent) {
                        log.debug("Adding task [%s] to kill list, checkpoints[%s], taskgroup checkpoints [%s]", taskId, taskCheckpoints, taskGroup.sequenceOffsets);
                        tasksToKill.add(taskId);
                    }
                }
            }

            final boolean killingAllTasks = !tasksToKill.isEmpty() && tasksToKill.size() == taskGroup.tasks.size();
            if (killingAllTasks
                    || (taskGroup.tasks.size() == 0
                    && this.pendingCompletionTaskGroups.getOrDefault(Integer.valueOf(groupId), EMPTY_LIST).size() == 0)) {
                log.warn("Clearing task group [%d] information as no valid tasks left the group", Integer.valueOf(groupId));
                this.taskGroups.remove(Integer.valueOf(groupId));
                // Guard against a group with no registered partitions, so that the kill loop below still runs.
                final Map<Integer, Long> groupPartitions = this.partitionGroups.get(Integer.valueOf(groupId));
                if (groupPartitions != null) {
                    groupPartitions.replaceAll((partition, offset) -> Long.valueOf(NOT_SET));
                }
            }

            for (Pair<String, TreeMap<Integer, Map<Integer, Long>>> taskSequence : taskSequences) {
                if (!tasksToKill.contains(taskSequence.lhs)) {
                    continue;
                }
                log.warn("Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]", taskSequence.lhs, taskSequence.rhs, taskGroup.sequenceOffsets, latestOffsetsFromDb);
                killTask(taskSequence.lhs);
                taskGroup.tasks.remove(taskSequence.lhs);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether every partition offset of a checkpoint equals the stored offset for that partition.
     * Partitions missing from the stored offsets count as matching; a {@code null} stored map matches
     * everything.
     */
    private static boolean offsetsMatchStored(Map<Integer, Long> checkpointOffsets, @Nullable Map<Integer, Long> storedOffsets) {
        if (storedOffsets == null) {
            return true;
        }
        for (Map.Entry<Integer, Long> partitionOffset : checkpointOffsets.entrySet()) {
            final Long stored = storedOffsets.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue());
            if (Longs.compare(partitionOffset.getValue().longValue(), stored.longValue()) != 0) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Registers a discovered task under the pending-completion task groups of the given group id.
     *
     * <p>If a pending group with exactly the same partition offsets already exists, the task is added to it
     * (a log line is written only when the task was not already present). Otherwise a new pending group is
     * created for the task, with its completion timeout set to now plus the configured completion timeout.
     *
     * @param i   task group id
     * @param str id of the discovered task
     * @param map partition offsets identifying the pending group the task belongs to
     */
    public void addDiscoveredTaskToPendingCompletionTaskGroups(int i, String str, Map<Integer, Long> map) {
        final CopyOnWriteArrayList<TaskGroup> pendingGroups =
                this.pendingCompletionTaskGroups.computeIfAbsent(Integer.valueOf(i), groupId -> new CopyOnWriteArrayList<>());

        for (TaskGroup existing : pendingGroups) {
            if (!existing.partitionOffsets.equals(map)) {
                continue;
            }
            // First group with matching offsets wins; nothing else to do whether or not the task was new.
            if (existing.tasks.putIfAbsent(str, new TaskData()) == null) {
                log.info("Added discovered task [%s] to existing pending task group [%s]", str, Integer.valueOf(i));
            }
            return;
        }

        log.info("Creating new pending completion task group [%s] for discovered task [%s]", Integer.valueOf(i), str);
        final TaskGroup created = new TaskGroup(i, ImmutableMap.copyOf(map), com.google.common.base.Optional.absent(), com.google.common.base.Optional.absent());
        created.tasks.put(str, new TaskData());
        created.completionTimeout = DateTimes.nowUtc().plus(this.ioConfig.getCompletionTimeout());
        pendingGroups.add(created);
    }

    /**
     * Refreshes the start time and status of the tasks the supervisor tracks.
     *
     * <p>For every task in an active group that has no start time yet, the start time is requested
     * asynchronously; once it arrives it is stored and, if the task still has time left before reaching the
     * configured task duration, an extra supervisor run is scheduled for shortly after that moment. The
     * status of every task in the active and pending-completion groups is read from task storage.
     * Tasks whose start-time request produced no result are killed.
     *
     * @throws ExecutionException   if waiting on the combined start-time future fails
     * @throws InterruptedException if the wait is interrupted
     * @throws TimeoutException     if the start times do not arrive within {@code futureTimeoutInSeconds}
     */
    private void updateTaskStatus() throws ExecutionException, InterruptedException, TimeoutException {
        // startTimeFutures and startTimeTaskIds are index-aligned.
        final List<ListenableFuture<Boolean>> startTimeFutures = new ArrayList<>();
        final List<String> startTimeTaskIds = new ArrayList<>();

        for (TaskGroup group : this.taskGroups.values()) {
            for (Map.Entry<String, TaskData> task : group.tasks.entrySet()) {
                final String taskId = task.getKey();
                final TaskData taskData = task.getValue();
                if (taskData.startTime == null) {
                    startTimeTaskIds.add(taskId);
                    startTimeFutures.add(Futures.transform(this.taskClient.getStartTimeAsync(taskId), new Function<DateTime, Boolean>() {
                        @Nullable
                        public Boolean apply(@Nullable DateTime startTime) {
                            if (startTime == null) {
                                return false;
                            }
                            taskData.startTime = startTime;
                            final long elapsedMillis = System.currentTimeMillis() - taskData.startTime.getMillis();
                            final long remainingMillis = KafkaSupervisor.this.ioConfig.getTaskDuration().getMillis() - elapsedMillis;
                            if (remainingMillis > 0) {
                                // Wake the supervisor up just after the task is due to finish reading.
                                KafkaSupervisor.this.scheduledExec.schedule(
                                        KafkaSupervisor.this.buildRunTask(),
                                        remainingMillis + KafkaSupervisor.MAX_RUN_FREQUENCY_MILLIS,
                                        TimeUnit.MILLISECONDS);
                            }
                            return true;
                        }
                    }, this.workerExec));
                }
                taskData.status = (TaskStatus) this.taskStorage.getStatus(taskId).get();
            }
        }

        for (CopyOnWriteArrayList<TaskGroup> pendingGroups : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup pendingGroup : pendingGroups) {
                for (Map.Entry<String, TaskData> task : pendingGroup.tasks.entrySet()) {
                    task.getValue().status = (TaskStatus) this.taskStorage.getStatus(task.getKey()).get();
                }
            }
        }

        final List<Boolean> results = Futures.successfulAsList(startTimeFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int idx = 0; idx < results.size(); idx++) {
            if (results.get(idx) != null) {
                continue;
            }
            final String taskId = startTimeTaskIds.get(idx);
            log.warn("Task [%s] failed to return start time, killing task", taskId);
            killTask(taskId);
        }
    }

    /**
     * Checkpoints every active task group that has been running for at least the configured task duration
     * and moves it out of the active set.
     *
     * <p>A group's age is measured from the earliest known start time among its tasks (or from now when no
     * task has a start time). For each expired group {@link #checkpointTaskGroup} is invoked; when it yields
     * a map, the group gets a completion timeout, is appended to the pending-completion groups, and the
     * returned offsets are written into the group's partition offsets. When it yields nothing, all of the
     * group's tasks are killed and its partition offsets are reset to {@code NOT_SET}. In both cases the
     * group is removed from the active task groups.
     *
     * @throws InterruptedException if waiting for the checkpoint futures is interrupted
     * @throws ExecutionException   if waiting on the combined checkpoint future fails
     * @throws TimeoutException     if the checkpoints do not finish within {@code futureTimeoutInSeconds}
     */
    private void checkTaskDuration() throws InterruptedException, ExecutionException, TimeoutException {
        // checkpointFutures and expiredGroupIds are index-aligned.
        final List<ListenableFuture<Map<Integer, Long>>> checkpointFutures = new ArrayList<>();
        final List<Integer> expiredGroupIds = new ArrayList<>();

        for (Map.Entry<Integer, TaskGroup> groupEntry : this.taskGroups.entrySet()) {
            final Integer groupId = groupEntry.getKey();
            final TaskGroup group = groupEntry.getValue();

            DateTime earliestStart = DateTimes.nowUtc();
            for (TaskData task : group.tasks.values()) {
                if (task.startTime != null && earliestStart.isAfter(task.startTime)) {
                    earliestStart = task.startTime;
                }
            }

            if (!earliestStart.plus(this.ioConfig.getTaskDuration()).isBeforeNow()) {
                continue;
            }
            log.info("Task group [%d] has run for [%s]", groupId, this.ioConfig.getTaskDuration());
            expiredGroupIds.add(groupId);
            checkpointFutures.add(checkpointTaskGroup(group, true));
        }

        final List<Map<Integer, Long>> results = Futures.successfulAsList(checkpointFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int idx = 0; idx < results.size(); idx++) {
            final Integer groupId = expiredGroupIds.get(idx);
            final TaskGroup group = this.taskGroups.get(groupId);
            final Map<Integer, Long> endOffsets = results.get(idx);

            if (endOffsets == null) {
                log.warn("All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]", groupId, group.taskIds());
                for (String taskId : group.taskIds()) {
                    killTask(taskId);
                }
                this.partitionGroups.get(groupId).replaceAll((partition, offset) -> Long.valueOf(NOT_SET));
            } else {
                // Track the group as pending completion and remember where its tasks stopped reading.
                group.completionTimeout = DateTimes.nowUtc().plus(this.ioConfig.getCompletionTimeout());
                this.pendingCompletionTaskGroups.computeIfAbsent(groupId, id -> new CopyOnWriteArrayList<>()).add(group);
                for (Map.Entry<Integer, Long> endOffset : endOffsets.entrySet()) {
                    this.partitionGroups.get(groupId).put(endOffset.getKey(), endOffset.getValue());
                }
            }

            // Either way the group has been dealt with and is no longer active.
            this.taskGroups.remove(groupId);
        }
    }

    /**
     * Pauses every task of {@code taskGroup}, merges the offsets they report (keeping the highest
     * offset per partition), sets those as end offsets on the surviving tasks and returns them.
     *
     * <p>Tasks that do not answer the pause or set-end-offsets request are killed and removed from
     * {@code taskGroup.tasks}. The returned future resolves to {@code null} when a task in the group
     * has already succeeded (the rest of the group is stopped instead) or when no task survives.
     *
     * <p>JADX note: the access modifier of this method was changed from private by the decompiler.
     *
     * @param taskGroup group to checkpoint; its task map is mutated as tasks are killed
     * @param z         forwarded unchanged to {@code setEndOffsetsAsync}; when {@code true} the group is
     *                  first screened for already-successful and not-yet-assigned tasks
     *                  (NOTE(review): presumably the "finalize" flag - confirm against the task client)
     * @return future of the merged partition-to-offset map, or of {@code null} as described above
     */
    public ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean z) {
        if (z) {
            final Iterator<Map.Entry<String, TaskData>> taskIterator = taskGroup.tasks.entrySet().iterator();
            while (taskIterator.hasNext()) {
                final Map.Entry<String, TaskData> taskEntry = taskIterator.next();
                final String taskId = taskEntry.getKey();
                final TaskData taskData = taskEntry.getValue();
                if (taskData.status == null) {
                    continue;
                }
                if (taskData.status.isSuccess()) {
                    // A task already succeeded: stop the whole group and report "no offsets".
                    return Futures.transform(stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>() {
                        @Nullable
                        @Override
                        public Map<Integer, Long> apply(@Nullable Object ignored) {
                            return null;
                        }
                    });
                }
                if (taskData.status.isRunnable() && this.taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
                    log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
                    killTask(taskId);
                    taskIterator.remove();
                }
            }
        }

        // Snapshot the task ids so that the i-th pause result can be mapped back to its task.
        final List<ListenableFuture<Map<Integer, Long>>> pauseFutures = Lists.newArrayList();
        final List<String> pauseTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
        for (String taskId : pauseTaskIds) {
            pauseFutures.add(this.taskClient.pauseAsync(taskId));
        }

        return Futures.transform(Futures.successfulAsList(pauseFutures), new Function<List<Map<Integer, Long>>, Map<Integer, Long>>() {
            @Nullable
            @Override
            public Map<Integer, Long> apply(List<Map<Integer, Long>> pauseResults) {
                // Merge the reported offsets, keeping the highest one seen for each partition.
                final Map<Integer, Long> endOffsets = new HashMap<>();
                for (int i = 0; i < pauseResults.size(); i++) {
                    final Map<Integer, Long> reportedOffsets = pauseResults.get(i);
                    if (reportedOffsets == null || reportedOffsets.isEmpty()) {
                        final String taskId = pauseTaskIds.get(i);
                        KafkaSupervisor.log.warn("Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId);
                        KafkaSupervisor.this.killTask(taskId);
                        taskGroup.tasks.remove(taskId);
                        continue;
                    }
                    for (Map.Entry<Integer, Long> offset : reportedOffsets.entrySet()) {
                        final Long known = endOffsets.get(offset.getKey());
                        if (known == null || known.compareTo(offset.getValue()) < 0) {
                            endOffsets.put(offset.getKey(), offset.getValue());
                        }
                    }
                }

                final List<ListenableFuture<Boolean>> setEndOffsetFutures = Lists.newArrayList();
                final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
                if (setEndOffsetTaskIds.isEmpty()) {
                    KafkaSupervisor.log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
                    return null;
                }

                try {
                    if (endOffsets.equals(taskGroup.sequenceOffsets.lastEntry().getValue())) {
                        KafkaSupervisor.log.warn(
                            "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
                            endOffsets,
                            taskGroup.sequenceOffsets.lastEntry().getValue(),
                            taskGroup.groupId
                        );
                    }
                    KafkaSupervisor.log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", taskGroup.groupId, endOffsets);
                    for (String taskId : setEndOffsetTaskIds) {
                        setEndOffsetFutures.add(KafkaSupervisor.this.taskClient.setEndOffsetsAsync(taskId, endOffsets, z));
                    }
                    final List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures)
                        .get(KafkaSupervisor.this.futureTimeoutInSeconds, TimeUnit.SECONDS);
                    for (int i = 0; i < results.size(); i++) {
                        final Boolean accepted = results.get(i);
                        if (accepted == null || !accepted.booleanValue()) {
                            final String taskId = setEndOffsetTaskIds.get(i);
                            KafkaSupervisor.log.warn("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", taskId);
                            KafkaSupervisor.this.killTask(taskId);
                            taskGroup.tasks.remove(taskId);
                        }
                    }
                } catch (Exception e) {
                    KafkaSupervisor.log.error("Something bad happened [%s]", e.getMessage());
                    // propagate() always throws; the explicit throw makes that visible to the compiler.
                    throw Throwables.propagate(e);
                }

                if (taskGroup.tasks.isEmpty()) {
                    KafkaSupervisor.log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
                    return null;
                }
                return endOffsets;
            }
        }, this.workerExec);
    }

    /**
     * Walks every list of pending-completion task groups and resolves the ones that are finished.
     *
     * <p>For each group: failed tasks are dropped; the first successful task causes the rest of the
     * group to be stopped. If no task succeeded before the group's completion timeout, or every task
     * failed, the group and the currently reading group with the same id are killed, the partition
     * offsets of that id are reset to {@code NOT_SET}, and all later groups in the same list are
     * stopped as well. Resolved groups are removed from the list; the method then waits for the
     * collected stop futures.
     *
     * @throws ExecutionException   if waiting on the stop futures fails
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException     if the stop futures do not complete within {@code futureTimeoutInSeconds}
     */
    private void checkPendingCompletionTasks() throws ExecutionException, InterruptedException, TimeoutException {
        final List<ListenableFuture<?>> stopFutures = Lists.newArrayList();

        for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingEntry : this.pendingCompletionTaskGroups.entrySet()) {
            final Integer groupId = pendingEntry.getKey();
            final CopyOnWriteArrayList<TaskGroup> pendingGroups = pendingEntry.getValue();
            final List<TaskGroup> resolvedGroups = Lists.newArrayList();
            // Set once a group in this list failed or timed out; every following group is then stopped.
            boolean stopRemainingGroups = false;

            for (TaskGroup group : pendingGroups) {
                if (stopRemainingGroups) {
                    stopFutures.add(stopTasksInGroup(group));
                    resolvedGroups.add(group);
                    continue;
                }

                boolean foundSuccess = false;
                boolean entireGroupFailed = false;
                final Iterator<Map.Entry<String, TaskData>> taskIterator = group.tasks.entrySet().iterator();
                while (taskIterator.hasNext()) {
                    final Map.Entry<String, TaskData> taskEntry = taskIterator.next();
                    final String taskId = taskEntry.getKey();
                    final TaskData taskData = taskEntry.getValue();
                    Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);

                    if (taskData.status.isFailure()) {
                        taskIterator.remove();
                        if (group.tasks.isEmpty()) {
                            entireGroupFailed = true;
                            break;
                        }
                    }
                    if (taskData.status.isSuccess()) {
                        log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
                        stopFutures.add(stopTasksInGroup(group));
                        foundSuccess = true;
                        resolvedGroups.add(group);
                        break;
                    }
                }

                final boolean timedOutWithoutSuccess = !foundSuccess && group.completionTimeout.isBeforeNow();
                if (!timedOutWithoutSuccess && !entireGroupFailed) {
                    continue;
                }

                if (entireGroupFailed) {
                    log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId);
                } else {
                    log.makeAlert(
                        "No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!",
                        group.taskIds(),
                        groupId,
                        this.ioConfig.getCompletionTimeout()
                    ).emit();
                }
                // Forget the in-memory offsets for these partitions.
                this.partitionGroups.get(groupId).replaceAll((partition, offset) -> Long.valueOf(NOT_SET));
                killTasksInGroup(group);
                stopRemainingGroups = true;
                killTasksInGroup(this.taskGroups.remove(groupId));
                resolvedGroups.add(group);
            }

            pendingGroups.removeAll(resolvedGroups);
        }

        Futures.successfulAsList(stopFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    /**
     * Prunes the actively reading task groups.
     *
     * <p>For every task of every group: a task that {@link #isTaskCurrent} rejects is stopped and
     * removed; a failed task is removed; a successful task causes the whole group to be stopped and
     * removed from {@code taskGroups}. The method then waits for all collected stop futures.
     *
     * <p>The inner loop is bounded by {@code hasNext()}. The decompiled form
     * ({@code while (true) { if (hasNext()) ... }}) had no exit once the iterator was exhausted and
     * would spin forever for any group without a successful task.
     *
     * @throws ExecutionException   if waiting on the stop futures fails
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException     if the stop futures do not complete within {@code futureTimeoutInSeconds}
     */
    private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException {
        final List<ListenableFuture<?>> stopFutures = Lists.newArrayList();

        final Iterator<Map.Entry<Integer, TaskGroup>> groupIterator = this.taskGroups.entrySet().iterator();
        while (groupIterator.hasNext()) {
            final Map.Entry<Integer, TaskGroup> groupEntry = groupIterator.next();
            final Integer groupId = groupEntry.getKey();
            final TaskGroup group = groupEntry.getValue();
            log.debug("Task group [%d] pre-pruning: %s", groupId, group.taskIds());

            final Iterator<Map.Entry<String, TaskData>> taskIterator = group.tasks.entrySet().iterator();
            while (taskIterator.hasNext()) {
                final Map.Entry<String, TaskData> taskEntry = taskIterator.next();
                final String taskId = taskEntry.getKey();
                final TaskData taskData = taskEntry.getValue();

                if (!isTaskCurrent(groupId.intValue(), taskId)) {
                    log.info("Stopping task [%s] which does not match the expected offset range and ingestion spec", taskId);
                    stopFutures.add(stopTask(taskId, false));
                    taskIterator.remove();
                    continue;
                }

                Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
                if (taskData.status.isFailure()) {
                    taskIterator.remove();
                } else if (taskData.status.isSuccess()) {
                    // One success finishes the group: stop the siblings and drop the group.
                    stopFutures.add(stopTasksInGroup(group));
                    groupIterator.remove();
                    break;
                }
            }
            log.debug("Task group [%d] post-pruning: %s", groupId, group.taskIds());
        }

        Futures.successfulAsList(stopFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    /**
     * Brings the set of reading tasks up to the configured replica count.
     *
     * <p>Steps, in order: verify/merge checkpoints for groups that have fewer tasks than replicas;
     * create a {@code TaskGroup} for every partition group that has none; create the missing tasks
     * for every group below the replica count. If any task was created and {@code firstRunTime} is
     * in the past, another run is scheduled after {@code MINIMUM_GET_OFFSET_PERIOD_MILLIS}.
     *
     * @throws JsonProcessingException if task creation fails to serialize the checkpoints
     */
    void createNewTasks() throws JsonProcessingException {
        final Collection<TaskGroup> underReplicatedGroups = this.taskGroups.values().stream()
            .filter(group -> group.tasks.size() < this.ioConfig.getReplicas().intValue())
            .collect(Collectors.toList());
        verifyAndMergeCheckpoints(underReplicatedGroups);

        for (Integer groupId : this.partitionGroups.keySet()) {
            if (this.taskGroups.containsKey(groupId)) {
                continue;
            }
            log.info("Creating new task group [%d] for partitions %s", groupId, this.partitionGroups.get(groupId).keySet());

            final ImmutableMap<Integer, Long> startingOffsets = generateStartingOffsetsForPartitionGroup(groupId.intValue());

            final com.google.common.base.Optional<DateTime> minimumMessageTime;
            if (this.ioConfig.getLateMessageRejectionPeriod().isPresent()) {
                minimumMessageTime = com.google.common.base.Optional.of(
                    DateTimes.nowUtc().minus((ReadableDuration) this.ioConfig.getLateMessageRejectionPeriod().get())
                );
            } else {
                minimumMessageTime = com.google.common.base.Optional.absent();
            }

            final com.google.common.base.Optional<DateTime> maximumMessageTime;
            if (this.ioConfig.getEarlyMessageRejectionPeriod().isPresent()) {
                maximumMessageTime = com.google.common.base.Optional.of(
                    DateTimes.nowUtc()
                        .plus(this.ioConfig.getTaskDuration())
                        .plus((ReadableDuration) this.ioConfig.getEarlyMessageRejectionPeriod().get())
                );
            } else {
                maximumMessageTime = com.google.common.base.Optional.absent();
            }

            this.taskGroups.put(groupId, new TaskGroup(groupId.intValue(), startingOffsets, minimumMessageTime, maximumMessageTime));
        }

        boolean createdTask = false;
        for (Map.Entry<Integer, TaskGroup> groupEntry : this.taskGroups.entrySet()) {
            final TaskGroup group = groupEntry.getValue();
            final Integer groupId = groupEntry.getKey();
            if (this.ioConfig.getReplicas().intValue() <= group.tasks.size()) {
                continue;
            }
            log.info(
                "Number of tasks [%d] does not match configured numReplicas [%d] in task group [%d], creating more tasks",
                group.tasks.size(),
                this.ioConfig.getReplicas(),
                groupId
            );
            createKafkaTasksForGroup(groupId.intValue(), this.ioConfig.getReplicas().intValue() - group.tasks.size());
            createdTask = true;
        }

        if (createdTask && this.firstRunTime.isBeforeNow()) {
            this.scheduledExec.schedule(buildRunTask(), MINIMUM_GET_OFFSET_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Creates {@code i2} new {@code KafkaIndexTask}s for task group {@code i} and submits them to the
     * task queue.
     *
     * <p>Every task starts at the group's {@code partitionOffsets}, has {@code Long.MAX_VALUE} as the
     * end offset of each partition, and carries the group's serialized {@code sequenceOffsets} in its
     * context under {@code "checkpoints"}. When no task queue is available an error is logged and the
     * task is not submitted.
     *
     * @param i  id of the task group to create tasks for
     * @param i2 number of tasks to create
     * @throws JsonProcessingException if the sequence offsets cannot be serialized
     */
    private void createKafkaTasksForGroup(int i, int i2) throws JsonProcessingException {
        final Integer groupKey = Integer.valueOf(i);

        final ImmutableMap<Integer, Long> startPartitions = this.taskGroups.get(groupKey).partitionOffsets;
        final Map<Integer, Long> endPartitions = new HashMap<>();
        for (Integer partition : startPartitions.keySet()) {
            endPartitions.put(partition, Long.MAX_VALUE);
        }

        final TaskGroup group = this.taskGroups.get(groupKey);
        final KafkaIOConfig taskIoConfig = new KafkaIOConfig(
            groupKey,
            group.baseSequenceName,
            new KafkaPartitions(this.ioConfig.getTopic(), startPartitions),
            new KafkaPartitions(this.ioConfig.getTopic(), endPartitions),
            Maps.newHashMap(this.ioConfig.getConsumerProperties()),
            true,
            (DateTime) this.taskGroups.get(groupKey).minimumMessageTime.orNull(),
            (DateTime) this.taskGroups.get(groupKey).maximumMessageTime.orNull(),
            Boolean.valueOf(this.ioConfig.isSkipOffsetGaps())
        );

        final String checkpoints = this.sortingMapper
            .writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {
            })
            .writeValueAsString(this.taskGroups.get(groupKey).sequenceOffsets);

        final Map<String, Object> context;
        if (this.spec.getContext() == null) {
            context = ImmutableMap.<String, Object>of("checkpoints", checkpoints, IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
        } else {
            context = ImmutableMap.<String, Object>builder()
                .put("checkpoints", checkpoints)
                .put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
                .putAll(this.spec.getContext())
                .build();
        }

        for (int created = 0; created < i2; created++) {
            final String taskId = Joiner.on("_").join(group.baseSequenceName, RealtimeIndexTask.makeRandomId());
            final KafkaIndexTask indexTask = new KafkaIndexTask(
                taskId,
                new TaskResource(group.baseSequenceName, 1),
                this.spec.getDataSchema(),
                this.taskTuningConfig,
                taskIoConfig,
                context,
                null,
                null,
                this.rowIngestionMetersFactory,
                this.sortingMapper
            );

            // Looked up per task, as in the original loop.
            final com.google.common.base.Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
            if (!taskQueue.isPresent()) {
                log.error("Failed to get task queue because I'm not the leader!");
                continue;
            }
            try {
                taskQueue.get().add(indexTask);
            } catch (EntryExistsException e) {
                log.error("Tried to add task [%s] but it already exists", indexTask.getId());
            }
        }
    }

    /**
     * Builds the starting offset of every partition in partition group {@code i}.
     *
     * <p>An offset already recorded in {@code partitionGroups} is used as is; a missing or
     * {@code NOT_SET} offset is resolved through {@link #getOffsetFromStorageForPartition(int)}.
     *
     * @param i id of the partition group
     * @return immutable partition-to-starting-offset map
     */
    private ImmutableMap<Integer, Long> generateStartingOffsetsForPartitionGroup(int i) {
        final ImmutableMap.Builder<Integer, Long> startingOffsets = ImmutableMap.builder();
        for (Map.Entry<Integer, Long> partitionOffset : this.partitionGroups.get(Integer.valueOf(i)).entrySet()) {
            final Integer partition = partitionOffset.getKey();
            final Long knownOffset = partitionOffset.getValue();
            final boolean offsetIsSet = knownOffset != null && knownOffset.longValue() != NOT_SET;
            if (offsetIsSet) {
                startingOffsets.put(partition, knownOffset);
            } else {
                startingOffsets.put(partition, Long.valueOf(getOffsetFromStorageForPartition(partition.intValue())));
            }
        }
        return startingOffsets.build();
    }

    /**
     * Resolves the starting offset for partition {@code i}.
     *
     * <p>If metadata storage holds an offset for the partition, that offset is returned after
     * checking that it does not exceed the latest offset Kafka reports. Otherwise the offset is taken
     * from Kafka, earliest or latest according to {@code ioConfig.isUseEarliestOffset()}.
     *
     * @param i Kafka partition number
     * @return the offset to start reading from
     * @throws ISE if the stored offset is greater than the latest Kafka offset
     */
    private long getOffsetFromStorageForPartition(int i) {
        final Long storedOffset = getOffsetsFromMetadataStorage().get(Integer.valueOf(i));

        if (storedOffset == null) {
            final long kafkaOffset = getOffsetFromKafkaForPartition(i, this.ioConfig.isUseEarliestOffset());
            log.debug("Getting offset [%,d] from Kafka for partition [%d]", kafkaOffset, i);
            return kafkaOffset;
        }

        final long offset = storedOffset.longValue();
        log.debug("Getting offset [%,d] from metadata storage for partition [%d]", offset, i);
        final long latestKafkaOffset = getOffsetFromKafkaForPartition(i, false);
        if (offset > latestKafkaOffset) {
            throw new ISE(
                "Offset in metadata storage [%,d] > latest Kafka offset [%,d] for partition[%d] dataSource[%s]. If these messages are no longer available (perhaps you deleted and re-created your Kafka topic) you can use the supervisor reset API to restart ingestion.",
                offset,
                latestKafkaOffset,
                i,
                this.dataSource
            );
        }
        return offset;
    }

    /**
     * Reads the partition offsets stored for this data source in metadata storage.
     *
     * @return the stored partition-to-offset map, or an empty map when the metadata is not Kafka
     *         metadata, has no partitions or offset map, or was recorded for a different topic than
     *         the one in the spec (a warning is logged in that last case)
     */
    private Map<Integer, Long> getOffsetsFromMetadataStorage() {
        final DataSourceMetadata metadata = this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
        if (!(metadata instanceof KafkaDataSourceMetadata)) {
            return Collections.emptyMap();
        }

        final KafkaPartitions storedPartitions = ((KafkaDataSourceMetadata) metadata).getKafkaPartitions();
        if (storedPartitions == null) {
            return Collections.emptyMap();
        }

        if (!this.ioConfig.getTopic().equals(storedPartitions.getTopic())) {
            log.warn(
                "Topic in metadata storage [%s] doesn't match spec topic [%s], ignoring stored offsets",
                storedPartitions.getTopic(),
                this.ioConfig.getTopic()
            );
            return Collections.emptyMap();
        }

        final Map<Integer, Long> storedOffsets = storedPartitions.getPartitionOffsetMap();
        if (storedOffsets == null) {
            return Collections.emptyMap();
        }
        return storedOffsets;
    }

    /**
     * Asks the supervisor's Kafka consumer for the earliest or latest offset of partition {@code i}.
     *
     * <p>Runs under {@code consumerLock}. The partition is assigned to the consumer first if it is
     * not already part of its assignment.
     *
     * @param i partition number within the spec's topic
     * @param z {@code true} to seek to the beginning of the partition, {@code false} to seek to the end
     * @return the consumer position after the seek
     */
    private long getOffsetFromKafkaForPartition(int i, boolean z) {
        synchronized (this.consumerLock) {
            final TopicPartition topicPartition = new TopicPartition(this.ioConfig.getTopic(), i);
            final List<TopicPartition> onlyThisPartition = Collections.singletonList(topicPartition);

            if (!this.consumer.assignment().contains(topicPartition)) {
                this.consumer.assign(onlyThisPartition);
            }

            if (z) {
                this.consumer.seekToBeginning(onlyThisPartition);
            } else {
                this.consumer.seekToEnd(onlyThisPartition);
            }

            return this.consumer.position(topicPartition);
        }
    }

    /**
     * Tells whether the stored task {@code str} belongs to task group {@code i}.
     *
     * <p>The task must exist in task storage and be a {@code KafkaIndexTask}. If the group is known,
     * the task's base sequence name must equal the group's; otherwise it must equal the sequence name
     * generated from the task's own start offsets and minimum/maximum message times.
     *
     * <p>JADX note: the access modifier of this method was changed from private by the decompiler.
     *
     * @param i   task group id
     * @param str task id
     * @return {@code true} if the task matches, {@code false} otherwise
     */
    public boolean isTaskCurrent(int i, String str) {
        final com.google.common.base.Optional<?> storedTask = this.taskStorage.getTask(str);
        final boolean isKafkaTask = storedTask.isPresent() && storedTask.get() instanceof KafkaIndexTask;
        if (!isKafkaTask) {
            return false;
        }

        final KafkaIndexTask kafkaTask = (KafkaIndexTask) storedTask.get();
        final String taskSequenceName = kafkaTask.getIOConfig().getBaseSequenceName();

        if (this.taskGroups.get(Integer.valueOf(i)) != null) {
            final TaskGroup group = Preconditions.checkNotNull(
                this.taskGroups.get(Integer.valueOf(i)),
                "null taskGroup for taskId[%s]",
                Integer.valueOf(i)
            );
            return group.baseSequenceName.equals(taskSequenceName);
        }

        final String expectedSequenceName = generateSequenceName(
            kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap(),
            kafkaTask.getIOConfig().getMinimumMessageTime(),
            kafkaTask.getIOConfig().getMaximumMessageTime()
        );
        return expectedSequenceName.equals(taskSequenceName);
    }

    /**
     * Stops every task of {@code taskGroup} that has not completed yet.
     *
     * <p>Tasks without a status are killed immediately; tasks whose status is not complete are asked
     * to stop via {@link #stopTask(String, boolean)} without publishing.
     *
     * @param taskGroup group to stop, may be {@code null}
     * @return a future that completes when all stop requests have completed; an already completed
     *         future when {@code taskGroup} is {@code null}
     */
    private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup) {
        if (taskGroup == null) {
            return Futures.immediateFuture((Object) null);
        }

        final List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
        for (Map.Entry<String, TaskData> taskEntry : taskGroup.tasks.entrySet()) {
            final String taskId = taskEntry.getKey();
            final TaskData taskData = taskEntry.getValue();
            if (taskData.status != null) {
                if (!taskData.status.isComplete()) {
                    stopFutures.add(stopTask(taskId, false));
                }
            } else {
                killTask(taskId);
            }
        }
        return Futures.successfulAsList(stopFutures);
    }

    /**
     * Asks task {@code str} to stop and kills it if the stop request is not acknowledged.
     *
     * <p>JADX note: the access modifier of this method was changed from private by the decompiler.
     *
     * @param str task id
     * @param z   forwarded to {@code taskClient.stopAsync}
     *            (NOTE(review): presumably the "publish" flag - confirm against the task client)
     * @return a future that completes once the stop response has been handled
     */
    public ListenableFuture<Void> stopTask(final String str, boolean z) {
        return Futures.transform(this.taskClient.stopAsync(str, z), new Function<Boolean, Void>() {
            @Nullable
            @Override
            public Void apply(@Nullable Boolean stopResult) {
                final boolean stopped = stopResult != null && stopResult.booleanValue();
                if (!stopped) {
                    KafkaSupervisor.log.info("Task [%s] failed to stop in a timely manner, killing task", str);
                    KafkaSupervisor.this.killTask(str);
                }
                return null;
            }
        });
    }

    /**
     * Shuts down task {@code str} through the task queue; logs an error when no task queue is
     * available.
     *
     * <p>JADX note: the access modifier of this method was changed from private by the decompiler.
     *
     * @param str id of the task to shut down
     */
    public void killTask(String str) {
        final com.google.common.base.Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
        if (!taskQueue.isPresent()) {
            log.error("Failed to get task queue because I'm not the leader!");
            return;
        }
        taskQueue.get().shutdown(str);
    }

    /**
     * Maps a partition to its task group: the partition number modulo the configured task count.
     *
     * @param i partition number
     * @return id of the task group responsible for the partition
     */
    protected int getTaskGroupIdForPartition(int i) {
        final int taskCount = this.ioConfig.getTaskCount().intValue();
        return i % taskCount;
    }

    /**
     * Checks whether any pending-completion task group contains task {@code str}.
     *
     * @param str task id
     * @return {@code true} if some pending-completion group tracks the task
     */
    private boolean isTaskInPendingCompletionGroups(String str) {
        for (CopyOnWriteArrayList<TaskGroup> pendingGroups : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup group : pendingGroups) {
                if (group.tasks.containsKey(str)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Builds the supervisor status report.
     *
     * <p>The payload always carries the data source, topic, partition count, replica count, task
     * duration and suspended flag. Offsets, lag figures and the last-updated time are included only
     * when {@code z} is {@code true}. One {@code TaskReportData} is added per task of every actively
     * reading group (type {@code ACTIVE}) and every pending-completion group (type
     * {@code PUBLISHING}). A failure while collecting the task entries is logged and the report is
     * returned without them.
     *
     * @param z whether offset and lag details are included
     * @return the report for this supervisor
     */
    private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean z) {
        final int partitionCount = this.partitionGroups.values().stream().mapToInt(Map::size).sum();
        final Map<Integer, Long> partitionLag = getLagPerPartition(getHighestCurrentOffsets());

        final KafkaSupervisorReportPayload payload = new KafkaSupervisorReportPayload(
            this.dataSource,
            this.ioConfig.getTopic(),
            partitionCount,
            this.ioConfig.getReplicas().intValue(),
            this.ioConfig.getTaskDuration().getMillis() / MAX_RUN_FREQUENCY_MILLIS,
            z ? this.latestOffsetsFromKafka : null,
            z ? partitionLag : null,
            z ? Long.valueOf(partitionLag.values().stream().mapToLong(lag -> Math.max(lag.longValue(), 0L)).sum()) : null,
            z ? this.offsetsLastUpdated : null,
            this.spec.isSuspended()
        );
        final SupervisorReport<KafkaSupervisorReportPayload> report =
            new SupervisorReport<>(this.dataSource, DateTimes.nowUtc(), payload);

        final List<TaskReportData> taskReports = Lists.newArrayList();
        try {
            for (TaskGroup activeGroup : this.taskGroups.values()) {
                for (Map.Entry<String, TaskData> taskEntry : activeGroup.tasks.entrySet()) {
                    final String taskId = taskEntry.getKey();
                    final DateTime startTime = taskEntry.getValue().startTime;
                    final Map<Integer, Long> currentOffsets = taskEntry.getValue().currentOffsets;

                    // Time left of the task duration, floored at zero; unknown without a start time.
                    Long remaining = null;
                    if (startTime != null) {
                        final long elapsedMillis = System.currentTimeMillis() - startTime.getMillis();
                        remaining = Long.valueOf(
                            Math.max(0L, this.ioConfig.getTaskDuration().getMillis() - elapsedMillis) / MAX_RUN_FREQUENCY_MILLIS
                        );
                    }

                    taskReports.add(new TaskReportData(
                        taskId,
                        z ? activeGroup.partitionOffsets : null,
                        z ? currentOffsets : null,
                        startTime,
                        remaining,
                        TaskReportData.TaskType.ACTIVE,
                        z ? getLagPerPartition(currentOffsets) : null
                    ));
                }
            }

            for (CopyOnWriteArrayList<TaskGroup> pendingGroups : this.pendingCompletionTaskGroups.values()) {
                for (TaskGroup pendingGroup : pendingGroups) {
                    for (Map.Entry<String, TaskData> taskEntry : pendingGroup.tasks.entrySet()) {
                        final String taskId = taskEntry.getKey();
                        final DateTime startTime = taskEntry.getValue().startTime;
                        final Map<Integer, Long> currentOffsets = taskEntry.getValue().currentOffsets;

                        // Time left until the completion timeout, floored at zero.
                        Long remaining = null;
                        if (pendingGroup.completionTimeout != null) {
                            remaining = Long.valueOf(
                                Math.max(0L, pendingGroup.completionTimeout.getMillis() - System.currentTimeMillis()) / MAX_RUN_FREQUENCY_MILLIS
                            );
                        }

                        taskReports.add(new TaskReportData(
                            taskId,
                            z ? pendingGroup.partitionOffsets : null,
                            z ? currentOffsets : null,
                            startTime,
                            remaining,
                            TaskReportData.TaskType.PUBLISHING,
                            null
                        ));
                    }
                }
            }

            taskReports.forEach(payload::addTask);
        } catch (Exception e) {
            log.warn(e, "Failed to generate status report");
        }
        return report;
    }

    /**
     * Creates a runnable that enqueues a new {@code RunNotice} on the notices queue.
     *
     * <p>JADX note: the access modifier of this method was changed from private by the decompiler.
     *
     * @return the runnable; each invocation adds one notice
     */
    public Runnable buildRunTask() {
        return () -> this.notices.add(new RunNotice());
    }

    /**
     * Refreshes {@code latestOffsetsFromKafka} with the end position of every partition of the
     * spec's topic.
     *
     * <p>Runs under {@code consumerLock}: lists the topics, assigns all partitions of the topic to
     * the consumer, seeks each to its end and records the resulting positions keyed by partition
     * number.
     *
     * <p>The collections are generically typed; with the raw types emitted by the decompiler the
     * partition-mapping lambdas could not type-check.
     *
     * @throws ISE if the topic list is unavailable or does not contain the spec's topic
     */
    private void updateLatestOffsetsFromKafka() {
        synchronized (this.consumerLock) {
            final Map<String, List<org.apache.kafka.common.PartitionInfo>> topics = this.consumer.listTopics();
            if (topics == null || !topics.containsKey(this.ioConfig.getTopic())) {
                throw new ISE("Could not retrieve partitions for topic [%s]", this.ioConfig.getTopic());
            }

            final Set<TopicPartition> topicPartitions = topics.get(this.ioConfig.getTopic())
                .stream()
                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                .collect(Collectors.toSet());

            this.consumer.assign(topicPartitions);
            this.consumer.seekToEnd(topicPartitions);

            this.latestOffsetsFromKafka = topicPartitions
                .stream()
                .collect(Collectors.toMap(TopicPartition::partition, this.consumer::position));
        }
    }

    /**
     * Collects, per partition, the highest current offset reported by any task of the actively
     * reading task groups.
     *
     * @return partition-to-highest-offset map; empty when no task has reported offsets
     */
    private Map<Integer, Long> getHighestCurrentOffsets() {
        return this.taskGroups.values()
            .stream()
            .flatMap(group -> group.tasks.entrySet().stream())
            .flatMap(taskEntry -> taskEntry.getValue().currentOffsets.entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
    }

    /**
     * Computes the lag of each partition in {@code map} as the latest Kafka offset minus the given
     * offset.
     *
     * <p>A partition is reported as {@code Integer.MIN_VALUE} when the lag cannot be computed: the
     * latest offsets have not been fetched, the partition is missing from them, or its offset in
     * {@code map} is {@code null}.
     *
     * @param map partition-to-current-offset map
     * @return partition-to-lag map with the same keys as {@code map}
     */
    private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> map) {
        return map.entrySet().stream().collect(Collectors.toMap(
            Map.Entry::getKey,
            offsetEntry -> {
                final boolean lagIsKnown = this.latestOffsetsFromKafka != null
                    && this.latestOffsetsFromKafka.get(offsetEntry.getKey()) != null
                    && offsetEntry.getValue() != null;
                if (!lagIsKnown) {
                    return Long.valueOf((long) Integer.MIN_VALUE);
                }
                return Long.valueOf(
                    this.latestOffsetsFromKafka.get(offsetEntry.getKey()).longValue() - offsetEntry.getValue().longValue()
                );
            }
        ));
    }

    /**
     * Creates a runnable that emits the {@code ingest/kafka/lag} metric for this data source.
     *
     * <p>The emitted value is the sum of the per-partition lags, with negative lags counted as zero.
     * A warning is logged when the Kafka partitions and the task partitions differ. Any exception,
     * including the one raised when the latest offsets have not been fetched yet, is logged as a
     * warning and not propagated.
     *
     * @return the metric-emitting runnable
     */
    private Runnable emitLag() {
        return () -> {
            try {
                final Map<Integer, Long> highestCurrentOffsets = getHighestCurrentOffsets();

                if (this.latestOffsetsFromKafka == null) {
                    throw new ISE("Latest offsets from Kafka have not been fetched");
                }

                final boolean samePartitions = this.latestOffsetsFromKafka.keySet().equals(highestCurrentOffsets.keySet());
                if (!samePartitions) {
                    log.warn(
                        "Lag metric: Kafka partitions %s do not match task partitions %s",
                        this.latestOffsetsFromKafka.keySet(),
                        highestCurrentOffsets.keySet()
                    );
                }

                this.emitter.emit(
                    ServiceMetricEvent.builder()
                        .setDimension("dataSource", this.dataSource)
                        .build(
                            "ingest/kafka/lag",
                            Long.valueOf(
                                getLagPerPartition(highestCurrentOffsets)
                                    .values()
                                    .stream()
                                    .mapToLong(lag -> Math.max(lag.longValue(), 0L))
                                    .sum()
                            )
                        )
                );
            } catch (Exception e) {
                log.warn(e, "Unable to compute Kafka lag");
            }
        };
    }

    /**
     * Asks every task of the actively reading and pending-completion groups for its current offsets
     * and stores each non-empty answer in that task's {@code TaskData.currentOffsets}.
     *
     * <p>The lambda handed to {@code Futures.transform} is explicitly cast to Guava's
     * {@code Function}: an untyped lambda is ambiguous in Guava versions that also offer an
     * {@code AsyncFunction} overload of {@code transform}.
     *
     * @throws InterruptedException if interrupted while waiting for the responses
     * @throws ExecutionException   if waiting on the combined future fails
     * @throws TimeoutException     if the responses do not arrive within {@code futureTimeoutInSeconds}
     */
    private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException {
        final List<ListenableFuture<Void>> offsetFutures = Stream.concat(
            this.taskGroups.values().stream().flatMap(group -> group.tasks.entrySet().stream()),
            this.pendingCompletionTaskGroups.values()
                .stream()
                .flatMap(List::stream)
                .flatMap(group -> group.tasks.entrySet().stream())
        ).map(
            taskEntry -> Futures.transform(
                this.taskClient.getCurrentOffsetsAsync(taskEntry.getKey(), false),
                (Function<Map<Integer, Long>, Void>) currentOffsets -> {
                    // Keep the previously known offsets when the task gave no usable answer.
                    if (currentOffsets != null && !currentOffsets.isEmpty()) {
                        taskEntry.getValue().currentOffsets = currentOffsets;
                    }
                    return null;
                }
            )
        ).collect(Collectors.toList());

        Futures.successfulAsList(offsetFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    /**
     * Creates a runnable that refreshes the tasks' current offsets, then the latest offsets from
     * Kafka, and finally stamps {@code offsetsLastUpdated} with the current UTC time.
     *
     * <p>Any exception is logged as a warning; in that case the timestamp is left untouched.
     *
     * @return the refreshing runnable
     */
    @VisibleForTesting
    Runnable updateCurrentAndLatestOffsets() {
        return () -> {
            try {
                updateCurrentOffsets();
                updateLatestOffsetsFromKafka();
                this.offsetsLastUpdated = DateTimes.nowUtc();
            } catch (Exception e) {
                log.warn(e, "Exception while getting current/latest offsets");
            }
        };
    }

    /**
     * Fetches the moving-average stats of every task in the actively reading and pending-completion
     * groups.
     *
     * <p>Tasks whose request fails are logged and left out of the result.
     *
     * <p>Differences from the decompiled form: collections are generically typed, the lambdas handed
     * to {@code Futures.transform} are explicitly cast to Guava's {@code Function} (an untyped lambda
     * is ambiguous where an {@code AsyncFunction} overload exists), and the group maps are walked by
     * entry rather than by key followed by {@code get}, so a group removed in between cannot yield a
     * {@code null} lookup.
     *
     * @return stats keyed by group id, then by task id
     * @throws InterruptedException if interrupted while waiting for the responses
     * @throws ExecutionException   if waiting on the combined future fails
     * @throws TimeoutException     if the responses do not arrive within {@code futureTimeoutInSeconds}
     */
    private Map<String, Map<String, Object>> getCurrentTotalStats() throws InterruptedException, ExecutionException, TimeoutException {
        final Map<String, Map<String, Object>> allStats = Maps.newHashMap();
        final List<ListenableFuture<StatsFromTaskResult>> statsFutures = new ArrayList<>();
        // Parallel to statsFutures: identifies the task behind the i-th future for error reporting.
        final List<Pair<Integer, String>> groupAndTaskIds = new ArrayList<>();

        for (Map.Entry<Integer, TaskGroup> groupEntry : this.taskGroups.entrySet()) {
            final int groupId = groupEntry.getKey().intValue();
            for (String taskId : groupEntry.getValue().taskIds()) {
                statsFutures.add(Futures.transform(
                    this.taskClient.getMovingAveragesAsync(taskId),
                    (Function<Map<String, Object>, StatsFromTaskResult>) stats -> new StatsFromTaskResult(groupId, taskId, stats)
                ));
                groupAndTaskIds.add(new Pair<>(Integer.valueOf(groupId), taskId));
            }
        }

        for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingEntry : this.pendingCompletionTaskGroups.entrySet()) {
            final int groupId = pendingEntry.getKey().intValue();
            for (TaskGroup pendingGroup : pendingEntry.getValue()) {
                for (String taskId : pendingGroup.taskIds()) {
                    statsFutures.add(Futures.transform(
                        this.taskClient.getMovingAveragesAsync(taskId),
                        (Function<Map<String, Object>, StatsFromTaskResult>) stats -> new StatsFromTaskResult(groupId, taskId, stats)
                    ));
                    groupAndTaskIds.add(new Pair<>(Integer.valueOf(groupId), taskId));
                }
            }
        }

        final List<StatsFromTaskResult> results =
            Futures.successfulAsList(statsFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int i = 0; i < results.size(); i++) {
            final StatsFromTaskResult result = results.get(i);
            if (result != null) {
                allStats.computeIfAbsent(result.getGroupId(), groupId -> Maps.newHashMap())
                    .put(result.getTaskId(), result.getStats());
            } else {
                final Pair<Integer, String> ids = groupAndTaskIds.get(i);
                log.error("Failed to get stats for group[%d]-task[%s]", ids.lhs, ids.rhs);
            }
        }
        return allStats;
    }

    /**
     * Registers a new actively reading task group built from the given parts, with one fresh
     * {@code TaskData} per task id.
     *
     * @param i            task group id
     * @param immutableMap partition offsets of the group
     * @param optional     third {@code TaskGroup} constructor argument
     *                     (NOTE(review): presumably the minimum message time - confirm)
     * @param optional2    fourth {@code TaskGroup} constructor argument
     *                     (NOTE(review): presumably the maximum message time - confirm)
     * @param set          ids of the tasks belonging to the group
     * @throws ISE if a group with this id is already registered
     */
    @VisibleForTesting
    void addTaskGroupToActivelyReadingTaskGroup(int i, ImmutableMap<Integer, Long> immutableMap, com.google.common.base.Optional<DateTime> optional, com.google.common.base.Optional<DateTime> optional2, Set<String> set) {
        final TaskGroup group = new TaskGroup(i, immutableMap, optional, optional2);
        for (String taskId : set) {
            group.tasks.put(taskId, new TaskData());
        }

        final TaskGroup existing = this.taskGroups.putIfAbsent(Integer.valueOf(i), group);
        if (existing != null) {
            throw new ISE("trying to add taskGroup with ID [%s] to actively reading task groups, but group already exists.", Integer.valueOf(i));
        }
    }

    /**
     * Test hook: builds a {@link TaskGroup} from the supplied arguments, gives every task id in
     * {@code set} a fresh {@code TaskData} entry, and appends the group to the list kept in
     * {@code pendingCompletionTaskGroups} for the given id, creating that list on first use.
     *
     * @param i            id whose pending-completion list receives the group
     * @param immutableMap Integer-to-Long map passed straight to the {@code TaskGroup} constructor
     * @param optional     first optional timestamp passed straight to the {@code TaskGroup} constructor
     * @param optional2    second optional timestamp passed straight to the {@code TaskGroup} constructor
     * @param set          ids of the tasks that belong to the new group
     */
    @VisibleForTesting
    void addTaskGroupToPendingCompletionTaskGroup(int i, ImmutableMap<Integer, Long> immutableMap, com.google.common.base.Optional<DateTime> optional, com.google.common.base.Optional<DateTime> optional2, Set<String> set) {
        final TaskGroup group = new TaskGroup(i, immutableMap, optional, optional2);

        // One brand-new TaskData per task id.
        final Map<String, TaskData> freshTaskData = set.stream()
                .collect(Collectors.toMap(taskId -> taskId, taskId -> new TaskData()));
        group.tasks.putAll(freshTaskData);

        pendingCompletionTaskGroups
                .computeIfAbsent(i, groupId -> new CopyOnWriteArrayList<>())
                .add(group);
    }

    /**
     * Test hook: drops the entry for the given group id from {@code taskGroups}.
     *
     * @param i id of the task group to remove
     * @return the group that was registered under {@code i}, or {@code null} when there was none
     */
    @VisibleForTesting
    @Nullable
    TaskGroup removeTaskGroup(int i) {
        final Integer groupKey = i;
        return taskGroups.remove(groupKey);
    }

    /**
     * Test hook: removes the group registered under the given id from {@code taskGroups} and, if
     * one was present, appends it to that id's list in {@code pendingCompletionTaskGroups}
     * (creating the list on first use). Does nothing when no group is registered under the id.
     *
     * @param i id of the task group to move
     */
    @VisibleForTesting
    void moveTaskGroupToPendingCompletion(int i) {
        final Integer groupKey = i;
        final TaskGroup detached = taskGroups.remove(groupKey);
        if (detached == null) {
            return;
        }
        pendingCompletionTaskGroups
                .computeIfAbsent(groupKey, key -> new CopyOnWriteArrayList<>())
                .add(detached);
    }

    /**
     * Test hook exposing how many elements {@code notices} currently holds.
     *
     * @return the current size of {@code notices}
     */
    @VisibleForTesting
    int getNoticesQueueSize() {
        final int queued = notices.size();
        return queued;
    }

    /**
     * Test hook exposing the {@code started} flag.
     *
     * @return the current value of {@code started}
     */
    @VisibleForTesting
    public boolean isStarted() {
        return started;
    }

    /**
     * Test hook exposing the {@code lifecycleStarted} flag.
     *
     * @return the current value of {@code lifecycleStarted}
     */
    @VisibleForTesting
    public boolean isLifecycleStarted() {
        return lifecycleStarted;
    }

    /**
     * Test hook exposing the {@code initRetryCounter} field.
     *
     * @return the current value of {@code initRetryCounter}
     */
    @VisibleForTesting
    public int getInitRetryCounter() {
        return initRetryCounter;
    }

    /**
     * Test hook exposing the supervisor's I/O configuration object.
     *
     * @return the {@code ioConfig} field, exactly as stored (no copy is made)
     */
    @VisibleForTesting
    public KafkaSupervisorIOConfig getIoConfig() {
        return ioConfig;
    }

    /**
     * Synthetic accessor that writes {@code lastRunTime} on the given supervisor instance.
     *
     * <p>The decompiler (JADX) failed to decode this method's bytecode: it hit an
     * {@code ArrayIndexOutOfBoundsException} while handling a {@code dup2_x1} instruction, and
     * emitted a stub that unconditionally threw {@link UnsupportedOperationException}. This body
     * is reconstructed from what the decompiler did recover, namely the assignment
     * {@code r0.lastRunTime = r1} and the descriptor {@code (KafkaSupervisor, long):long}.
     *
     * <p>NOTE(review): returning the value just assigned is an inference from the {@code long}
     * return type and the {@code dup2_x1} before the field store; confirm against the original
     * class file if it is available.
     *
     * @param r6 supervisor whose {@code lastRunTime} field is written
     * @param r7 value to store
     * @return the value that was stored
     */
    static /* synthetic */ long access$602(org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor r6, long r7) {
        r6.lastRunTime = r7;
        return r7;
    }

    // Empty static initializer: it contains no statements, so it performs no class-level setup.
    // NOTE(review): presumably an artifact of decompilation; confirm before removing it.
    static {
    }
}
