package org.apache.flink.runtime.operators.chaining;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.class */
public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    private InMemorySorter<T> sorter;
    private FlatCombineFunction<T> combiner;
    private TypeSerializer<T> serializer;
    private TypeComparator<T> comparator;
    private AbstractInvokable parent;
    private MemoryManager memManager;
    private QuickSort sortAlgo = new QuickSort();
    private volatile boolean running = true;

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void setup(AbstractInvokable abstractInvokable) {
        this.parent = abstractInvokable;
        FlatCombineFunction<T> flatCombineFunction = (FlatCombineFunction) RegularPactTask.instantiateUserCode(this.config, this.userCodeClassLoader, FlatCombineFunction.class);
        this.combiner = flatCombineFunction;
        FunctionUtils.setFunctionRuntimeContext(flatCombineFunction, getUdfRuntimeContext());
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void openTask() throws Exception {
        RegularPactTask.openUserCode(this.combiner, this.config.getStubParameters());
        this.memManager = this.parent.getEnvironment().getMemoryManager();
        int computeNumberOfPages = this.memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
        TypeSerializerFactory<T> inputSerializer = this.config.getInputSerializer(0, this.userCodeClassLoader);
        TypeComparatorFactory<T> driverComparator = this.config.getDriverComparator(0, this.userCodeClassLoader);
        this.serializer = inputSerializer.getSerializer();
        this.comparator = driverComparator.createComparator();
        List<MemorySegment> allocatePages = this.memManager.allocatePages(this.parent, computeNumberOfPages);
        if (!this.comparator.supportsSerializationWithKeyNormalization() || this.serializer.getLength() <= 0 || this.serializer.getLength() > THRESHOLD_FOR_IN_PLACE_SORTING) {
            this.sorter = new NormalizedKeySorter(this.serializer, this.comparator.duplicate(), allocatePages);
        } else {
            this.sorter = new FixedLengthRecordSorter(this.serializer, this.comparator, allocatePages);
        }
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void closeTask() throws Exception {
        this.memManager.release(this.sorter.dispose());
        if (this.running) {
            RegularPactTask.closeUserCode(this.combiner);
        }
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void cancelTask() {
        this.running = false;
        this.memManager.release(this.sorter.dispose());
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    /* renamed from: getStub */
    public Function mo110getStub() {
        return this.combiner;
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public String getTaskName() {
        return this.taskName;
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void collect(T t) {
        try {
            if (this.sorter.write(t)) {
                return;
            }
            try {
                sortAndCombine();
                this.sorter.reset();
                try {
                    if (this.sorter.write(t)) {
                    } else {
                        throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
                    }
                } catch (IOException e) {
                    throw new ExceptionInChainedStubException(this.taskName, e);
                }
            } catch (Exception e2) {
                throw new ExceptionInChainedStubException(this.taskName, e2);
            }
        } catch (IOException e3) {
            throw new ExceptionInChainedStubException(this.taskName, e3);
        }
    }

    public void close() {
        try {
            sortAndCombine();
            this.outputCollector.close();
        } catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }

    private void sortAndCombine() throws Exception {
        InMemorySorter<T> inMemorySorter = this.sorter;
        if (inMemorySorter.isEmpty()) {
            return;
        }
        this.sortAlgo.sort(inMemorySorter);
        KeyGroupedIterator keyGroupedIterator = new KeyGroupedIterator(inMemorySorter.getIterator(), this.serializer, this.comparator);
        FlatCombineFunction<T> flatCombineFunction = this.combiner;
        Collector<OT> collector = this.outputCollector;
        while (this.running && keyGroupedIterator.nextKey()) {
            flatCombineFunction.combine(keyGroupedIterator.getValues(), collector);
        }
    }
}
