package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/indexing/kafka/KafkaIndexTask.class */
/**
 * Indexing task of type {@code index_kafka}.
 *
 * <p>The task itself holds the configuration (data schema, tuning config, IO config) and
 * factory methods for the appenderator, driver and Kafka consumer; running, stopping and
 * querying are delegated to a {@link KafkaIndexTaskRunner} that is selected once, in the
 * constructor, from the task context.
 */
public class KafkaIndexTask extends AbstractTask implements ChatHandler {
    private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);

    private static final String TYPE = "index_kafka";

    /** Context key and fallback value used by {@link #getPriority()}. */
    private static final String PRIORITY_CONTEXT_KEY = "priority";
    private static final int DEFAULT_PRIORITY = 75;

    /** Initial value of {@link #getPollRetryMs()} until overridden via {@link #setPollRetryMs(long)}. */
    private static final long DEFAULT_POLL_RETRY_MS = 30000L;

    static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
    static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);

    private final DataSchema dataSchema;
    private final InputRowParser<ByteBuffer> parser;
    private final KafkaTuningConfig tuningConfig;
    private final KafkaIOConfig ioConfig;
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    private final KafkaIndexTaskRunner runner;
    private final ObjectMapper configMapper;
    private long pollRetryMs;

    /** States declared for a Kafka index task. */
    public enum Status {
        NOT_STARTED,
        STARTING,
        READING,
        PAUSED,
        PUBLISHING
    }

    /**
     * Creates the task and picks its runner.
     *
     * <p>An {@link IncrementalPublishingKafkaIndexTaskRunner} is used only when the context
     * maps {@link KafkaSupervisor#IS_INCREMENTAL_HANDOFF_SUPPORTED} to {@code true}; in every
     * other case (no context, key absent, or {@code false}) a
     * {@link LegacyKafkaIndexTaskRunner} is used.
     *
     * @param str                       task id; when {@code null} an id is generated from the
     *                                  task type, the data source and a random suffix
     * @param taskResource              passed through to the superclass constructor
     * @param dataSchema                schema of the data to ingest; must not be {@code null}
     *                                  and must carry a non-null parser
     * @param kafkaTuningConfig         tuning configuration; must not be {@code null}
     * @param kafkaIOConfig             IO configuration; must not be {@code null}
     * @param map                       task context; may be {@code null}
     * @param chatHandlerProvider       injected provider; may be {@code null}
     * @param authorizerMapper          injected, handed to the runner
     * @param rowIngestionMetersFactory injected, handed to the runner
     * @param objectMapper              injected, later used to convert password entries of the
     *                                  consumer properties
     * @throws NullPointerException if {@code dataSchema}, its parser, {@code kafkaTuningConfig}
     *                              or {@code kafkaIOConfig} is {@code null}
     * @throws ClassCastException   if the incremental-handoff context entry is present but is
     *                              not a {@link Boolean}
     */
    @JsonCreator
    public KafkaIndexTask(
            @JsonProperty("id") String str,
            @JsonProperty("resource") TaskResource taskResource,
            @JsonProperty("dataSchema") DataSchema dataSchema,
            @JsonProperty("tuningConfig") KafkaTuningConfig kafkaTuningConfig,
            @JsonProperty("ioConfig") KafkaIOConfig kafkaIOConfig,
            @JsonProperty("context") Map<String, Object> map,
            @JacksonInject ChatHandlerProvider chatHandlerProvider,
            @JacksonInject AuthorizerMapper authorizerMapper,
            @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
            @JacksonInject ObjectMapper objectMapper) {
        // The schema has to be validated inside the super(...) arguments: they are the first
        // place it is dereferenced, so a check placed after this call could never fire.
        super(
                str == null ? makeTaskId(dataSourceOf(dataSchema)) : str,
                StringUtils.format("%s_%s", TYPE, dataSourceOf(dataSchema)),
                taskResource,
                dataSourceOf(dataSchema),
                map);
        this.pollRetryMs = DEFAULT_POLL_RETRY_MS;
        this.dataSchema = dataSchema; // non-null: validated by dataSourceOf above
        this.parser = (InputRowParser) Preconditions.checkNotNull(dataSchema.getParser(), "parser");
        this.tuningConfig = Preconditions.checkNotNull(kafkaTuningConfig, "tuningConfig");
        this.ioConfig = Preconditions.checkNotNull(kafkaIOConfig, "ioConfig");
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
        this.configMapper = objectMapper;

        // A buffer for parse exceptions is only allocated when the tuning config asks for one.
        final int maxSavedParseExceptions = kafkaTuningConfig.getMaxSavedParseExceptions();
        final CircularBuffer savedParseExceptions =
                maxSavedParseExceptions > 0 ? new CircularBuffer(maxSavedParseExceptions) : null;

        final Object incrementalHandoffFlag =
                map == null ? null : map.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED);
        if (incrementalHandoffFlag != null && (Boolean) incrementalHandoffFlag) {
            this.runner = new IncrementalPublishingKafkaIndexTaskRunner(
                    this, this.parser, authorizerMapper, this.chatHandlerProvider, savedParseExceptions, rowIngestionMetersFactory);
        } else {
            this.runner = new LegacyKafkaIndexTaskRunner(
                    this, this.parser, authorizerMapper, this.chatHandlerProvider, savedParseExceptions, rowIngestionMetersFactory);
        }
    }

    /**
     * Returns the data source of {@code dataSchema}, failing with a labelled
     * {@link NullPointerException} when the schema itself is {@code null}.
     */
    private static String dataSourceOf(DataSchema dataSchema) {
        return Preconditions.checkNotNull(dataSchema, "dataSchema").getDataSource();
    }

    /**
     * Returns the poll retry interval in milliseconds.
     *
     * <p>Per the decompiler's note, this accessor was package-private in the compiled class.
     */
    public long getPollRetryMs() {
        return this.pollRetryMs;
    }

    /** Builds {@code index_kafka_<dataSource>_<randomId>}. */
    private static String makeTaskId(String str) {
        return Joiner.on("_").join(TYPE, str, RealtimeIndexTask.makeRandomId());
    }

    /** Returns the {@code "priority"} context value, or 75 when the context does not supply one. */
    public int getPriority() {
        return (Integer) getContextValue(PRIORITY_CONTEXT_KEY, DEFAULT_PRIORITY);
    }

    /** Returns the task type identifier, {@code index_kafka}. */
    public String getType() {
        return TYPE;
    }

    /** Always reports the task as ready; {@code taskActionClient} is not consulted. */
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    @JsonProperty
    public DataSchema getDataSchema() {
        return this.dataSchema;
    }

    @JsonProperty
    public KafkaTuningConfig getTuningConfig() {
        return this.tuningConfig;
    }

    @JsonProperty("ioConfig")
    public KafkaIOConfig getIOConfig() {
        return this.ioConfig;
    }

    /** Runs the task by delegating to the runner selected in the constructor. */
    public TaskStatus run(TaskToolbox taskToolbox) {
        return this.runner.run(taskToolbox);
    }

    /** Always {@code true}. */
    public boolean canRestore() {
        return true;
    }

    /** Forwards the stop request to the runner. */
    public void stopGracefully() {
        this.runner.stopGracefully();
    }

    /**
     * Returns a query runner backed by the runner's appenderator.
     *
     * <p>While the runner has no appenderator, a {@link NoopQueryRunner} is returned.
     * Otherwise the returned runner looks the appenderator up again each time it is executed.
     */
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        if (this.runner.getAppenderator() == null) {
            return new NoopQueryRunner<>();
        }
        return (queryPlus, responseContext) -> queryPlus.run(this.runner.getAppenderator(), responseContext);
    }

    /**
     * Creates a realtime appenderator from this task's data schema, its tuning config re-based
     * onto the toolbox's persist directory, the given metrics and the services exposed by the
     * toolbox.
     *
     * <p>Per the decompiler's note, this method was package-private in the compiled class.
     */
    public Appenderator newAppenderator(FireDepartmentMetrics fireDepartmentMetrics, TaskToolbox taskToolbox) {
        return Appenderators.createRealtime(
                this.dataSchema,
                this.tuningConfig.withBasePersistDirectory(taskToolbox.getPersistDir()),
                fireDepartmentMetrics,
                taskToolbox.getSegmentPusher(),
                taskToolbox.getObjectMapper(),
                taskToolbox.getIndexIO(),
                taskToolbox.getIndexMergerV9(),
                taskToolbox.getQueryRunnerFactoryConglomerate(),
                taskToolbox.getSegmentAnnouncer(),
                taskToolbox.getEmitter(),
                taskToolbox.getQueryExecutorService(),
                taskToolbox.getCache(),
                taskToolbox.getCacheConfig(),
                taskToolbox.getCachePopulatorStats());
    }

    /**
     * Creates a {@link StreamAppenderatorDriver} around {@code appenderator}.
     *
     * <p>Segments are allocated through {@link SegmentAllocateAction}s issued via the toolbox's
     * task action client, using the row timestamp and the schema's query and segment
     * granularities.
     *
     * <p>Per the decompiler's note, this method was package-private in the compiled class.
     */
    public StreamAppenderatorDriver newDriver(Appenderator appenderator, TaskToolbox taskToolbox, FireDepartmentMetrics fireDepartmentMetrics) {
        final ActionBasedSegmentAllocator segmentAllocator = new ActionBasedSegmentAllocator(
                taskToolbox.getTaskActionClient(),
                this.dataSchema,
                (schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> new SegmentAllocateAction(
                        schema.getDataSource(),
                        row.getTimestamp(),
                        schema.getGranularitySpec().getQueryGranularity(),
                        schema.getGranularitySpec().getSegmentGranularity(),
                        sequenceName,
                        previousSegmentId,
                        skipSegmentLineageCheck));
        return new StreamAppenderatorDriver(
                appenderator,
                segmentAllocator,
                taskToolbox.getSegmentHandoffNotifierFactory(),
                new ActionBasedUsedSegmentChecker(taskToolbox.getTaskActionClient()),
                taskToolbox.getDataSegmentKiller(),
                taskToolbox.getObjectMapper(),
                fireDepartmentMetrics);
    }

    /**
     * Creates a Kafka consumer from the IO config's consumer properties.
     *
     * <p>Auto-commit is disabled, {@code auto.offset.reset} is forced to {@code none} and both
     * deserializers are set to {@link ByteArrayDeserializer}; these four settings override any
     * value supplied in the configuration. While the consumer is being constructed the thread's
     * context class loader is switched to this class's loader, and it is restored afterwards
     * whether or not construction succeeds.
     *
     * <p>Per the decompiler's note, this method was package-private in the compiled class.
     */
    public KafkaConsumer<byte[], byte[]> newConsumer() {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader originalLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(getClass().getClassLoader());
            final Properties properties = new Properties();
            addConsumerPropertiesFromConfig(properties, this.configMapper, this.ioConfig.getConsumerProperties());
            properties.setProperty("enable.auto.commit", "false");
            properties.setProperty("auto.offset.reset", "none");
            properties.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
            properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            return new KafkaConsumer<>(properties);
        } finally {
            currentThread.setContextClassLoader(originalLoader);
        }
    }

    /**
     * Copies every entry of {@code map} into {@code properties} as a string.
     *
     * <p>The trust-store, key-store and key password entries are first converted to a
     * {@link PasswordProvider} with {@code objectMapper} and stored as the provider's password;
     * all other values are stored via {@link String#valueOf(Object)}.
     *
     * @param properties   destination, modified in place
     * @param objectMapper used only for the password entries
     * @param map          consumer properties to copy; must not be {@code null}
     */
    public static void addConsumerPropertiesFromConfig(Properties properties, ObjectMapper objectMapper, Map<String, Object> map) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            final String key = entry.getKey();
            final boolean isPasswordKey = key.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
                    || key.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
                    || key.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY);
            if (isPasswordKey) {
                final PasswordProvider passwordProvider = objectMapper.convertValue(entry.getValue(), PasswordProvider.class);
                properties.setProperty(key, passwordProvider.getPassword());
            } else {
                properties.setProperty(key, String.valueOf(entry.getValue()));
            }
        }
    }

    /**
     * Assigns {@code kafkaConsumer} to the given partitions of topic {@code str}.
     *
     * <p>Per the decompiler's note, this method was package-private in the compiled class.
     *
     * @param kafkaConsumer consumer to assign
     * @param str           topic name
     * @param set           partition numbers within the topic
     */
    public static void assignPartitions(KafkaConsumer<?, ?> kafkaConsumer, String str, Set<Integer> set) {
        kafkaConsumer.assign(
                set.stream()
                        .map(partition -> new TopicPartition(str, partition))
                        .collect(Collectors.toList()));
    }

    /**
     * Tells whether the row's timestamp lies within the optional minimum/maximum message times
     * of the IO config. A bound that is absent does not restrict the row.
     *
     * <p>Per the decompiler's note, this method was package-private in the compiled class.
     *
     * @return {@code false} when the timestamp is before the configured minimum or after the
     *         configured maximum, {@code true} otherwise
     * @throws ParseException if the timestamp is outside {@link Intervals#ETERNITY}
     */
    public boolean withinMinMaxRecordTime(InputRow inputRow) {
        final boolean beforeMinimumMessageTime = this.ioConfig.getMinimumMessageTime().isPresent()
                && ((DateTime) this.ioConfig.getMinimumMessageTime().get()).isAfter(inputRow.getTimestamp());
        final boolean afterMaximumMessageTime = this.ioConfig.getMaximumMessageTime().isPresent()
                && ((DateTime) this.ioConfig.getMaximumMessageTime().get()).isBefore(inputRow.getTimestamp());

        if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
            // The explicit empty argument array keeps the same constructor selected as before.
            throw new ParseException(
                    StringUtils.format("Encountered row with timestamp that cannot be represented as a long: [%s]", inputRow),
                    new Object[0]);
        }

        if (log.isDebugEnabled()) {
            if (beforeMinimumMessageTime) {
                log.debug("CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", inputRow.getTimestamp(), this.ioConfig.getMinimumMessageTime().get());
            } else if (afterMaximumMessageTime) {
                log.debug("CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", inputRow.getTimestamp(), this.ioConfig.getMaximumMessageTime().get());
            }
        }
        return !(beforeMinimumMessageTime || afterMaximumMessageTime);
    }

    /** Overrides the poll retry interval, in milliseconds. */
    @VisibleForTesting
    void setPollRetryMs(long j) {
        this.pollRetryMs = j;
    }

    /** Returns the runner's current appenderator, which may be {@code null}. */
    @VisibleForTesting
    Appenderator getAppenderator() {
        return this.runner.getAppenderator();
    }

    /** Returns the runner selected in the constructor. */
    @VisibleForTesting
    KafkaIndexTaskRunner getRunner() {
        return this.runner;
    }
}
