package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.class */
/**
 * {@link QuerySegmentWalker} that builds query runners over the {@link Sink}s registered in a
 * {@link VersionedIntervalTimeline} for one data source.
 *
 * <p>Queries whose data source is not a {@link TableDataSource} with the configured name are answered
 * with a {@link NoopQueryRunner} after emitting an alert. Results of swapped hydrants are wrapped in a
 * {@link CachingQueryRunner} only when the configured {@link Cache} reports itself as local.
 */
public class SinkQuerySegmentWalker implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);

    /** Query context key; when its value is true, hydrants that have not swapped get a no-op runner. */
    private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";

    private final String dataSource;
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
    private final ObjectMapper objectMapper;
    private final ServiceEmitter emitter;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final ExecutorService queryExecutorService;
    private final Cache cache;
    private final CacheConfig cacheConfig;
    private final CachePopulatorStats cachePopulatorStats;

    /**
     * Creates a walker for a single data source. Every argument is required.
     *
     * @param dataSource           name of the only data source this walker answers queries for
     * @param sinkTimeline         timeline used to look up sinks by interval, version and partition number
     * @param objectMapper         mapper handed to the caching runner and the cache populator
     * @param emitter              emitter handed to the metrics and CPU-time runners
     * @param conglomerate         used to find the {@link QueryRunnerFactory} for a query
     * @param queryExecutorService executor used when merging the per-segment runners
     * @param cache                cache used for swapped hydrants; only used when {@code cache.isLocal()}
     * @param cacheConfig          configuration handed to the caching runner
     * @param cachePopulatorStats  stats handed to the cache populator
     * @throws NullPointerException if any argument is null
     */
    public SinkQuerySegmentWalker(String dataSource, VersionedIntervalTimeline<String, Sink> sinkTimeline, ObjectMapper objectMapper, ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, ExecutorService queryExecutorService, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats) {
        this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
        this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.emitter = Preconditions.checkNotNull(emitter, "emitter");
        this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
        this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
        this.cache = Preconditions.checkNotNull(cache, "cache");
        this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
        this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");

        if (!cache.isLocal()) {
            log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
        }
    }

    /**
     * Resolves the given intervals against the sink timeline into one {@link SegmentDescriptor} per
     * partition chunk and delegates to {@link #getQueryRunnerForSegments(Query, Iterable)}.
     *
     * @param query    the query to run
     * @param iterable the intervals to look up in the timeline
     * @return a runner covering every chunk the timeline returns for those intervals
     */
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        final Iterable<SegmentDescriptor> descriptors = FunctionalIterable
            .create(iterable)
            .transformCat(new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>() {
                public Iterable<TimelineObjectHolder<String, Sink>> apply(Interval interval) {
                    return sinkTimeline.lookup(interval);
                }
            })
            .transformCat(new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>() {
                public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder) {
                    return FunctionalIterable.create(holder.getObject()).transform(new Function<PartitionChunk<Sink>, SegmentDescriptor>() {
                        public SegmentDescriptor apply(PartitionChunk<Sink> chunk) {
                            return new SegmentDescriptor(holder.getInterval(), (String) holder.getVersion(), chunk.getChunkNumber());
                        }
                    });
                }
            });
        return getQueryRunnerForSegments(query, descriptors);
    }

    /**
     * Builds a merged runner over the sinks identified by the given segment descriptors.
     *
     * <p>A descriptor with no matching timeline entry or chunk yields a
     * {@link ReportTimelineMissingSegmentQueryRunner}. For a matching sink, one runner is created per
     * {@link FireHydrant}, merged on a direct executor, wrapped per segment and with per-sink metrics,
     * and finally merged across sinks on {@code queryExecutorService}.
     *
     * @param query    the query to run
     * @param iterable descriptors of the segments to query
     * @return the merged runner, or a {@link NoopQueryRunner} when the query is not a table query for
     *         this walker's data source
     * @throws ISE if the conglomerate has no factory for the query
     */
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> iterable) {
        // Only one data source is served here. The cast is guarded by the instanceof check (short-circuit).
        final boolean isQueryForThisDataSource =
            query.getDataSource() instanceof TableDataSource
                && this.dataSource.equals(((TableDataSource) query.getDataSource()).getName());
        if (!isQueryForThisDataSource) {
            log.makeAlert("Received query for unknown dataSource").addData("dataSource", query.getDataSource()).emit();
            return new NoopQueryRunner<>();
        }

        final QueryRunnerFactory factory = this.conglomerate.findFactory(query);
        if (factory == null) {
            throw new ISE("Unknown query type[%s].", query.getClass());
        }

        final QueryToolChest toolchest = factory.getToolchest();
        final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
        // Shared between the outer runner and every per-sink runner built below.
        final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

        return CPUTimeMetricQueryRunner.safeBuild(
            toolchest.mergeResults(
                factory.mergeRunners(
                    this.queryExecutorService,
                    FunctionalIterable.create(iterable).transform(new Function<SegmentDescriptor, QueryRunner<T>>() {
                        public QueryRunner<T> apply(final SegmentDescriptor descriptor) {
                            final PartitionHolder holder = sinkTimeline.findEntry(descriptor.getInterval(), descriptor.getVersion());
                            if (holder == null) {
                                return new ReportTimelineMissingSegmentQueryRunner(descriptor);
                            }
                            final PartitionChunk chunk = holder.getChunk(descriptor.getPartitionNumber());
                            if (chunk == null) {
                                return new ReportTimelineMissingSegmentQueryRunner(descriptor);
                            }

                            final Sink sink = (Sink) chunk.getObject();
                            final SegmentId sinkSegmentId = sink.getSegment().getId();

                            return new SpecificSegmentQueryRunner(
                                withPerSinkMetrics(
                                    new BySegmentQueryRunner(
                                        sinkSegmentId,
                                        descriptor.getInterval().getStart(),
                                        factory.mergeRunners(
                                            Execs.directExecutor(),
                                            Iterables.transform(sink, new Function<FireHydrant, QueryRunner<T>>() {
                                                public QueryRunner<T> apply(FireHydrant hydrant) {
                                                    // Read the swapped flag once so both decisions below agree.
                                                    final boolean hydrantSwapped = hydrant.hasSwapped();
                                                    if (skipIncrementalSegment && !hydrantSwapped) {
                                                        return new NoopQueryRunner<>();
                                                    }

                                                    final Pair<Segment, Closeable> segmentAndCloseable = hydrant.getAndIncrementSegment();
                                                    try {
                                                        QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);

                                                        // Cache only swapped hydrants, and only in a local cache.
                                                        if (hydrantSwapped && cache.isLocal()) {
                                                            runner = new CachingQueryRunner(
                                                                makeHydrantCacheIdentifier(hydrant),
                                                                descriptor,
                                                                objectMapper,
                                                                cache,
                                                                toolchest,
                                                                runner,
                                                                new ForegroundCachePopulator(objectMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize()),
                                                                cacheConfig
                                                            );
                                                        }

                                                        // The closing wrapper must be outermost. If it sat inside the
                                                        // caching runner, a result served from cache without invoking
                                                        // the delegate would never close the Closeable acquired above,
                                                        // leaking the segment reference.
                                                        // NOTE(review): relies on CachingQueryRunner skipping its
                                                        // delegate on a cache hit; confirm against that class.
                                                        return QueryRunnerHelper.makeClosingQueryRunner(runner, segmentAndCloseable.rhs);
                                                    } catch (RuntimeException e) {
                                                        // Runner construction failed: release the reference before rethrowing.
                                                        CloseQuietly.close(segmentAndCloseable.rhs);
                                                        throw e;
                                                    }
                                                }
                                            })
                                        )
                                    ),
                                    toolchest,
                                    sinkSegmentId,
                                    cpuTimeAccumulator
                                ),
                                new SpecificSegmentSpec(descriptor)
                            );
                        }
                    })
                )
            ),
            toolchest,
            this.emitter,
            cpuTimeAccumulator,
            true
        );
    }

    /**
     * Decorates a per-sink runner with two nested {@link MetricsEmittingQueryRunner}s (the inner one
     * reporting via {@code reportSegmentTime}, the outer via {@code reportSegmentAndCacheTime}), both
     * tagged with the segment id, and then with a {@link CPUTimeMetricQueryRunner}.
     *
     * <p>Public rather than private only because the anonymous classes above call it (decompiler
     * artifact); it is not meant as external API.
     *
     * @param queryRunner   the runner to decorate
     * @param queryToolChest tool chest handed to the metrics runners
     * @param segmentId     id whose string form is set as the segment dimension
     * @param atomicLong    accumulator handed to {@link CPUTimeMetricQueryRunner#safeBuild}
     * @return the decorated runner
     */
    public <T> QueryRunner<T> withPerSinkMetrics(QueryRunner<T> queryRunner, QueryToolChest<T, ? extends Query<T>> queryToolChest, SegmentId segmentId, AtomicLong atomicLong) {
        // Stringify once; both dimension setters below reuse it.
        final String segmentIdString = segmentId.toString();
        return CPUTimeMetricQueryRunner.safeBuild(
            new MetricsEmittingQueryRunner(
                this.emitter,
                queryToolChest,
                new MetricsEmittingQueryRunner(
                    this.emitter,
                    queryToolChest,
                    queryRunner,
                    (metrics, value) -> {
                        metrics.reportSegmentTime(value);
                    },
                    metrics -> {
                        metrics.segment(segmentIdString);
                    }
                ),
                (metrics, value) -> {
                    metrics.reportSegmentAndCacheTime(value);
                },
                metrics -> {
                    metrics.segment(segmentIdString);
                }
            ).withWaitMeasuredFromNow(),
            queryToolChest,
            this.emitter,
            atomicLong,
            false
        );
    }

    /**
     * Builds the cache identifier for a hydrant: its segment id, an underscore, then its count.
     *
     * @param fireHydrant the hydrant to identify
     * @return {@code <segmentId>_<count>}
     */
    public static String makeHydrantCacheIdentifier(FireHydrant fireHydrant) {
        return fireHydrant.getSegmentId() + "_" + fireHydrant.getCount();
    }
}
