package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/ExternalSorterBuilder.class */
/**
 * Fluent builder that collects the configuration of an {@link ExternalSorter} and assembles it.
 *
 * <p>Two flavours can be built:
 *
 * <ul>
 *   <li>{@link #build(MutableObjectIterator)} - a sorter whose reading stage pulls records from
 *       the given iterator;
 *   <li>{@link #build()} - a {@link PushSorter} into which the caller writes records itself.
 * </ul>
 *
 * <p>Note that building is not side-effect free: it may lower the configured maximum number of
 * file handles, fixes the number of sort buffers if none was configured, and removes segments
 * from a memory list supplied through {@link #memory(List)}.
 *
 * @param <T> type of the records to sort
 */
public final class ExternalSorterBuilder<T> {
    // NOTE(review): the logger is registered under ExternalSorter, not under this builder.
    // Kept as-is so that log categories do not change.
    private static final Logger LOG = LoggerFactory.getLogger(ExternalSorter.class);

    /** Threshold handed to the default {@link InMemorySorterFactory}. */
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    /** Minimal number of write buffers reserved per consumer. */
    private static final int MIN_NUM_WRITE_BUFFERS = 2;

    /** Maximal number of write buffers reserved per consumer. */
    private static final int MAX_NUM_WRITE_BUFFERS = 4;

    /** Minimal number of memory segments that must remain for sorting. */
    private static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;

    /** Sort memory size (100 MiB) above which two sort buffers are used by default. */
    private static final long TWO_SORT_BUFFERS_THRESHOLD_BYTES = 100L * 1024 * 1024;

    private final MemoryManager memoryManager;
    private final TaskInvokable parentTask;
    private final TypeSerializer<T> serializer;
    private final TypeComparator<T> comparator;
    private InMemorySorterFactory<T> inMemorySorterFactory;
    private IOManager ioManager;
    private GroupCombineFunction<T, T> combineFunction;
    private Configuration udfConfig;
    private final ExecutionConfig executionConfig;
    private int maxNumFileHandles = ((Integer) AlgorithmOptions.SPILLING_MAX_FAN.defaultValue()).intValue();
    private boolean objectReuseEnabled = false;
    private boolean handleLargeRecords = false;
    private double memoryFraction = 1.0d;
    /** Number of sort buffers; a value below 1 means "decide while building". */
    private int numSortBuffers = -1;
    private double startSpillingFraction = ((Float) AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue()).floatValue();
    /** {@code true} until {@code enableSpilling} is called; no write buffers are reserved then. */
    private boolean noSpillingMemory = true;
    /** Caller supplied memory; when {@code null} the pages are allocated from the memory manager. */
    private List<MemorySegment> memorySegments = null;

    /**
     * Reading stage factory for the push based sorter. It creates no stage runner; instead it
     * keeps the {@link SorterInputGateway} through which the caller feeds records.
     */
    private static final class PushFactory<E> implements ReadingStageFactory<E> {
        /** Set by {@link #createReadingStage}; {@code null} before that call. */
        private SorterInputGateway<E> sorterInputGateway;

        private PushFactory() {
        }

        /**
         * Creates the input gateway and remembers it.
         *
         * @return always {@code null}, no reading stage is run in push mode
         */
        @Override
        public StageRunner createReadingStage(
                ExceptionHandler<IOException> exceptionHandler,
                StageRunner.StageMessageDispatcher<E> dispatcher,
                LargeRecordHandler<E> largeRecordHandler,
                long startSpillingBytes) {
            this.sorterInputGateway = new SorterInputGateway<>(dispatcher, largeRecordHandler, startSpillingBytes);
            return null;
        }
    }

    /**
     * Creates the stage that feeds records into the sorter.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this interface was
     * widened from private; it is left public here to keep the visible interface unchanged.
     */
    @FunctionalInterface
    public interface ReadingStageFactory<E> {
        /**
         * @param exceptionHandler handler to which I/O failures are reported
         * @param dispatcher dispatcher used to exchange buffers between the stages
         * @param largeRecordHandler handler for large records, may be {@code null}
         * @param startSpillingBytes number of bytes derived from the spilling fraction and the
         *     sort memory size
         * @return the reading stage, or {@code null} if the factory does not run one
         */
        @Nullable
        StageRunner createReadingStage(
                ExceptionHandler<IOException> exceptionHandler,
                StageRunner.StageMessageDispatcher<E> dispatcher,
                LargeRecordHandler<E> largeRecordHandler,
                long startSpillingBytes);
    }

    /**
     * Creates a builder using the default in-memory sorter factory.
     *
     * @param memoryManager manager used to allocate pages and to query the page size
     * @param parentTask task passed as owner when allocating memory
     * @param serializer serializer of the sorted records
     * @param comparator comparator of the sorted records
     * @param executionConfig execution config handed to the large record handler
     */
    public ExternalSorterBuilder(
            MemoryManager memoryManager,
            TaskInvokable parentTask,
            TypeSerializer<T> serializer,
            TypeComparator<T> comparator,
            ExecutionConfig executionConfig) {
        this.memoryManager = memoryManager;
        this.parentTask = parentTask;
        this.executionConfig = executionConfig;
        this.serializer = serializer;
        this.comparator = comparator;
        this.inMemorySorterFactory =
                new DefaultInMemorySorterFactory<>(serializer, comparator, THRESHOLD_FOR_IN_PLACE_SORTING);
    }

    /**
     * Sets the maximal merge fan-in.
     *
     * @throws IllegalArgumentException if the value is smaller than two
     */
    public ExternalSorterBuilder<T> maxNumFileHandles(int maxNumFileHandles) {
        if (maxNumFileHandles < 2) {
            throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
        }
        this.maxNumFileHandles = maxNumFileHandles;
        return this;
    }

    /** Sets the object reuse flag that is passed on to the spilling behaviour. */
    public ExternalSorterBuilder<T> objectReuse(boolean enabled) {
        this.objectReuseEnabled = enabled;
        return this;
    }

    /** Enables or disables the reservation of memory for a {@link LargeRecordHandler}. */
    public ExternalSorterBuilder<T> largeRecords(boolean enabled) {
        this.handleLargeRecords = enabled;
        return this;
    }

    /**
     * Enables spilling through the given I/O manager, which makes the builder reserve write
     * buffers.
     *
     * @param ioManager I/O manager to spill with, must not be {@code null}
     */
    public ExternalSorterBuilder<T> enableSpilling(IOManager ioManager) {
        this.noSpillingMemory = false;
        this.ioManager = Preconditions.checkNotNull(ioManager);
        return this;
    }

    /**
     * Enables spilling and overrides the fraction of the sort memory that is used to compute the
     * spilling threshold passed to the reading stage.
     */
    public ExternalSorterBuilder<T> enableSpilling(IOManager ioManager, double startSpillingFraction) {
        this.startSpillingFraction = startSpillingFraction;
        return enableSpilling(ioManager);
    }

    /** Sets the fraction used to compute the number of pages to allocate from the memory manager. */
    public ExternalSorterBuilder<T> memoryFraction(double fraction) {
        this.memoryFraction = fraction;
        return this;
    }

    /**
     * Supplies the memory to use instead of allocating it from the memory manager. The list is
     * modified while building (segments are removed from its end).
     */
    public ExternalSorterBuilder<T> memory(List<MemorySegment> memorySegments) {
        this.memorySegments = Preconditions.checkNotNull(memorySegments);
        return this;
    }

    /** Sets the number of sort buffers; values below one let the builder decide. */
    public ExternalSorterBuilder<T> sortBuffers(int numSortBuffers) {
        this.numSortBuffers = numSortBuffers;
        return this;
    }

    /** Registers a combiner together with the configuration handed to it. */
    public ExternalSorterBuilder<T> withCombiner(GroupCombineFunction<T, T> combineFunction, Configuration udfConfig) {
        this.combineFunction = Preconditions.checkNotNull(combineFunction);
        this.udfConfig = Preconditions.checkNotNull(udfConfig);
        return this;
    }

    /** Registers a combiner with an empty configuration. */
    public ExternalSorterBuilder<T> withCombiner(GroupCombineFunction<T, T> combineFunction) {
        this.combineFunction = Preconditions.checkNotNull(combineFunction);
        this.udfConfig = new Configuration();
        return this;
    }

    /** Replaces the default in-memory sorter factory. */
    public ExternalSorterBuilder<T> sorterFactory(InMemorySorterFactory<T> sorterFactory) {
        this.inMemorySorterFactory = Preconditions.checkNotNull(sorterFactory);
        return this;
    }

    /**
     * Builds a sorter whose reading stage consumes the given iterator.
     *
     * @param input source of the records to sort
     * @throws MemoryAllocationException declared by the page allocation of the memory manager
     */
    public ExternalSorter<T> build(MutableObjectIterator<T> input) throws MemoryAllocationException {
        return doBuild(
                (exceptionHandler, dispatcher, largeRecordHandler, startSpillingBytes) ->
                        new ReadingThread<>(
                                exceptionHandler,
                                input,
                                dispatcher,
                                largeRecordHandler,
                                this.serializer.createInstance(),
                                startSpillingBytes));
    }

    /**
     * Builds a push based sorter: records are written by the caller and the sorted result is
     * obtained from the returned object.
     *
     * @throws MemoryAllocationException declared by the page allocation of the memory manager
     */
    public PushSorter<T> build() throws MemoryAllocationException {
        final PushFactory<T> pushFactory = new PushFactory<>();
        // doBuild invokes the factory, which is what populates pushFactory.sorterInputGateway.
        final ExternalSorter<T> sorter = doBuild(pushFactory);
        return new PushSorter<T>() {
            private final SorterInputGateway<T> recordProducer = pushFactory.sorterInputGateway;

            @Override
            public void writeRecord(T record) throws IOException, InterruptedException {
                this.recordProducer.writeRecord(record);
            }

            @Override
            public void finishReading() {
                this.recordProducer.finishReading();
            }

            @Override
            public MutableObjectIterator<T> getIterator() throws InterruptedException {
                return sorter.getIterator();
            }

            @Override
            public void close() {
                sorter.close();
            }
        };
    }

    /**
     * Partitions the available pages into write buffers, large record buffers and sort buffers
     * and wires up the sorter around the given reading stage factory.
     *
     * @throws IllegalArgumentException if fewer pages are available than the sum of the minimal
     *     number of write buffers and sort segments
     */
    private ExternalSorter<T> doBuild(ReadingStageFactory<T> readingStageFactory) throws MemoryAllocationException {
        final List<MemorySegment> memory =
                this.memorySegments != null
                        ? this.memorySegments
                        : this.memoryManager.allocatePages(
                                this.parentTask, this.memoryManager.computeNumberOfPages(this.memoryFraction));

        final int numPagesTotal = memory.size();
        final int minPages = MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS;
        if (numPagesTotal < minPages) {
            throw new IllegalArgumentException(
                    "Too little memory provided to sorter to perform task. Required are at least "
                            + minPages
                            + " pages. Current page size is "
                            + this.memoryManager.getPageSize()
                            + " bytes.");
        }

        // Decide how many pages are set aside for writing and for large records.
        final int numWriteBuffers;
        final int numLargeRecordBuffers;
        if (this.noSpillingMemory && !this.handleLargeRecords) {
            numWriteBuffers = 0;
            numLargeRecordBuffers = 0;
        } else {
            // Spilling counts as one consumer of write buffers, large record handling as two.
            final int numConsumers = (this.noSpillingMemory ? 0 : 1) + (this.handleLargeRecords ? 2 : 0);
            final int minBuffersForMerging = this.maxNumFileHandles + numConsumers * MIN_NUM_WRITE_BUFFERS;

            if (minBuffersForMerging > numPagesTotal) {
                // Not enough pages for a merge with the configured fan-in: use the minimal
                // number of buffers and reduce the fan-in to what fits.
                numWriteBuffers = this.noSpillingMemory ? 0 : MIN_NUM_WRITE_BUFFERS;
                numLargeRecordBuffers = this.handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;
                this.maxNumFileHandles = numPagesTotal - numConsumers * MIN_NUM_WRITE_BUFFERS;
                LOG.debug(
                        "Reducing maximal merge fan-in to {} due to limited memory availability during merge",
                        this.maxNumFileHandles);
            } else {
                // Roughly one percent of the pages per consumer, clamped to [MIN, MAX].
                final int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100);
                if (fractionalAuxBuffers >= MAX_NUM_WRITE_BUFFERS) {
                    numWriteBuffers = this.noSpillingMemory ? 0 : MAX_NUM_WRITE_BUFFERS;
                    numLargeRecordBuffers = this.handleLargeRecords ? 2 * MAX_NUM_WRITE_BUFFERS : 0;
                } else {
                    numWriteBuffers =
                            this.noSpillingMemory ? 0 : Math.max(MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers);
                    numLargeRecordBuffers = this.handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;
                }
            }
        }

        final int sortMemoryPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
        // Widen BEFORE multiplying: with an int page size the product of pages and page size
        // would otherwise be computed in int arithmetic and wrap around for 2 GiB or more.
        final long sortMemoryBytes = (long) sortMemoryPages * this.memoryManager.getPageSize();

        if (this.numSortBuffers < 1) {
            this.numSortBuffers = sortMemoryBytes > TWO_SORT_BUFFERS_THRESHOLD_BYTES ? 2 : 1;
        }
        final int numSegmentsPerSortBuffer = sortMemoryPages / this.numSortBuffers;

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    String.format(
                            "Instantiating sorter with %d pages of sorting memory (=%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d buffers for writing sorted results and merging maximally %d streams at once. Using %d memory segments for large record spilling.",
                            sortMemoryPages,
                            sortMemoryBytes,
                            this.numSortBuffers,
                            numSegmentsPerSortBuffer,
                            numWriteBuffers,
                            this.maxNumFileHandles,
                            numLargeRecordBuffers));
        }

        // Write buffers are taken from the end of the list first, large record buffers second.
        final List<MemorySegment> writeMemory = takeFromEnd(memory, numWriteBuffers);

        final LargeRecordHandler<T> largeRecordHandler;
        if (numLargeRecordBuffers > 0) {
            final List<MemorySegment> largeRecordMemory = takeFromEnd(memory, numLargeRecordBuffers);
            largeRecordHandler =
                    new LargeRecordHandler<>(
                            this.serializer,
                            this.comparator.duplicate(),
                            this.ioManager,
                            this.memoryManager,
                            largeRecordMemory,
                            this.parentTask,
                            this.maxNumFileHandles,
                            this.executionConfig);
        } else {
            largeRecordHandler = null;
        }

        final CircularQueues<T> circularQueues = new CircularQueues<>();
        final List<InMemorySorter<T>> inMemorySorters = new ArrayList<>(this.numSortBuffers);

        // Distribute the remaining segments over the sort buffers and hand every buffer to the
        // READ stage. The last buffer receives all segments that are left over.
        final Iterator<MemorySegment> segments = memory.iterator();
        for (int bufferId = 0; bufferId < this.numSortBuffers; bufferId++) {
            final boolean lastBuffer = bufferId == this.numSortBuffers - 1;
            final List<MemorySegment> sortSegments = new ArrayList<>(numSegmentsPerSortBuffer);
            for (int remaining = lastBuffer ? Integer.MAX_VALUE : numSegmentsPerSortBuffer;
                    remaining > 0 && segments.hasNext();
                    remaining--) {
                sortSegments.add(segments.next());
            }
            final InMemorySorter<T> inMemorySorter = this.inMemorySorterFactory.create(sortSegments);
            inMemorySorters.add(inMemorySorter);
            circularQueues.send(
                    StageRunner.SortStage.READ, new CircularElement<>(bufferId, inMemorySorter, sortSegments));
        }

        // I/O failures of any stage complete the iterator future exceptionally.
        final ExceptionHandler<IOException> exceptionHandler =
                exception -> circularQueues.getIteratorFuture().completeExceptionally(exception);
        final SpillChannelManager spillChannelManager = new SpillChannelManager();
        final long startSpillingBytes = (long) (this.startSpillingFraction * sortMemoryBytes);

        return new ExternalSorter<>(
                readingStageFactory.createReadingStage(
                        exceptionHandler, circularQueues, largeRecordHandler, startSpillingBytes),
                new SortingThread<T>(exceptionHandler, circularQueues),
                new SpillingThread<T>(
                        exceptionHandler,
                        circularQueues,
                        this.memoryManager,
                        this.ioManager,
                        this.serializer,
                        this.comparator,
                        memory,
                        writeMemory,
                        this.maxNumFileHandles,
                        spillChannelManager,
                        largeRecordHandler,
                        this.combineFunction != null
                                ? new CombiningSpillingBehaviour<T>(
                                        this.combineFunction,
                                        this.serializer,
                                        this.comparator,
                                        this.objectReuseEnabled,
                                        this.udfConfig)
                                : new DefaultSpillingBehaviour<T>(this.objectReuseEnabled, this.serializer),
                        MIN_NUM_WRITE_BUFFERS,
                        MAX_NUM_WRITE_BUFFERS),
                memory,
                writeMemory,
                this.memoryManager,
                largeRecordHandler,
                spillChannelManager,
                inMemorySorters,
                circularQueues);
    }

    /** Removes {@code count} segments from the end of {@code source} and returns them in removal order. */
    private static List<MemorySegment> takeFromEnd(List<MemorySegment> source, int count) {
        final List<MemorySegment> taken = new ArrayList<>(count);
        for (int n = 0; n < count; n++) {
            taken.add(source.remove(source.size() - 1));
        }
        return taken;
    }
}
