package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.FirehoseFactoryV2;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.DateTime;

@Deprecated
/* loaded from: input_file:org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.class */
public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner {
    private static final EmittingLogger log = new EmittingLogger(LegacyKafkaIndexTaskRunner.class);

    // Key under which the committer metadata stores (and startJob() metadata is read for) the next partitions/offsets.
    private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";

    // Task this runner executes; ioConfig and tuningConfig are taken from it in the constructor.
    private final KafkaIndexTask task;
    private final KafkaIOConfig ioConfig;
    private final KafkaTuningConfig tuningConfig;

    // Parses the raw bytes of each consumed Kafka record into input rows.
    private final InputRowParser<ByteBuffer> parser;
    private final AuthorizerMapper authorizerMapper;

    // When present, this runner registers itself with the provider under the task id at startup.
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    private final CircularBuffer<Throwable> savedParseExceptions;

    // Created from the RowIngestionMetersFactory passed to the constructor.
    private final RowIngestionMeters rowIngestionMeters;

    // The volatile fields below are not set by the constructor; runInternal assigns them while starting up.
    private volatile DateTime startTime;
    private volatile ObjectMapper objectMapper;
    private volatile Thread runThread;
    private volatile Appenderator appenderator;
    private volatile StreamAppenderatorDriver driver;
    private volatile FireDepartmentMetrics fireDepartmentMetrics;

    // Set to NOT_STARTED by the constructor and to BUILD_SEGMENTS by runInternal.
    private volatile IngestionState ingestionState;
    private volatile boolean pauseRequested;

    // Partition id -> end offset (exclusive: only records whose offset is below this value are processed).
    // Seeded from ioConfig.getEndPartitions() in the constructor.
    private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();

    // Partition id -> offset of the next record expected from that partition.
    private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();

    // Both conditions below belong to pauseLock, so pauseLock must be declared first.
    private final Lock pauseLock = new ReentrantLock();
    private final Condition hasPaused = this.pauseLock.newCondition();
    private final Condition shouldResume = this.pauseLock.newCondition();

    // runInternal sets both flags to true once every partition has been fully read.
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
    private final Object statusLock = new Object();

    // isAwaitingRetry belongs to pollRetryLock, so pollRetryLock must be declared first.
    private final Lock pollRetryLock = new ReentrantLock();
    private final Condition isAwaitingRetry = this.pollRetryLock.newCondition();

    // Starts as NOT_STARTED; runInternal moves it to STARTING and then READING.
    private volatile KafkaIndexTask.Status status = KafkaIndexTask.Status.NOT_STARTED;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LegacyKafkaIndexTaskRunner(KafkaIndexTask kafkaIndexTask, InputRowParser<ByteBuffer> inputRowParser, AuthorizerMapper authorizerMapper, Optional<ChatHandlerProvider> optional, CircularBuffer<Throwable> circularBuffer, RowIngestionMetersFactory rowIngestionMetersFactory) {
        this.task = kafkaIndexTask;
        this.ioConfig = kafkaIndexTask.getIOConfig();
        this.tuningConfig = kafkaIndexTask.getTuningConfig();
        this.parser = inputRowParser;
        this.authorizerMapper = authorizerMapper;
        this.chatHandlerProvider = optional;
        this.savedParseExceptions = circularBuffer;
        this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
        this.endOffsets.putAll(this.ioConfig.getEndPartitions().getPartitionOffsetMap());
        this.ingestionState = IngestionState.NOT_STARTED;
    }

    /**
     * Runs the task by delegating to {@code runInternal}.
     *
     * <p>Any {@link Exception} thrown by {@code runInternal} is logged, its stack trace is written
     * into the task completion reports through the toolbox's report file writer, and a failure
     * status carrying that stack trace is returned instead of propagating the exception.
     *
     * @param taskToolbox toolbox handed to {@code runInternal} and used to write the failure report
     * @return the status produced by {@code runInternal}, or a failure status if it threw
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public TaskStatus run(TaskToolbox taskToolbox) {
        TaskStatus outcome;
        try {
            outcome = runInternal(taskToolbox);
        } catch (Exception failure) {
            log.error(failure, "Encountered exception while running task.", new Object[0]);
            // The same stack-trace text goes into both the report and the returned status.
            final String errorMsg = Throwables.getStackTraceAsString(failure);
            taskToolbox.getTaskReportFileWriter().write(getTaskCompletionReports(errorMsg));
            outcome = TaskStatus.failure(this.task.getId(), errorMsg);
        }
        return outcome;
    }

    /**
     * Returns the appenderator currently held by this runner.
     *
     * @return the value of the {@code appenderator} field; the constructor does not set it, so it
     *         is {@code null} until {@code runInternal} assigns it
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public Appenderator getAppenderator() {
        return appenderator;
    }

    /**
     * Returns the row ingestion meters created for this runner in its constructor.
     *
     * @return the {@code rowIngestionMeters} instance held by this runner
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public RowIngestionMeters getRowIngestionMeters() {
        return rowIngestionMeters;
    }

    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 15, insn: 0x089f: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r15 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:261:0x089f */
    /* JADX WARN: Not initialized variable reg: 16, insn: 0x08a4: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r16 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:263:0x08a4 */
    /* JADX WARN: Not initialized variable reg: 19, insn: 0x07ed: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r19 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:224:0x07ed */
    /* JADX WARN: Not initialized variable reg: 20, insn: 0x07f2: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r20 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:226:0x07f2 */
    /* JADX WARN: Type inference failed for: r15v0, types: [org.apache.druid.segment.realtime.appenderator.Appenderator] */
    /* JADX WARN: Type inference failed for: r16v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r19v1, types: [org.apache.kafka.clients.consumer.KafkaConsumer] */
    /* JADX WARN: Type inference failed for: r20v0, types: [java.lang.Throwable] */
    private TaskStatus runInternal(TaskToolbox taskToolbox) throws Exception {
        ?? r15;
        ?? r16;
        Appenderator newAppenderator;
        Throwable th;
        StreamAppenderatorDriver newDriver;
        Throwable th2;
        ?? r19;
        ?? r20;
        log.info("Starting up!", new Object[0]);
        this.startTime = DateTimes.nowUtc();
        this.status = KafkaIndexTask.Status.STARTING;
        this.objectMapper = taskToolbox.getObjectMapper();
        if (this.chatHandlerProvider.isPresent()) {
            log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider) this.chatHandlerProvider.get()).getClass().getName()});
            ((ChatHandlerProvider) this.chatHandlerProvider.get()).register(this.task.getId(), this, false);
        } else {
            log.warn("No chat handler detected", new Object[0]);
        }
        this.runThread = Thread.currentThread();
        FireDepartment fireDepartment = new FireDepartment(this.task.getDataSchema(), new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null, (FirehoseFactoryV2) null), (RealtimeTuningConfig) null);
        this.fireDepartmentMetrics = fireDepartment.getMetrics();
        taskToolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this.task, fireDepartment, this.rowIngestionMeters));
        String str = (String) this.task.getContextValue("lookupTier");
        LookupNodeService lookupNodeService = str == null ? taskToolbox.getLookupNodeService() : new LookupNodeService(str);
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(taskToolbox.getDruidNode(), "peon", ImmutableMap.of(taskToolbox.getDataNodeService().getName(), taskToolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService));
        this.ingestionState = IngestionState.BUILD_SEGMENTS;
        try {
            try {
                try {
                    newAppenderator = this.task.newAppenderator(this.fireDepartmentMetrics, taskToolbox);
                    th = null;
                    newDriver = this.task.newDriver(newAppenderator, taskToolbox, this.fireDepartmentMetrics);
                    th2 = null;
                } catch (InterruptedException | RejectedExecutionException e) {
                    if ((e instanceof RejectedExecutionException) && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
                        throw e;
                    }
                    if (!this.stopRequested.get()) {
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                    log.info("The task was asked to stop before completing", new Object[0]);
                    if (this.chatHandlerProvider.isPresent()) {
                        ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(this.task.getId());
                    }
                    taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                }
                try {
                    try {
                        KafkaConsumer<byte[], byte[]> newConsumer = this.task.newConsumer();
                        Throwable th3 = null;
                        taskToolbox.getDataSegmentServerAnnouncer().announce();
                        taskToolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
                        this.appenderator = newAppenderator;
                        String topic = this.ioConfig.getStartPartitions().getTopic();
                        Object startJob = newDriver.startJob();
                        if (startJob == null) {
                            this.nextOffsets.putAll(this.ioConfig.getStartPartitions().getPartitionOffsetMap());
                        } else {
                            KafkaPartitions kafkaPartitions = (KafkaPartitions) taskToolbox.getObjectMapper().convertValue(((Map) startJob).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                            this.nextOffsets.putAll(kafkaPartitions.getPartitionOffsetMap());
                            if (!kafkaPartitions.getTopic().equals(this.ioConfig.getStartPartitions().getTopic())) {
                                throw new ISE("WTF?! Restored topic[%s] but expected topic[%s]", new Object[]{kafkaPartitions.getTopic(), this.ioConfig.getStartPartitions().getTopic()});
                            }
                            if (!this.nextOffsets.keySet().equals(this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
                                throw new ISE("WTF?! Restored partitions[%s] but expected partitions[%s]", new Object[]{this.nextOffsets.keySet(), this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()});
                            }
                        }
                        HashMap newHashMap = Maps.newHashMap();
                        for (Integer num : this.nextOffsets.keySet()) {
                            newHashMap.put(num, StringUtils.format("%s_%s", new Object[]{this.ioConfig.getBaseSequenceName(), num}));
                        }
                        Supplier<Committer> supplier = new Supplier<Committer>() { // from class: org.apache.druid.indexing.kafka.LegacyKafkaIndexTaskRunner.1
                            /* renamed from: get, reason: merged with bridge method [inline-methods] */
                            public Committer m6get() {
                                final ImmutableMap copyOf = ImmutableMap.copyOf(LegacyKafkaIndexTaskRunner.this.nextOffsets);
                                return new Committer() { // from class: org.apache.druid.indexing.kafka.LegacyKafkaIndexTaskRunner.1.1
                                    public Object getMetadata() {
                                        return ImmutableMap.of(LegacyKafkaIndexTaskRunner.METADATA_NEXT_PARTITIONS, new KafkaPartitions(LegacyKafkaIndexTaskRunner.this.ioConfig.getStartPartitions().getTopic(), copyOf));
                                    }

                                    public void run() {
                                    }
                                };
                            }
                        };
                        Set<Integer> assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
                        boolean z = !assignPartitionsAndSeekToNext.isEmpty();
                        this.status = KafkaIndexTask.Status.READING;
                        loop1: while (z) {
                            try {
                                try {
                                    if (possiblyPause()) {
                                        assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
                                        if (assignPartitionsAndSeekToNext.isEmpty()) {
                                            log.info("All partitions have been fully read", new Object[0]);
                                            this.publishOnStop.set(true);
                                            this.stopRequested.set(true);
                                        }
                                    }
                                    if (this.stopRequested.get()) {
                                        break;
                                    }
                                    ConsumerRecords empty = ConsumerRecords.empty();
                                    try {
                                        empty = newConsumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
                                    } catch (OffsetOutOfRangeException e2) {
                                        log.warn("OffsetOutOfRangeException with message [%s]", new Object[]{e2.getMessage()});
                                        possiblyResetOffsetsOrWait(e2.offsetOutOfRangePartitions(), newConsumer, taskToolbox);
                                        z = !assignPartitionsAndSeekToNext.isEmpty();
                                    }
                                    Iterator it = empty.iterator();
                                    while (it.hasNext()) {
                                        ConsumerRecord<byte[], byte[]> consumerRecord = (ConsumerRecord) it.next();
                                        if (log.isTraceEnabled()) {
                                            log.trace("Got topic[%s] partition[%d] offset[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
                                        }
                                        if (consumerRecord.offset() < this.endOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                                            if (consumerRecord.offset() != this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                                                if (!this.ioConfig.isSkipOffsetGaps()) {
                                                    throw new ISE("WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", new Object[]{Long.valueOf(consumerRecord.offset()), this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())), Integer.valueOf(consumerRecord.partition())});
                                                }
                                                log.warn("Skipped to offset[%,d] after offset[%,d] in partition[%d].", new Object[]{Long.valueOf(consumerRecord.offset()), this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())), Integer.valueOf(consumerRecord.partition())});
                                            }
                                            try {
                                                byte[] bArr = (byte[]) consumerRecord.value();
                                                List<InputRow> nullableListOf = bArr == null ? Utils.nullableListOf(new InputRow[]{(InputRow) null}) : this.parser.parseBatch(ByteBuffer.wrap(bArr));
                                                boolean z2 = false;
                                                HashMap hashMap = new HashMap();
                                                for (InputRow inputRow : nullableListOf) {
                                                    if (inputRow != null && this.task.withinMinMaxRecordTime(inputRow)) {
                                                        String str2 = (String) newHashMap.get(Integer.valueOf(consumerRecord.partition()));
                                                        AppenderatorDriverAddResult add = newDriver.add(inputRow, str2, supplier, false, false);
                                                        if (!add.isOk()) {
                                                            throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{inputRow.getTimestamp()});
                                                            break loop1;
                                                        }
                                                        if (add.getNumRowsInSegment() > this.tuningConfig.getMaxRowsPerSegment()) {
                                                            ((Set) hashMap.computeIfAbsent(str2, str3 -> {
                                                                return new HashSet();
                                                            })).add(add.getSegmentIdentifier());
                                                        }
                                                        z2 |= add.isPersistRequired();
                                                        if (add.getParseException() != null) {
                                                            handleParseException(add.getParseException(), consumerRecord);
                                                        } else {
                                                            this.rowIngestionMeters.incrementProcessed();
                                                        }
                                                    } else {
                                                        this.rowIngestionMeters.incrementThrownAway();
                                                    }
                                                }
                                                if (z2) {
                                                    newDriver.persist((Committer) supplier.get());
                                                }
                                                hashMap.entrySet().forEach(entry -> {
                                                    newDriver.moveSegmentOut((String) entry.getKey(), (List) ((Set) entry.getValue()).stream().collect(Collectors.toList()));
                                                });
                                            } catch (ParseException e3) {
                                                handleParseException(e3, consumerRecord);
                                            }
                                            this.nextOffsets.put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset() + 1));
                                        }
                                        if (this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).equals(this.endOffsets.get(Integer.valueOf(consumerRecord.partition()))) && assignPartitionsAndSeekToNext.remove(Integer.valueOf(consumerRecord.partition()))) {
                                            log.info("Finished reading topic[%s], partition[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition())});
                                            KafkaIndexTask.assignPartitions(newConsumer, topic, assignPartitionsAndSeekToNext);
                                            z = !assignPartitionsAndSeekToNext.isEmpty();
                                        }
                                    }
                                } catch (Throwable th4) {
                                    newDriver.persist((Committer) supplier.get());
                                    throw th4;
                                }
                            } catch (Exception e4) {
                                log.error(e4, "Encountered exception in runLegacy() before persisting.", new Object[0]);
                                throw e4;
                            }
                        }
                        this.ingestionState = IngestionState.COMPLETED;
                        newDriver.persist((Committer) supplier.get());
                        synchronized (this.statusLock) {
                            if (this.stopRequested.get() && !this.publishOnStop.get()) {
                                throw new InterruptedException("Stopping without publishing");
                            }
                            this.status = KafkaIndexTask.Status.PUBLISHING;
                        }
                        SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata) newDriver.publish((set, obj) -> {
                            KafkaPartitions kafkaPartitions2 = (KafkaPartitions) taskToolbox.getObjectMapper().convertValue(((Map) Preconditions.checkNotNull(obj, "commitMetadata")).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                            if (!this.endOffsets.equals(kafkaPartitions2.getPartitionOffsetMap())) {
                                throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", new Object[]{obj});
                            }
                            SegmentTransactionalInsertAction segmentTransactionalInsertAction = this.ioConfig.isUseTransaction() ? new SegmentTransactionalInsertAction(set, new KafkaDataSourceMetadata(this.ioConfig.getStartPartitions()), new KafkaDataSourceMetadata(kafkaPartitions2)) : new SegmentTransactionalInsertAction(set, (DataSourceMetadata) null, (DataSourceMetadata) null);
                            log.info("Publishing with isTransaction[%s].", new Object[]{Boolean.valueOf(this.ioConfig.isUseTransaction())});
                            return (SegmentPublishResult) taskToolbox.getTaskActionClient().submit(segmentTransactionalInsertAction);
                        }, (Committer) supplier.get(), newHashMap.values()).get();
                        List list = (List) segmentsAndMetadata.getSegments().stream().map((v0) -> {
                            return v0.getIdentifier();
                        }).collect(Collectors.toList());
                        log.info("Published segments[%s] with metadata[%s].", new Object[]{list, Preconditions.checkNotNull(segmentsAndMetadata.getCommitMetadata(), "commitMetadata")});
                        ListenableFuture registerHandoff = newDriver.registerHandoff(segmentsAndMetadata);
                        SegmentsAndMetadata segmentsAndMetadata2 = null;
                        if (this.tuningConfig.getHandoffConditionTimeout() == 0) {
                            segmentsAndMetadata2 = (SegmentsAndMetadata) registerHandoff.get();
                        } else {
                            try {
                                segmentsAndMetadata2 = (SegmentsAndMetadata) registerHandoff.get(this.tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
                            } catch (TimeoutException e5) {
                                log.makeAlert("Timed out after [%d] millis waiting for handoffs", new Object[]{Long.valueOf(this.tuningConfig.getHandoffConditionTimeout())}).addData("TaskId", this.task.getId()).emit();
                            }
                        }
                        if (segmentsAndMetadata2 == null) {
                            log.warn("Failed to handoff segments[%s]", new Object[]{list});
                        } else {
                            log.info("Handoff completed for segments[%s] with metadata[%s]", new Object[]{segmentsAndMetadata2.getSegments().stream().map((v0) -> {
                                return v0.getIdentifier();
                            }).collect(Collectors.toList()), Preconditions.checkNotNull(segmentsAndMetadata2.getCommitMetadata(), "commitMetadata")});
                        }
                        if (newConsumer != null) {
                            if (0 != 0) {
                                try {
                                    newConsumer.close();
                                } catch (Throwable th5) {
                                    th3.addSuppressed(th5);
                                }
                            } else {
                                newConsumer.close();
                            }
                        }
                        if (newDriver != null) {
                            if (0 != 0) {
                                try {
                                    newDriver.close();
                                } catch (Throwable th6) {
                                    th2.addSuppressed(th6);
                                }
                            } else {
                                newDriver.close();
                            }
                        }
                        if (newAppenderator != null) {
                            if (0 != 0) {
                                try {
                                    newAppenderator.close();
                                } catch (Throwable th7) {
                                    th.addSuppressed(th7);
                                }
                            } else {
                                newAppenderator.close();
                            }
                        }
                        if (this.chatHandlerProvider.isPresent()) {
                            ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(this.task.getId());
                        }
                        taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                        taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                        taskToolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));
                        return TaskStatus.success(this.task.getId(), (String) null);
                    } catch (Throwable th8) {
                        if (newDriver != null) {
                            if (0 != 0) {
                                try {
                                    newDriver.close();
                                } catch (Throwable th9) {
                                    th2.addSuppressed(th9);
                                }
                            } else {
                                newDriver.close();
                            }
                        }
                        throw th8;
                    }
                } catch (Throwable th10) {
                    if (r19 != 0) {
                        if (r20 != 0) {
                            try {
                                r19.close();
                            } catch (Throwable th11) {
                                r20.addSuppressed(th11);
                            }
                        } else {
                            r19.close();
                        }
                    }
                    throw th10;
                }
            } catch (Throwable th12) {
                if (r15 != 0) {
                    if (r16 != 0) {
                        try {
                            r15.close();
                        } catch (Throwable th13) {
                            r16.addSuppressed(th13);
                        }
                    } else {
                        r15.close();
                    }
                }
                throw th12;
            }
        } catch (Throwable th14) {
            if (this.chatHandlerProvider.isPresent()) {
                ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(this.task.getId());
            }
            taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
            taskToolbox.getDataSegmentServerAnnouncer().unannounce();
            throw th14;
        }
    }

    /**
     * Determines which partitions still have unread data, assigns exactly those partitions to the
     * consumer and seeks each of them to its next offset.
     *
     * @param kafkaConsumer consumer that receives the assignment and the seeks
     * @param str           topic the partitions belong to
     * @return the partitions whose next offset is strictly below their end offset
     * @throws ISE if a partition's next offset is greater than its end offset
     */
    private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer kafkaConsumer, String str) {
        Set<Integer> assignment = Sets.newHashSet();
        for (Map.Entry<Integer, Long> next : nextOffsets.entrySet()) {
            Integer partition = next.getKey();
            long endOffset = endOffsets.get(partition).longValue();
            long nextOffset = next.getValue().longValue();
            if (nextOffset < endOffset) {
                assignment.add(partition);
                continue;
            }
            if (nextOffset != endOffset) {
                throw new ISE("WTF?! Cannot start from offset[%,d] > endOffset[%,d]", next.getValue(), endOffset);
            }
            // next == end: nothing left to read from this partition.
            log.info("Finished reading partition[%d].", partition);
        }

        KafkaIndexTask.assignPartitions(kafkaConsumer, str, assignment);

        for (Integer partition : assignment) {
            long seekTo = nextOffsets.get(partition).longValue();
            log.info("Seeking partition[%d] to offset[%,d].", partition, seekTo);
            kafkaConsumer.seek(new TopicPartition(str, partition.intValue()), seekTo);
        }
        return assignment;
    }

    /**
     * Blocks the calling thread while a pause is requested.
     *
     * <p>While holding {@code pauseLock}: if a pause has been requested, the status is set to PAUSED,
     * waiters on {@code hasPaused} are signalled, and the thread waits on {@code shouldResume} until the
     * request is cleared. The status is then set back to READING and {@code shouldResume} is signalled.
     *
     * @return {@code true} if the method actually paused, {@code false} if no pause was requested
     * @throws InterruptedException if interrupted while acquiring the lock or while waiting
     */
    private boolean possiblyPause() throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            if (pauseRequested) {
                status = KafkaIndexTask.Status.PAUSED;
                hasPaused.signalAll();

                while (pauseRequested) {
                    log.info("Pausing ingestion until resumed");
                    shouldResume.await();
                }

                status = KafkaIndexTask.Status.READING;
                shouldResume.signalAll();
                log.info("Ingestion loop resumed");
                return true;
            }
            return false;
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * Handles partitions whose requested offsets could not be read: either sends a reset request or
     * waits for the poll-retry interval.
     *
     * <p>When automatic reset is enabled, each partition's earliest position is looked up (the consumer
     * is then seeked back to the requested offset). Partitions whose earliest position is past the
     * requested offset are collected and a reset request is sent for them. Otherwise the method waits
     * up to {@code task.getPollRetryMs()} milliseconds, ending early if a pause or stop is requested.
     *
     * @param map           requested offset per partition
     * @param kafkaConsumer consumer used to probe the earliest positions
     * @param taskToolbox   toolbox passed through to the reset request
     * @throws InterruptedException if interrupted while locking or waiting
     * @throws IOException          propagated from sending the reset request
     */
    private void possiblyResetOffsetsOrWait(Map<TopicPartition, Long> map, KafkaConsumer<byte[], byte[]> kafkaConsumer, TaskToolbox taskToolbox) throws InterruptedException, IOException {
        Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
        if (tuningConfig.isResetOffsetAutomatically()) {
            for (Map.Entry<TopicPartition, Long> requested : map.entrySet()) {
                TopicPartition topicPartition = requested.getKey();
                long requestedOffset = requested.getValue().longValue();

                kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition));
                long earliestOffset = kafkaConsumer.position(topicPartition);
                // Restore the consumer position before deciding anything.
                kafkaConsumer.seek(topicPartition, requestedOffset);

                if (earliestOffset > requestedOffset) {
                    resetPartitions.put(topicPartition, Long.valueOf(requestedOffset));
                }
            }
        }

        if (!resetPartitions.isEmpty()) {
            sendResetRequestAndWait(resetPartitions, taskToolbox);
            return;
        }

        log.warn("Retrying in %dms", Long.valueOf(task.getPollRetryMs()));
        pollRetryLock.lockInterruptibly();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(task.getPollRetryMs());
            while (remainingNanos > 0 && !pauseRequested && !stopRequested.get()) {
                remainingNanos = isAwaitingRetry.awaitNanos(remainingNanos);
            }
        } finally {
            pollRetryLock.unlock();
        }
    }

    /**
     * Submits a {@code ResetDataSourceMetadataAction} for the given partitions and, if the action
     * reports success, requests a pause of this runner. An alert is emitted in both outcomes.
     *
     * @param map         offsets keyed by topic-partition; only the partition number and offset are sent
     * @param taskToolbox provides the task action client used to submit the reset
     * @throws IOException propagated from the task action client
     */
    private void sendResetRequestAndWait(Map<TopicPartition, Long> map, TaskToolbox taskToolbox) throws IOException {
        Map<Integer, Long> partitionOffsets = Maps.newHashMap();
        for (Map.Entry<TopicPartition, Long> topicPartitionOffset : map.entrySet()) {
            partitionOffsets.put(Integer.valueOf(topicPartitionOffset.getKey().partition()), topicPartitionOffset.getValue());
        }

        KafkaPartitions resetTarget = new KafkaPartitions(ioConfig.getStartPartitions().getTopic(), partitionOffsets);
        ResetDataSourceMetadataAction resetAction = new ResetDataSourceMetadataAction(task.getDataSource(), new KafkaDataSourceMetadata(resetTarget));
        boolean accepted = ((Boolean) taskToolbox.getTaskActionClient().submit(resetAction)).booleanValue();

        if (accepted) {
            log.makeAlert("Resetting Kafka offsets for datasource [%s]", task.getDataSource()).addData("partitions", partitionOffsets.keySet()).emit();
            requestPause();
        } else {
            log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsets.keySet()).emit();
        }
    }

    /** Sets the pause-requested flag; it does not wait for the pause to take effect. */
    private void requestPause() {
        pauseRequested = true;
    }

    /**
     * Records a parse failure: updates the ingestion meters, optionally logs it, remembers the
     * exception when a save buffer exists, and aborts once the configured limit is exceeded.
     *
     * @param parseException the parse failure to record
     * @param consumerRecord record being processed; its partition and offset are used in the log message
     * @throws RuntimeException if unparseable + processed-with-error rows exceed
     *                          {@code tuningConfig.getMaxParseExceptions()}
     */
    private void handleParseException(ParseException parseException, ConsumerRecord<byte[], byte[]> consumerRecord) {
        if (!parseException.isFromPartiallyValidRow()) {
            rowIngestionMeters.incrementUnparseable();
        } else {
            rowIngestionMeters.incrementProcessedWithError();
        }

        if (tuningConfig.isLogParseExceptions()) {
            log.error(
                parseException,
                "Encountered parse exception on row from partition[%d] offset[%d]",
                consumerRecord.partition(),
                consumerRecord.offset()
            );
        }

        if (savedParseExceptions != null) {
            savedParseExceptions.add(parseException);
        }

        boolean limitExceeded =
            rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError() > tuningConfig.getMaxParseExceptions();
        if (limitExceeded) {
            log.error("Max parse exceptions exceeded, terminating task...");
            throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
        }
    }

    /**
     * Builds the task completion report map containing a single ingestion-stats-and-errors report.
     *
     * @param str optional message passed as the last argument of the report data; may be {@code null}
     * @return the result of {@code TaskReport.buildTaskReports} for that single report
     */
    private Map<String, TaskReport> getTaskCompletionReports(@Nullable String str) {
        return TaskReport.buildTaskReports(
            new TaskReport[]{
                new IngestionStatsAndErrorsTaskReport(
                    task.getId(),
                    new IngestionStatsAndErrorsTaskReportData(
                        ingestionState,
                        getTaskCompletionUnparseableEvents(),
                        getTaskCompletionRowStats(),
                        str
                    )
                )
            }
        );
    }

    /**
     * Collects the messages of the saved parse exceptions for the completion report.
     *
     * @return a mutable map holding the messages under {@code "buildSegments"}, or an empty map when
     *         {@code IndexTaskUtils.getMessagesFromSavedParseExceptions} returns {@code null}
     */
    private Map<String, Object> getTaskCompletionUnparseableEvents() {
        Map<String, Object> unparseableEvents = Maps.newHashMap();
        List<?> messages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
        if (messages == null) {
            return unparseableEvents;
        }
        unparseableEvents.put("buildSegments", messages);
        return unparseableEvents;
    }

    /**
     * Wraps the ingestion meter totals for the completion report.
     *
     * @return a mutable map with the totals stored under {@code "buildSegments"}
     */
    private Map<String, Object> getTaskCompletionRowStats() {
        Map<String, Object> rowStats = Maps.newHashMap();
        rowStats.put("buildSegments", rowIngestionMeters.getTotals());
        return rowStats;
    }

    /**
     * Asks the run loop to stop.
     *
     * <p>Sets the stop flag, then, under {@code statusLock}:
     * <ul>
     *   <li>if the task is publishing, interrupts the run thread and returns;</li>
     *   <li>otherwise clears any pending pause request and signals {@code shouldResume}, then signals
     *       {@code isAwaitingRetry}, so the run loop can observe the stop flag.</li>
     * </ul>
     * If either lock cannot be acquired within 15 seconds the run thread is interrupted instead.
     *
     * <p>Each lock is released exactly once via {@code finally}. Unlocking a {@code ReentrantLock} the
     * thread no longer holds throws {@code IllegalMonitorStateException}, which would hide the real
     * failure (for example an interrupt while waiting for {@code pollRetryLock}).
     *
     * @throws RuntimeException wrapping any checked exception raised while acquiring the locks
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public void stopGracefully() {
        log.info("Stopping gracefully (status: [%s])", status);
        stopRequested.set(true);

        synchronized (statusLock) {
            if (status == KafkaIndexTask.Status.PUBLISHING) {
                runThread.interrupt();
                return;
            }

            try {
                if (!pauseLock.tryLock(15L, TimeUnit.SECONDS)) {
                    log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread");
                    runThread.interrupt();
                    return;
                }
                try {
                    if (pauseRequested) {
                        pauseRequested = false;
                        shouldResume.signalAll();
                    }
                } finally {
                    // Released here, and only here, before the second lock is attempted.
                    pauseLock.unlock();
                }

                if (!pollRetryLock.tryLock(15L, TimeUnit.SECONDS)) {
                    log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread");
                    runThread.interrupt();
                    return;
                }
                try {
                    isAwaitingRetry.signalAll();
                } finally {
                    pollRetryLock.unlock();
                }
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Delegates to {@code IndexTaskUtils.datasourceAuthorizationCheck} for this task's datasource.
     *
     * @param httpServletRequest the incoming request
     * @param action             the action being authorized
     * @return the {@code Access} result produced by the delegate
     */
    private Access authorizationCheck(HttpServletRequest httpServletRequest, Action action) {
        return IndexTaskUtils.datasourceAuthorizationCheck(
            httpServletRequest,
            action,
            task.getDataSource(),
            authorizerMapper
        );
    }

    /**
     * HTTP endpoint {@code POST /stop}: runs the WRITE authorization check, then calls
     * {@link #stopGracefully()}.
     *
     * @param httpServletRequest the incoming request
     * @return a 200 OK response with no entity
     */
    @POST
    @Path("/stop")
    public Response stop(@Context HttpServletRequest httpServletRequest) {
        // The returned Access is not inspected here; presumably the check throws on denial (verify).
        authorizationCheck(httpServletRequest, Action.WRITE);
        stopGracefully();
        Response.ResponseBuilder ok = Response.status(Response.Status.OK);
        return ok.build();
    }

    /**
     * HTTP endpoint {@code GET /status}: runs the READ authorization check and returns the current status.
     *
     * @param httpServletRequest the incoming request
     * @return the runner's current status
     */
    @GET
    @Produces({"application/json"})
    @Path("/status")
    public KafkaIndexTask.Status getStatusHTTP(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        return status;
    }

    /**
     * @return the runner's current status field, without any authorization check
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public KafkaIndexTask.Status getStatus() {
        return status;
    }

    /**
     * HTTP endpoint {@code GET /offsets/current}: runs the READ authorization check and returns the
     * same map as {@link #getCurrentOffsets()}.
     *
     * @param httpServletRequest the incoming request
     * @return next offset to read, keyed by partition
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/current")
    public Map<Integer, Long> getCurrentOffsets(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        Map<Integer, Long> currentOffsets = getCurrentOffsets();
        return currentOffsets;
    }

    /**
     * @return the {@code nextOffsets} map itself (not a copy), keyed by partition
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public Map<Integer, Long> getCurrentOffsets() {
        return nextOffsets;
    }

    /**
     * HTTP endpoint {@code GET /offsets/end}: runs the READ authorization check and returns the same
     * map as {@link #getEndOffsets()}.
     *
     * @param httpServletRequest the incoming request
     * @return end offset keyed by partition
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/end")
    public Map<Integer, Long> getEndOffsetsHTTP(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        Map<Integer, Long> currentEndOffsets = getEndOffsets();
        return currentEndOffsets;
    }

    /**
     * @return the {@code endOffsets} map itself (not a copy), keyed by partition
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public Map<Integer, Long> getEndOffsets() {
        return endOffsets;
    }

    /**
     * Interface variant of {@link #setEndOffsets(Map)}; the boolean argument is ignored by this runner.
     *
     * @param map new end offsets keyed by partition
     * @param z   unused
     * @return the response produced by {@link #setEndOffsets(Map)}
     * @throws InterruptedException if interrupted while waiting for the pause lock or for the resume
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public Response setEndOffsets(Map<Integer, Long> map, boolean z) throws InterruptedException {
        Response response = setEndOffsets(map);
        return response;
    }

    /**
     * HTTP endpoint {@code POST /offsets/end}: runs the WRITE authorization check and delegates to
     * {@link #setEndOffsets(Map)}.
     *
     * @param map                new end offsets keyed by partition (request body)
     * @param httpServletRequest the incoming request
     * @return the response produced by {@link #setEndOffsets(Map)}
     * @throws InterruptedException if interrupted while waiting for the pause lock or for the resume
     */
    @Path("/offsets/end")
    @Consumes({"application/json"})
    @POST
    @Produces({"application/json"})
    public Response setEndOffsetsHTTP(Map<Integer, Long> map, @Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        Response response = setEndOffsets(map);
        return response;
    }

    /**
     * HTTP endpoint {@code GET /rowStats}: runs the READ authorization check and returns the ingestion
     * meter totals and moving averages.
     *
     * @param httpServletRequest the incoming request
     * @return 200 OK whose entity is a map with keys {@code "movingAverages"} and {@code "totals"},
     *         each holding a map with a single {@code "buildSegments"} entry
     */
    @GET
    @Produces({"application/json"})
    @Path("/rowStats")
    public Response getRowStats(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);

        Map<String, Object> rowStats = Maps.newHashMap();
        Map<String, Object> totals = Maps.newHashMap();
        Map<String, Object> movingAverages = Maps.newHashMap();

        totals.put("buildSegments", rowIngestionMeters.getTotals());
        movingAverages.put("buildSegments", rowIngestionMeters.getMovingAverages());

        rowStats.put("movingAverages", movingAverages);
        rowStats.put("totals", totals);
        return Response.ok(rowStats).build();
    }

    /**
     * HTTP endpoint {@code GET /unparseableEvents}: runs the READ authorization check and returns the
     * messages derived from the saved parse exceptions.
     *
     * @param httpServletRequest the incoming request
     * @return 200 OK whose entity is the result of
     *         {@code IndexTaskUtils.getMessagesFromSavedParseExceptions}
     */
    @GET
    @Produces({"application/json"})
    @Path("/unparseableEvents")
    public Response getUnparseableEvents(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        Object messages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
        return Response.ok(messages).build();
    }

    /**
     * Replaces the end offsets of this task and resumes ingestion.
     *
     * <p>The request is rejected with 400 BAD_REQUEST when the map is {@code null}, when it names a
     * partition this task does not handle, when the task is not paused, or when any new end offset is
     * below the partition's current offset.
     *
     * <p>{@code pauseLock} is released exactly once through {@code finally}, and {@link #resume()} is
     * called only after the lock has been released. If {@code resume()} threw while a second unlock
     * was still pending, {@code IllegalMonitorStateException} would replace the original exception.
     *
     * @param map new end offsets keyed by partition
     * @return 200 OK with the updated end offsets, or 400 BAD_REQUEST with an explanatory message
     * @throws InterruptedException if interrupted while acquiring the pause lock or while resuming
     */
    public Response setEndOffsets(Map<Integer, Long> map) throws InterruptedException {
        if (map == null) {
            return Response.status(Response.Status.BAD_REQUEST)
                           .entity("Request body must contain a map of { partition:endOffset }")
                           .build();
        }
        if (!endOffsets.keySet().containsAll(map.keySet())) {
            return Response.status(Response.Status.BAD_REQUEST)
                           .entity(StringUtils.format("Request contains partitions not being handled by this task, my partitions: %s", endOffsets.keySet()))
                           .build();
        }

        pauseLock.lockInterruptibly();
        try {
            if (!isPaused()) {
                return Response.status(Response.Status.BAD_REQUEST)
                               .entity("Task must be paused before changing the end offsets")
                               .build();
            }
            for (Map.Entry<Integer, Long> entry : map.entrySet()) {
                if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
                    return Response.status(Response.Status.BAD_REQUEST)
                                   .entity(StringUtils.format("End offset must be >= current offset for partition [%s] (current: %s)", entry.getKey(), nextOffsets.get(entry.getKey())))
                                   .build();
                }
            }
            endOffsets.putAll(map);
            log.info("endOffsets changed to %s", endOffsets);
        } finally {
            pauseLock.unlock();
        }

        // resume() acquires pauseLock itself, so it runs only after the lock above is released.
        resume();
        return Response.ok(endOffsets).build();
    }

    /**
     * @return {@code true} when the current status is {@code PAUSED}
     */
    private boolean isPaused() {
        return KafkaIndexTask.Status.PAUSED == status;
    }

    /**
     * HTTP endpoint {@code POST /pause}: runs the WRITE authorization check and delegates to
     * {@link #pause()}.
     *
     * @param httpServletRequest the incoming request
     * @return the response produced by {@link #pause()}
     * @throws InterruptedException if interrupted while pausing
     */
    @POST
    @Produces({"application/json"})
    @Path("/pause")
    public Response pauseHTTP(@Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        Response response = pause();
        return response;
    }

    /**
     * Requests a pause of the ingestion loop and waits up to two seconds for it to take effect.
     *
     * <p>Under {@code pauseLock} the pause flag is set, any thread waiting on {@code isAwaitingRetry}
     * is woken, and the method waits on {@code hasPaused} until the status becomes PAUSED.
     *
     * <p>Both locks are released exactly once via {@code finally}. Unlocking a {@code ReentrantLock}
     * that is not held throws {@code IllegalMonitorStateException}; a manual unlock before the
     * ACCEPTED return would therefore turn that outcome into an exception.
     *
     * @return 400 BAD_REQUEST if the status is neither PAUSED nor READING; 202 ACCEPTED if the task did
     *         not pause within two seconds; otherwise 200 OK with the current offsets serialized as JSON
     * @throws InterruptedException if interrupted while acquiring a lock or while waiting
     * @throws RuntimeException     wrapping a {@code JsonProcessingException} from offset serialization
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public Response pause() throws InterruptedException {
        if (status != KafkaIndexTask.Status.PAUSED && status != KafkaIndexTask.Status.READING) {
            return Response.status(Response.Status.BAD_REQUEST)
                           .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
                           .build();
        }

        pauseLock.lockInterruptibly();
        try {
            pauseRequested = true;

            // Wake the run loop if it is sleeping between poll retries.
            pollRetryLock.lockInterruptibly();
            try {
                isAwaitingRetry.signalAll();
            } finally {
                pollRetryLock.unlock();
            }

            if (isPaused()) {
                shouldResume.signalAll();
            }

            long nanos = TimeUnit.SECONDS.toNanos(2L);
            while (!isPaused()) {
                if (nanos <= 0) {
                    return Response.status(Response.Status.ACCEPTED)
                                   .entity("Request accepted but task has not yet paused")
                                   .build();
                }
                nanos = hasPaused.awaitNanos(nanos);
            }

            try {
                return Response.ok().entity(objectMapper.writeValueAsString(getCurrentOffsets())).build();
            } catch (JsonProcessingException e) {
                throw Throwables.propagate(e);
            }
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * HTTP endpoint {@code POST /resume}: runs the WRITE authorization check and calls {@link #resume()}.
     *
     * @param httpServletRequest the incoming request
     * @return a 200 OK response with no entity
     * @throws InterruptedException if interrupted while resuming
     */
    @POST
    @Path("/resume")
    public Response resumeHTTP(@Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        resume();
        Response.ResponseBuilder ok = Response.status(Response.Status.OK);
        return ok.build();
    }

    /**
     * Clears the pause request, signals {@code shouldResume}, and waits up to five seconds for the
     * status to leave PAUSED.
     *
     * @throws InterruptedException if interrupted while acquiring the pause lock or while waiting
     * @throws RuntimeException     if the task is still paused once the five seconds have elapsed
     */
    @Override // org.apache.druid.indexing.kafka.KafkaIndexTaskRunner
    public void resume() throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            pauseRequested = false;
            shouldResume.signalAll();

            for (long remaining = TimeUnit.SECONDS.toNanos(5L); isPaused(); remaining = shouldResume.awaitNanos(remaining)) {
                if (remaining <= 0) {
                    throw new RuntimeException("Resume command was not accepted within 5 seconds");
                }
            }
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * HTTP endpoint {@code GET /time/start}: runs the authorization check and returns the start time.
     *
     * @param httpServletRequest the incoming request
     * @return the value of the {@code startTime} field
     */
    @GET
    @Produces({"application/json"})
    @Path("/time/start")
    public DateTime getStartTime(@Context HttpServletRequest httpServletRequest) {
        // NOTE(review): this read-only endpoint checks WRITE rather than READ; confirm this is intended.
        authorizationCheck(httpServletRequest, Action.WRITE);
        return startTime;
    }
}
