/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;

public class IndexTask
extends AbstractFixedIntervalTask {
    /** Logger shared by all instances of this task. */
    private static final Logger log = new Logger(IndexTask.class);
    /**
     * Hash applied to serialized row group keys before they are fed to the
     * cardinality estimator in determinePartitions. Never reassigned, hence final.
     */
    private static final HashFunction hashFunction = Hashing.murmur3_128();
    /**
     * The ingestion spec this task was created with. The field itself is annotated
     * {@code @JsonIgnore}; the value is exposed through getIngestionSchema() as the
     * "spec" property.
     */
    @JsonIgnore
    private final IndexIngestionSpec ingestionSchema;
    /** Injected mapper passed to the hash-based shard specs created in run(). */
    private final ObjectMapper jsonMapper;

    /**
     * Decides whether a row belongs in the segment being built.
     *
     * @param shardSpec  shard the segment is being generated for
     * @param interval   time interval covered by the segment
     * @param inputRow   candidate row
     * @param rollupGran granularity used to truncate the row timestamp before the shard check
     * @return true when the row's timestamp lies inside {@code interval} and the shard
     *         accepts the row at its truncated timestamp
     */
    private static boolean shouldIndex(ShardSpec shardSpec, Interval interval, InputRow inputRow, QueryGranularity rollupGran) {
        if (!interval.contains(inputRow.getTimestampFromEpoch())) {
            return false;
        }
        final long truncatedTimestamp = rollupGran.truncate(inputRow.getTimestampFromEpoch());
        return shardSpec.isInChunk(truncatedTimestamp, inputRow);
    }

    /**
     * Returns the task id to use: the supplied one when present, otherwise a generated
     * id of the form {@code index_<dataSource>_<current time>}.
     *
     * @param id              explicitly requested id, may be null
     * @param ingestionSchema spec supplying the data source name for generated ids
     * @return the id for this task
     */
    private static String makeId(String id, IndexIngestionSpec ingestionSchema) {
        if (id != null) {
            return id;
        }
        final String dataSource = makeDataSource(ingestionSchema);
        final String createdAt = new DateTime().toString();
        return String.format("index_%s_%s", dataSource, createdAt);
    }

    /**
     * Reads the data source name out of the ingestion spec's data schema.
     *
     * @param ingestionSchema spec to read from
     * @return the data source name declared by the spec's data schema
     */
    private static String makeDataSource(IndexIngestionSpec ingestionSchema) {
        final DataSchema dataSchema = ingestionSchema.getDataSchema();
        return dataSchema.getDataSource();
    }

    /**
     * Builds the single interval spanning from the start of the first bucket interval
     * to the end of the last bucket interval of the spec's granularity spec.
     *
     * @param ingestionSchema spec whose granularity spec supplies the bucket intervals
     * @return interval running from the first bucket's start to the last bucket's end
     */
    private static Interval makeInterval(IndexIngestionSpec ingestionSchema) {
        final GranularitySpec granularity = ingestionSchema.getDataSchema().getGranularitySpec();
        final DateTime start = granularity.bucketIntervals().get().first().getStart();
        final DateTime end = granularity.bucketIntervals().get().last().getEnd();
        return new Interval(start, end);
    }

    /**
     * Creates an index task from its JSON description.
     *
     * @param id              task id; when null an id is generated by makeId
     * @param ingestionSchema ingestion spec (the "spec" property); also supplies the data
     *                        source and interval handed to the superclass
     * @param jsonMapper      injected mapper, later passed to hash-based shard specs
     */
    @JsonCreator
    public IndexTask(
        @JsonProperty("id") String id,
        @JsonProperty("spec") IndexIngestionSpec ingestionSchema,
        @JacksonInject ObjectMapper jsonMapper
    ) {
        super(makeId(id, ingestionSchema), makeDataSource(ingestionSchema), makeInterval(ingestionSchema));
        this.jsonMapper = jsonMapper;
        this.ingestionSchema = ingestionSchema;
    }

    /**
     * @return the type name of this task, {@code "index"}
     */
    @Override
    public String getType() {
        final String taskType = "index";
        return taskType;
    }

    /**
     * @return the ingestion spec this task was constructed with, serialized as "spec"
     */
    @JsonProperty("spec")
    public IndexIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    /**
     * Runs the indexing job: for every configured bucket interval that actually contains
     * data, chooses the shard specs, generates one segment per shard and finally pushes
     * all generated segments through the toolbox.
     *
     * <p>Shard specs per bucket are chosen as follows: when the tuning config's target
     * partition size is positive they are derived by determinePartitions; otherwise, when
     * numShards is positive, that many hash-based shards are created; otherwise a single
     * NoneShardSpec is used.
     *
     * @param toolbox toolbox providing locks, working directory and segment pushing
     * @return a success status carrying this task's id
     * @throws ISE when none of the configured bucket intervals contains any data
     * @throws Exception on any failure while reading input or generating segments
     */
    @Override
    public TaskStatus run(TaskToolbox toolbox) throws Exception {
        final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
        final int targetPartitionSize = ingestionSchema.getTuningConfig().getTargetPartitionSize();
        // The task is expected to hold exactly one lock; its version becomes the segment version.
        final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
        final Set<DataSegment> segments = Sets.newHashSet();

        // Only buckets that are both configured and present in the input are indexed.
        final Set<Interval> validIntervals = Sets.intersection(
            granularitySpec.bucketIntervals().get(),
            getDataIntervals()
        );
        if (validIntervals.isEmpty()) {
            throw new ISE("No valid data intervals found. Check your configs!");
        }

        for (final Interval bucket : validIntervals) {
            final List<ShardSpec> shardSpecs;
            if (targetPartitionSize > 0) {
                shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity());
            } else {
                final int numShards = ingestionSchema.getTuningConfig().getNumShards();
                if (numShards > 0) {
                    final List<ShardSpec> hashedSpecs = Lists.newArrayList();
                    for (int i = 0; i < numShards; ++i) {
                        hashedSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
                    }
                    shardSpecs = hashedSpecs;
                } else {
                    shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
                }
            }

            for (final ShardSpec shardSpec : shardSpecs) {
                final DataSegment segment = generateSegment(
                    toolbox,
                    ingestionSchema.getDataSchema(),
                    shardSpec,
                    bucket,
                    myLock.getVersion()
                );
                segments.add(segment);
            }
        }

        toolbox.pushSegments(segments);
        return TaskStatus.success(getId());
    }

    /**
     * Scans the whole input once and collects, for every row, the segment-granularity
     * bucket its timestamp falls into.
     *
     * @return the distinct bucket intervals seen in the input, ordered by start then end
     * @throws IOException if the firehose cannot be opened, read or closed
     */
    private SortedSet<Interval> getDataIntervals() throws IOException {
        final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
        final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();

        final SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
        try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
            while (firehose.hasMore()) {
                final InputRow inputRow = firehose.nextRow();
                final DateTime rowTime = new DateTime(inputRow.getTimestampFromEpoch());
                retVal.add(granularitySpec.getSegmentGranularity().bucket(rowTime));
            }
        }
        return retVal;
    }

    /**
     * Chooses the shard specs for one bucket interval by estimating how many distinct
     * rolled-up rows the interval contains.
     *
     * <p>Every input row inside {@code interval} is reduced to its group key (truncated
     * timestamp plus row contents), serialized, hashed and added to a HyperLogLog
     * collector. The estimated cardinality divided by {@code targetPartitionSize} gives
     * the number of shards. A single shard is represented by a NoneShardSpec, several by
     * hash-based numbered shard specs.
     *
     * @param interval            bucket interval being partitioned
     * @param targetPartitionSize desired number of rows per shard; callers pass a positive value
     * @param queryGranularity    granularity used to truncate timestamps when building group keys
     * @return a non-empty list of shard specs for the interval
     * @throws IOException if the input cannot be read or a group key cannot be serialized
     */
    private List<ShardSpec> determinePartitions(Interval interval, int targetPartitionSize, QueryGranularity queryGranularity) throws IOException {
        log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
        final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
        final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();

        try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
            while (firehose.hasMore()) {
                final InputRow inputRow = firehose.nextRow();
                if (!interval.contains(inputRow.getTimestampFromEpoch())) {
                    continue;
                }
                final List<Object> groupKey = Rows.toGroupKey(
                    queryGranularity.truncate(inputRow.getTimestampFromEpoch()),
                    inputRow
                );
                // NOTE(review): run() hands the injected jsonMapper to its shard specs, while this
                // method uses HadoopDruidIndexerConfig.jsonMapper - confirm the difference is intended.
                collector.add(hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)).asBytes());
            }
        }

        final double numRows = collector.estimateCardinality();
        log.info("Estimated approximately [%,f] rows of data.", numRows);

        int numberOfShards = (int) Math.ceil(numRows / (double) targetPartitionSize);
        if ((double) numberOfShards > numRows) {
            numberOfShards = (int) numRows;
        }
        // An estimate below one would truncate to zero shards, which would produce an empty
        // list and make the caller silently skip the interval. Always keep at least one shard.
        numberOfShards = Math.max(1, numberOfShards);
        log.info("Will require [%,d] shard(s).", numberOfShards);

        final List<ShardSpec> shardSpecs = Lists.newArrayList();
        if (numberOfShards == 1) {
            shardSpecs.add(new NoneShardSpec());
        } else {
            for (int i = 0; i < numberOfShards; ++i) {
                shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, HadoopDruidIndexerConfig.jsonMapper));
            }
        }
        return shardSpecs;
    }

    /**
     * Builds and pushes the single segment for one (interval, shard) pair.
     *
     * <p>The input is read from a fresh firehose. Rows accepted by shouldIndex are added to
     * a plumber, and the plumber is persisted whenever the row count it reports reaches the
     * flush boundary, then once more after the last row. Rows that are rejected are counted
     * as thrown away. finishJob() is then called on the plumber, and the segment recorded by
     * the wrapping pusher is returned.
     *
     * @param toolbox   toolbox supplying the working directory, config and real segment pusher
     * @param schema    data schema handed to the plumber
     * @param shardSpec shard this segment represents
     * @param interval  time interval this segment covers
     * @param version   segment version
     * @return the one segment that was pushed
     * @throws IOException if reading the input or closing the firehose fails
     * @throws ISE         if the plumber reports -1 (no sink) for an accepted row
     */
    private DataSegment generateSegment(final TaskToolbox toolbox, DataSchema schema, ShardSpec shardSpec, Interval interval, String version) throws IOException {
        final File tmpDir = new File(
            toolbox.getTaskWorkDir(),
            String.format("%s_%s_%s_%s_%s", getDataSource(), interval.getStart(), interval.getEnd(), version, shardSpec.getPartitionNum())
        );
        final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
        final int rowFlushBoundary = ingestionSchema.getTuningConfig().getRowFlushBoundary();

        // Records whatever the real pusher returns so the pushed segment can be handed back.
        final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
        final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher() {
            @Override
            public String getPathForHadoop(String dataSource) {
                return toolbox.getSegmentPusher().getPathForHadoop(dataSource);
            }

            @Override
            public DataSegment push(File file, DataSegment segment) throws IOException {
                final DataSegment pushedSegment = toolbox.getSegmentPusher().push(file, segment);
                pushedSegments.add(pushedSegment);
                return pushedSegment;
            }
        };

        final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
        // Build the plumber before opening the firehose so that a failure here cannot leak it.
        final Plumber plumber = new YeOldePlumberSchool(interval, version, wrappedDataSegmentPusher, tmpDir).findPlumber(
            schema,
            new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null, null),
            metrics
        );
        final int myRowFlushBoundary = rowFlushBoundary > 0 ? rowFlushBoundary : toolbox.getConfig().getDefaultRowFlushBoundary();
        final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity();

        try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
            plumber.startJob();

            while (firehose.hasMore()) {
                final InputRow inputRow = firehose.nextRow();

                if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) {
                    final int numRows = plumber.add(inputRow);
                    if (numRows == -1) {
                        throw new ISE(
                            String.format("Was expecting non-null sink for timestamp[%s]", new DateTime(inputRow.getTimestampFromEpoch()))
                        );
                    }
                    metrics.incrementProcessed();

                    if (numRows >= myRowFlushBoundary) {
                        plumber.persist(firehose.commit());
                    }
                } else {
                    metrics.incrementThrownAway();
                }
            }

            // Final persist happens while the firehose is still open; commit() must not be
            // requested from a firehose that has already been closed.
            plumber.persist(firehose.commit());
        }

        try {
            plumber.finishJob();
        }
        finally {
            // Report the row counts whether or not finishing the job succeeded.
            log.info(
                "Task[%s] interval[%s] partition[%d] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away) and output %,d rows",
                getId(),
                interval,
                shardSpec.getPartitionNum(),
                metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
                metrics.processed(),
                metrics.unparseable(),
                metrics.thrownAway(),
                metrics.rowOutput()
            );
        }

        // Exactly one segment is expected to have been pushed for this shard.
        return Iterables.getOnlyElement(pushedSegments);
    }

    /**
     * Tuning options of the "index" task.
     *
     * <p>Partitioning is driven either by {@code targetPartitionSize} (rows per shard) or by
     * {@code numShards} (fixed shard count); the two are mutually exclusive and -1 marks the
     * one that is not in use.
     */
    @JsonTypeName("index")
    public static class IndexTuningConfig
    implements TuningConfig {
        private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
        private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000;
        private final int targetPartitionSize;
        private final int rowFlushBoundary;
        private final int numShards;

        /**
         * @param targetPartitionSize desired rows per shard; 0 means "not specified", -1 disables
         *                            size-based partitioning
         * @param rowFlushBoundary    rows to accumulate before persisting; 0 selects the default
         * @param numShards           fixed number of shards; null or -1 means "not specified"
         * @throws IllegalArgumentException if both targetPartitionSize and numShards are set
         */
        @JsonCreator
        public IndexTuningConfig(@JsonProperty("targetPartitionSize") int targetPartitionSize, @JsonProperty("rowFlushBoundary") int rowFlushBoundary, @JsonProperty("numShards") @Nullable Integer numShards) {
            this.numShards = numShards == null ? -1 : numShards;
            if (targetPartitionSize != 0) {
                this.targetPartitionSize = targetPartitionSize;
            } else if (this.numShards != -1) {
                // numShards was given and targetPartitionSize was left out: do not apply the
                // default size, otherwise the exclusivity check below would reject a config
                // that only sets one of the two options.
                this.targetPartitionSize = -1;
            } else {
                this.targetPartitionSize = DEFAULT_TARGET_PARTITION_SIZE;
            }
            this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
            Preconditions.checkArgument(
                this.targetPartitionSize == -1 || this.numShards == -1,
                "targetPartitionSize and numShards both cannot be set"
            );
        }

        /** @return rows per shard, or -1 when size-based partitioning is disabled */
        @JsonProperty
        public int getTargetPartitionSize() {
            return this.targetPartitionSize;
        }

        /** @return number of rows to accumulate before persisting */
        @JsonProperty
        public int getRowFlushBoundary() {
            return this.rowFlushBoundary;
        }

        /** @return fixed shard count, or -1 when not specified */
        @JsonProperty
        public int getNumShards() {
            return this.numShards;
        }
    }

    /**
     * IO options of the "index" task: holds the firehose factory the task reads its
     * input from.
     */
    @JsonTypeName("index")
    public static class IndexIOConfig implements IOConfig {
        private final FirehoseFactory firehoseFactory;

        /**
         * @param firehoseFactory factory for the input firehose (the "firehose" property)
         */
        @JsonCreator
        public IndexIOConfig(@JsonProperty("firehose") FirehoseFactory firehoseFactory) {
            this.firehoseFactory = firehoseFactory;
        }

        /** @return the factory used to open the input firehose */
        @JsonProperty("firehose")
        public FirehoseFactory getFirehoseFactory() {
            return firehoseFactory;
        }
    }

    /**
     * Ingestion spec of the "index" task: a data schema plus the task's IO and tuning
     * configs. A missing tuning config is replaced by one built from all-default arguments.
     */
    public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig> {
        private final DataSchema dataSchema;
        private final IndexIOConfig ioConfig;
        private final IndexTuningConfig tuningConfig;

        /**
         * @param dataSchema   schema of the data to index
         * @param ioConfig     input configuration
         * @param tuningConfig tuning configuration; when null a default-valued one is created
         */
        @JsonCreator
        public IndexIngestionSpec(
            @JsonProperty("dataSchema") DataSchema dataSchema,
            @JsonProperty("ioConfig") IndexIOConfig ioConfig,
            @JsonProperty("tuningConfig") IndexTuningConfig tuningConfig
        ) {
            super(dataSchema, ioConfig, tuningConfig);
            this.dataSchema = dataSchema;
            this.ioConfig = ioConfig;
            if (tuningConfig != null) {
                this.tuningConfig = tuningConfig;
            } else {
                this.tuningConfig = new IndexTuningConfig(0, 0, null);
            }
        }

        /** @return the data schema */
        @JsonProperty("dataSchema")
        public DataSchema getDataSchema() {
            return dataSchema;
        }

        /** @return the IO config */
        @JsonProperty("ioConfig")
        public IndexIOConfig getIOConfig() {
            return ioConfig;
        }

        /** @return the tuning config, never null */
        @JsonProperty("tuningConfig")
        public IndexTuningConfig getTuningConfig() {
            return tuningConfig;
        }
    }
}

