package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

/* loaded from: input_file:org/apache/flink/runtime/operators/ReduceCombineDriver.class */
public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> {
    private static final Log LOG = LogFactory.getLog(ReduceCombineDriver.class);
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    private PactTaskContext<ReduceFunction<T>, T> taskContext;
    private TypeSerializer<T> serializer;
    private TypeComparator<T> comparator;
    private ReduceFunction<T> reducer;
    private Collector<T> output;
    private MemoryManager memManager;
    private InMemorySorter<T> sorter;
    private QuickSort sortAlgo = new QuickSort();
    private boolean running;

    @Override // org.apache.flink.runtime.operators.PactDriver
    public void setup(PactTaskContext<ReduceFunction<T>, T> pactTaskContext) {
        this.taskContext = pactTaskContext;
        this.running = true;
    }

    @Override // org.apache.flink.runtime.operators.PactDriver
    public int getNumberOfInputs() {
        return 1;
    }

    @Override // org.apache.flink.runtime.operators.PactDriver
    public Class<ReduceFunction<T>> getStubType() {
        return ReduceFunction.class;
    }

    @Override // org.apache.flink.runtime.operators.PactDriver
    public boolean requiresComparatorOnInput() {
        return true;
    }

    @Override // org.apache.flink.runtime.operators.PactDriver
    public void prepare() throws Exception {
        if (this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_PARTIAL_REDUCE) {
            throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner.");
        }
        this.memManager = this.taskContext.getMemoryManager();
        int computeNumberOfPages = this.memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
        TypeSerializerFactory<X> inputSerializer = this.taskContext.getInputSerializer(0);
        this.comparator = (TypeComparator<T>) this.taskContext.getInputComparator(0);
        this.serializer = inputSerializer.getSerializer();
        this.reducer = this.taskContext.getStub();
        this.output = this.taskContext.getOutputCollector();
        List<MemorySegment> allocatePages = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(), computeNumberOfPages);
        if (!this.comparator.supportsSerializationWithKeyNormalization() || this.serializer.getLength() <= 0 || this.serializer.getLength() > THRESHOLD_FOR_IN_PLACE_SORTING) {
            this.sorter = new NormalizedKeySorter(this.serializer, this.comparator.duplicate(), allocatePages);
        } else {
            this.sorter = new FixedLengthRecordSorter(this.serializer, this.comparator, allocatePages);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.operators.PactDriver
    public void run() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Combiner starting.");
        }
        MutableObjectIterator<X> input = this.taskContext.getInput(0);
        Object createInstance = this.serializer.createInstance();
        while (this.running) {
            Object next = input.next(createInstance);
            createInstance = next;
            if (next == null) {
                break;
            }
            if (!this.sorter.write(createInstance)) {
                sortAndCombine();
                this.sorter.reset();
                if (!this.sorter.write(createInstance)) {
                    throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
                }
            }
        }
        sortAndCombine();
    }

    private void sortAndCombine() throws Exception {
        Object obj;
        InMemorySorter<T> inMemorySorter = this.sorter;
        if (inMemorySorter.isEmpty()) {
            return;
        }
        this.sortAlgo.sort(inMemorySorter);
        TypeSerializer<T> typeSerializer = this.serializer;
        TypeComparator<T> typeComparator = this.comparator;
        ReduceFunction<T> reduceFunction = this.reducer;
        Collector<T> collector = this.output;
        MutableObjectIterator<T> iterator = inMemorySorter.getIterator();
        Object next = iterator.next(typeSerializer.createInstance());
        while (this.running && next != null) {
            typeComparator.setReference(next);
            Object obj2 = next;
            while (true) {
                obj = obj2;
                Object next2 = iterator.next(typeSerializer.createInstance());
                next = next2;
                if (next2 != null && typeComparator.equalToReference(next)) {
                    obj2 = reduceFunction.reduce(obj, next);
                }
            }
            collector.collect(obj);
        }
    }

    @Override // org.apache.flink.runtime.operators.PactDriver
    public void cleanup() {
        this.memManager.release(this.sorter.dispose());
    }

    @Override // org.apache.flink.runtime.operators.PactDriver
    public void cancel() {
        this.running = false;
        this.memManager.release(this.sorter.dispose());
    }
}
