/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.PactDriver;
import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Driver for the {@link DriverStrategy#SORTED_GROUP_COMBINE} strategy.
 *
 * <p>Input records are written into an in-memory sorter. Whenever the sorter refuses another
 * record, and once more when the input is exhausted, the buffered records are sorted and each
 * run of records that the grouping comparator treats as one key is handed to the
 * {@link GroupCombineFunction}.
 *
 * <p>{@link #cancel()} flips a {@code volatile} flag that the processing loops poll, so it may be
 * invoked while {@link #run()} is still executing.
 *
 * @param <IN> type of the records consumed by the combiner
 * @param <OUT> type of the records emitted by the combiner
 */
public class GroupReduceCombineDriver<IN, OUT>
        implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);

    /**
     * Largest fixed record length (as reported by the serializer) for which the
     * {@link FixedLengthRecordSorter} is chosen over the {@link NormalizedKeySorter}.
     */
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
    private InMemorySorter<IN> sorter;
    private GroupCombineFunction<IN, OUT> combiner;
    private TypeSerializer<IN> serializer;
    private TypeComparator<IN> sortingComparator;
    private TypeComparator<IN> groupingComparator;
    private final QuickSort sortAlgo = new QuickSort();
    private MemoryManager memManager;
    private Collector<OUT> output;

    /** Polled by the processing loops; cleared by {@link #cancel()}. */
    private volatile boolean running = true;

    private boolean objectReuseEnabled = false;

    /**
     * Stores the task context and (re)arms the running flag.
     *
     * @param context the context this driver obtains its configuration, input and output from
     */
    @Override
    public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
        this.taskContext = context;
        this.running = true;
    }

    /** @return always {@code 1}; this driver reads a single input */
    @Override
    public int getNumberOfInputs() {
        return 1;
    }

    /** @return the {@link GroupCombineFunction} class token */
    @Override
    public Class<GroupCombineFunction<IN, OUT>> getStubType() {
        // A class literal cannot carry type arguments, so the parameterized token has to be
        // produced through an unchecked cast. This is safe because of erasure.
        @SuppressWarnings("unchecked")
        final Class<GroupCombineFunction<IN, OUT>> clazz =
                (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
        return clazz;
    }

    /** @return always {@code 2}: comparator 0 is used for sorting, comparator 1 for grouping */
    @Override
    public int getNumberOfDriverComparators() {
        return 2;
    }

    /**
     * Validates the configured strategy, fetches serializer, comparators, user function and
     * output collector from the task context, allocates the driver's memory pages and creates
     * the sorter.
     *
     * @throws Exception if the configured driver strategy is not
     *         {@link DriverStrategy#SORTED_GROUP_COMBINE}, or if a collaborator fails
     */
    @Override
    public void prepare() throws Exception {
        final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
        if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE) {
            throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combinder.");
        }

        this.memManager = this.taskContext.getMemoryManager();
        final int numMemoryPages = this.memManager.computeNumberOfPages(
                this.taskContext.getTaskConfig().getRelativeMemoryDriver());

        final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
        this.serializer = serializerFactory.getSerializer();
        this.sortingComparator = this.taskContext.getDriverComparator(0);
        this.groupingComparator = this.taskContext.getDriverComparator(1);
        this.combiner = this.taskContext.getStub();
        this.output = this.taskContext.getOutputCollector();

        final List<MemorySegment> memory = this.memManager.allocatePages(
                this.taskContext.getOwningNepheleTask(), numMemoryPages);

        // Short fixed-length records whose comparator supports serialization with key
        // normalization go to the fixed-length sorter; everything else goes to the
        // normalized-key sorter, which receives its own duplicate of the comparator.
        final int recordLength = this.serializer.getLength();
        final boolean useFixedLengthSorter =
                this.sortingComparator.supportsSerializationWithKeyNormalization()
                        && recordLength > 0
                        && recordLength <= THRESHOLD_FOR_IN_PLACE_SORTING;
        if (useFixedLengthSorter) {
            this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
        } else {
            this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
        }

        final ExecutionConfig executionConfig = this.taskContext.getExecutionConfig();
        this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();

        LOG.debug("GroupReduceCombineDriver object reuse: {}.",
                this.objectReuseEnabled ? "ENABLED" : "DISABLED");
    }

    /**
     * Reads the input until it is exhausted or the driver is canceled, buffering records in the
     * sorter and combining a batch each time the sorter rejects a record, plus once at the end.
     *
     * @throws IOException if a record is rejected even by a freshly reset sorter
     * @throws Exception if reading, sorting or the user function fails
     */
    @Override
    public void run() throws Exception {
        LOG.debug("Combiner starting.");

        final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
        IN value = this.serializer.createInstance();

        while (this.running && (value = in.next(value)) != null) {
            if (this.sorter.write(value)) {
                continue;
            }

            // The sorter rejected the record: combine what is buffered, then retry on the
            // emptied sorter.
            this.sortAndCombine();
            this.sorter.reset();

            if (!this.sorter.write(value)) {
                throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
            }
        }

        // Combine whatever is still buffered.
        this.sortAndCombine();
    }

    /**
     * Sorts the buffered records and invokes the combiner once per key group. Does nothing when
     * the sorter is empty. The key-grouped iterator variant is picked according to the
     * object-reuse setting read in {@link #prepare()}.
     *
     * @throws Exception if sorting, iteration or the user function fails
     */
    private void sortAndCombine() throws Exception {
        final InMemorySorter<IN> sorter = this.sorter;
        if (sorter.isEmpty()) {
            return;
        }

        this.sortAlgo.sort(sorter);

        final GroupCombineFunction<IN, OUT> combiner = this.combiner;
        final Collector<OUT> output = this.output;

        if (this.objectReuseEnabled) {
            final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(
                    sorter.getIterator(), this.serializer, this.groupingComparator);
            while (this.running && keyIter.nextKey()) {
                combiner.combine(keyIter.getValues(), output);
            }
        } else {
            final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(
                    sorter.getIterator(), this.groupingComparator);
            while (this.running && keyIter.nextKey()) {
                combiner.combine(keyIter.getValues(), output);
            }
        }
    }

    /**
     * Disposes the sorter, if one was created, and returns its memory to the memory manager.
     *
     * @throws Exception declared by the driver interface
     */
    @Override
    public void cleanup() throws Exception {
        this.releaseSorterMemory();
    }

    /**
     * Signals the processing loops to stop and releases the sorter's memory.
     */
    @Override
    public void cancel() {
        this.running = false;
        this.releaseSorterMemory();
    }

    /**
     * Disposes the sorter (when present) and hands the memory it returns back to the memory
     * manager.
     *
     * <p>NOTE(review): the sorter reference is not cleared, so a {@code cancel()} followed by a
     * {@code cleanup()} disposes and releases twice, and {@code cancel()} may do so while
     * {@code run()} is still writing to the sorter. Confirm that {@code InMemorySorter.dispose()}
     * and {@code MemoryManager.release(...)} tolerate both situations.
     */
    private void releaseSorterMemory() {
        if (this.sorter != null) {
            this.memManager.release(this.sorter.dispose());
        }
    }
}

