package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TraversableOnceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.class */
public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
    private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class);
    private final FlatCombineFunction<E> combineStub;
    private Configuration udfConfig;

    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger$CombineValueIterator.class */
    private static final class CombineValueIterator<E> implements Iterator<E>, Iterable<E> {
        private final InMemorySorter<E> buffer;
        private E record;
        private int last;
        private int position;
        private boolean iteratorAvailable;

        public CombineValueIterator(InMemorySorter<E> inMemorySorter, E e) {
            this.buffer = inMemorySorter;
            this.record = e;
        }

        public void set(int i, int i2) {
            this.last = i2;
            this.position = i;
            this.iteratorAvailable = true;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.position <= this.last;
        }

        @Override // java.util.Iterator
        public E next() {
            if (this.position > this.last) {
                throw new NoSuchElementException();
            }
            try {
                this.record = this.buffer.getRecord(this.record, this.position);
                this.position++;
                return this.record;
            } catch (IOException e) {
                CombiningUnilateralSortMerger.LOG.error("Error retrieving a value from a buffer.", e);
                throw new RuntimeException("Could not load the next value: " + e.getMessage(), e);
            }
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override // java.lang.Iterable
        public Iterator<E> iterator() {
            if (!this.iteratorAvailable) {
                throw new TraversableOnceException();
            }
            this.iteratorAvailable = false;
            return this;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger$CombiningSpillingThread.class */
    protected class CombiningSpillingThread extends UnilateralSortMerger<E>.SpillingThread {
        private final TypeComparator<E> comparator2;

        /**
         * Creates the spilling thread. All arguments are handed unchanged to the superclass
         * constructor; in addition a duplicate of {@code typeComparator} is kept for use when
         * grouping records during channel merging.
         */
        public CombiningSpillingThread(
                ExceptionHandler<IOException> exceptionHandler,
                UnilateralSortMerger.CircularQueues<E> circularQueues,
                AbstractInvokable abstractInvokable,
                MemoryManager memoryManager,
                IOManager iOManager,
                TypeSerializer<E> typeSerializer,
                TypeComparator<E> typeComparator,
                List<MemorySegment> list,
                List<MemorySegment> list2,
                int i) {
            super(exceptionHandler, circularQueues, abstractInvokable, memoryManager, iOManager,
                    typeSerializer, typeComparator, list, list2, i);
            // Separate comparator instance, used by mergeChannels for key grouping.
            this.comparator2 = typeComparator.duplicate();
        }

        /**
         * Body of the spilling thread.
         *
         * <ol>
         *   <li>Takes sorted buffers from the spill queue and caches them until either the
         *       spilling marker or the end marker is received.</li>
         *   <li>If the end marker came first, no spilling happens: the result iterator is set to
         *       an empty iterator, the single cached buffer's iterator, or a
         *       {@link MergeIterator} over all cached buffers. The combine function is not
         *       invoked on this path.</li>
         *   <li>Otherwise the combine function is opened and every buffer (cached ones first, via
         *       {@code takeNext}) is written to its own temp channel. Runs of records that
         *       compare equal are passed through the combine function; single records are
         *       copied as they are.</li>
         *   <li>The combine function is closed, channels are merged while there are more than
         *       {@code maxNumFileHandles} of them, and the result iterator is set to a merging
         *       iterator over the remaining channels (or an empty iterator).</li>
         * </ol>
         *
         * <p>If the thread is interrupted while waiting for a buffer and is still running, the
         * interruption is logged and the wait is retried.
         *
         * @throws IOException if spilling or merging fails, or if the combine function fails in
         *         {@code open()}, {@code combine()} or {@code close()} (original failure attached
         *         as cause)
         */
        @Override
        public void go() throws IOException {
            // ---------------- Phase 1: cache buffers until a marker arrives ----------------
            final ArrayDeque<UnilateralSortMerger.CircularElement<E>> cache = new ArrayDeque<>();
            boolean cacheOnly = false;

            while (isRunning()) {
                UnilateralSortMerger.CircularElement<E> element;
                try {
                    element = this.queues.spill.take();
                } catch (InterruptedException iex) {
                    if (!isRunning()) {
                        return;
                    }
                    CombiningUnilateralSortMerger.LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                    // No element was obtained; go back and wait again.
                    continue;
                }
                if (element == UnilateralSortMerger.spillingMarker()) {
                    break;
                }
                if (element == UnilateralSortMerger.endMarker()) {
                    cacheOnly = true;
                    break;
                }
                cache.add(element);
            }

            if (!isRunning()) {
                return;
            }

            // ---------------- Phase 2a: everything was cached, merge in memory ----------------
            if (cacheOnly) {
                if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                    CombiningUnilateralSortMerger.LOG.debug("Initiating in memory merge.");
                }
                final List<MutableObjectIterator<E>> iterators = new ArrayList<>(cache.size());
                for (UnilateralSortMerger.CircularElement<E> cached : cache) {
                    iterators.add(cached.buffer.getIterator());
                }
                if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                    CombiningUnilateralSortMerger.LOG.debug("Releasing unused sort-buffer memory.");
                }
                disposeSortBuffers(true);

                final MutableObjectIterator<E> result;
                if (iterators.isEmpty()) {
                    result = EmptyMutableObjectIterator.<E>get();
                } else if (iterators.size() == 1) {
                    result = iterators.get(0);
                } else {
                    result = new MergeIterator<E>(iterators, this.serializer, this.comparator);
                }
                CombiningUnilateralSortMerger.this.setResultIterator(result);
                return;
            }

            // ---------------- Phase 2b: combine and spill ----------------
            final FlatCombineFunction<E> combiner = CombiningUnilateralSortMerger.this.combineStub;

            // Only the open() call is guarded, so that later failures are not misreported as
            // failures of open().
            try {
                final Configuration configured = CombiningUnilateralSortMerger.this.udfConfig;
                FunctionUtils.openFunction(combiner, configured == null ? new Configuration() : configured);
            } catch (Throwable t) {
                throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
            }

            final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
            List<UnilateralSortMerger.ChannelWithBlockCount> channelIDs = new ArrayList<>();

            while (isRunning()) {
                UnilateralSortMerger.CircularElement<E> element;
                try {
                    element = takeNext(this.queues.spill, cache);
                } catch (InterruptedException iex) {
                    if (!isRunning()) {
                        return;
                    }
                    CombiningUnilateralSortMerger.LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                    // No element was obtained; go back and wait again.
                    continue;
                }
                if (!isRunning()) {
                    return;
                }
                if (element == UnilateralSortMerger.endMarker()) {
                    break;
                }

                // Open the next temp channel for this buffer.
                final FileIOChannel.ID channel = enumerator.next();
                registerChannelToBeRemovedAtShudown(channel);
                if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                    CombiningUnilateralSortMerger.LOG.debug("Creating temp file " + channel.toString() + '.');
                }
                final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
                registerOpenChannelToBeRemovedAtShudown(writer);
                final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory, this.memManager.getPageSize());

                if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                    CombiningUnilateralSortMerger.LOG.debug("Combining buffer " + element.id + '.');
                }
                final InMemorySorter<E> buffer = element.buffer;
                final CombineValueIterator<E> values = new CombineValueIterator<E>(buffer, this.serializer.createInstance());
                final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);

                int i = 0;
                final int stop = buffer.size() - 1;
                try {
                    while (i < stop) {
                        final int seqStart = i;
                        // Advance i to the last record of the run that compares equal.
                        while (i < stop && 0 == buffer.compare(i, i + 1)) {
                            i++;
                        }
                        if (i == seqStart) {
                            // Single record for this key: copy it without calling the combiner.
                            buffer.writeToOutput(output, seqStart, 1);
                        } else {
                            values.set(seqStart, i);
                            combiner.combine(values, collector);
                        }
                        i++;
                    }
                } catch (Exception ex) {
                    throw new IOException("An error occurred in the combiner user code.", ex);
                }
                // The loop never visits index 'stop' on its own; if no run ended there, the last
                // record is still unwritten.
                if (i == stop) {
                    buffer.writeToOutput(output, stop, 1);
                }

                if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                    CombiningUnilateralSortMerger.LOG.debug("Combined and spilled buffer " + element.id + ".");
                }
                output.close();
                unregisterOpenChannelToBeRemovedAtShudown(writer);
                channelIDs.add(new UnilateralSortMerger.ChannelWithBlockCount(channel, output.getBlockCount()));

                // Hand the emptied buffer back for reuse.
                element.buffer.reset();
                this.queues.empty.add(element);
            }

            if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                CombiningUnilateralSortMerger.LOG.debug("Spilling done.");
                CombiningUnilateralSortMerger.LOG.debug("Releasing sort-buffer memory.");
            }
            disposeSortBuffers(false);

            if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                CombiningUnilateralSortMerger.LOG.debug("Closing combiner user code.");
            }
            // Only the close() call is guarded, for the same reason as open() above.
            try {
                FunctionUtils.closeFunction(combiner);
            } catch (Throwable t) {
                throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
            }
            if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                CombiningUnilateralSortMerger.LOG.debug("User code closed.");
            }

            // ---------------- Phase 3: merge spilled channels ----------------
            while (isRunning() && channelIDs.size() > this.maxNumFileHandles) {
                channelIDs = mergeChannelList(channelIDs, this.sortReadMemory, this.writeMemory);
            }
            this.memManager.release(this.writeMemory);
            this.writeMemory.clear();

            if (channelIDs.isEmpty()) {
                CombiningUnilateralSortMerger.this.setResultIterator(EmptyMutableObjectIterator.<E>get());
            } else {
                if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                    CombiningUnilateralSortMerger.LOG.debug("Beginning final merge.");
                }
                final List<List<MemorySegment>> readBuffers = new ArrayList<>(channelIDs.size());
                getSegmentsForReaders(readBuffers, this.sortReadMemory, channelIDs.size());
                CombiningUnilateralSortMerger.this.setResultIterator(
                        getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size())));
            }
            if (CombiningUnilateralSortMerger.LOG.isDebugEnabled()) {
                CombiningUnilateralSortMerger.LOG.debug("Spilling and merging thread done.");
            }
        }

        /**
         * Merges the given channels into one new channel, passing every group of records with
         * equal keys through the combine function on the way.
         *
         * <p>The merged input is grouped with a {@link KeyGroupedIterator} that uses this
         * thread's duplicated comparator. After the output has been closed, every input channel
         * opened for the merge is closed and deleted.
         *
         * @param list channels to merge
         * @param list2 read segments, passed to {@code getMergingIterator} together with {@code list}
         * @param list3 memory segments used by the output view of the new channel
         * @return the id of the newly written channel together with its block count
         * @throws IOException if reading or writing fails, or if the combine function throws
         *         (the original exception is attached as cause)
         */
        @Override
        protected UnilateralSortMerger.ChannelWithBlockCount mergeChannels(List<UnilateralSortMerger.ChannelWithBlockCount> list, List<List<MemorySegment>> list2, List<MemorySegment> list3) throws IOException {
            // Filled by getMergingIterator with the channels it opens for reading.
            final List<FileIOChannel> openedInputs = new ArrayList<>(list.size());
            final KeyGroupedIterator<E> groups = new KeyGroupedIterator<E>(
                    getMergingIterator(list, list2, openedInputs), this.serializer, this.comparator2);

            // Create and register the output channel.
            final FileIOChannel.ID mergedChannel = this.ioManager.createChannel();
            registerChannelToBeRemovedAtShudown(mergedChannel);
            final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannel);
            registerOpenChannelToBeRemovedAtShudown(writer);
            final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, list3, this.memManager.getPageSize());

            final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
            final FlatCombineFunction<E> combiner = CombiningUnilateralSortMerger.this.combineStub;

            while (groups.nextKey()) {
                try {
                    combiner.combine(groups.getValues(), collector);
                } catch (Exception e) {
                    // Keep the original failure as the cause so the user-code error is not lost.
                    throw new IOException("An error occurred in the combiner user code.", e);
                }
            }

            output.close();
            final int blockCount = output.getBlockCount();
            unregisterOpenChannelToBeRemovedAtShudown(writer);

            // The merged inputs are no longer needed.
            for (FileIOChannel input : openedInputs) {
                input.closeAndDelete();
                unregisterOpenChannelToBeRemovedAtShudown(input);
            }
            return new UnilateralSortMerger.ChannelWithBlockCount(mergedChannel, blockCount);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger$WriterCollector.class */
    /**
     * {@link Collector} that serializes every collected record straight into a
     * {@link ChannelWriterOutputView}.
     *
     * @param <E> type of the collected records
     */
    private static final class WriterCollector<E> implements Collector<E> {
        /** Serializer used to write each record. */
        private final TypeSerializer<E> recordSerializer;
        /** View the serialized records are written to. */
        private final ChannelWriterOutputView target;

        /**
         * @param channelWriterOutputView view to write records to
         * @param typeSerializer serializer for the records
         */
        private WriterCollector(ChannelWriterOutputView channelWriterOutputView, TypeSerializer<E> typeSerializer) {
            this.recordSerializer = typeSerializer;
            this.target = channelWriterOutputView;
        }

        /**
         * Serializes the record into the output view.
         *
         * @throws RuntimeException wrapping the {@link IOException} if serialization fails
         */
        public void collect(E e) {
            try {
                recordSerializer.serialize(e, target);
            } catch (IOException ioe) {
                throw new RuntimeException("An error occurred forwarding the record to the writer.", ioe);
            }
        }

        /** Does nothing; this collector never closes the output view itself. */
        public void close() {
            // intentionally empty
        }
    }

    /**
     * Creates a combining sort-merger by delegating to the eleven-argument constructor, passing
     * {@code -1} for the extra {@code int} argument that precedes {@code i} there.
     *
     * <p>All other arguments are forwarded unchanged. NOTE(review): the meaning of {@code d},
     * {@code i} and {@code f} is defined by the superclass constructor, which is not visible
     * here — confirm against {@code UnilateralSortMerger}.
     *
     * @param flatCombineFunction combine function applied to groups of equal-key records when spilling
     * @throws IOException declared by the delegated constructor
     * @throws MemoryAllocationException declared by the delegated constructor
     */
    public CombiningUnilateralSortMerger(
            FlatCombineFunction<E> flatCombineFunction,
            MemoryManager memoryManager,
            IOManager iOManager,
            MutableObjectIterator<E> mutableObjectIterator,
            AbstractInvokable abstractInvokable,
            TypeSerializerFactory<E> typeSerializerFactory,
            TypeComparator<E> typeComparator,
            double d,
            int i,
            float f) throws IOException, MemoryAllocationException {
        this(flatCombineFunction, memoryManager, iOManager, mutableObjectIterator, abstractInvokable,
                typeSerializerFactory, typeComparator, d, -1, i, f);
    }

    /**
     * Creates a combining sort-merger.
     *
     * <p>Every argument except {@code flatCombineFunction} is forwarded to the superclass
     * constructor, followed by a trailing {@code false}. NOTE(review): the meaning of
     * {@code d}, {@code i}, {@code i2}, {@code f} and of the trailing flag is defined by
     * {@code UnilateralSortMerger}, which is not visible here — confirm there.
     *
     * @param flatCombineFunction combine function applied to groups of equal-key records when spilling
     * @throws IOException declared by the superclass constructor
     * @throws MemoryAllocationException declared by the superclass constructor
     */
    public CombiningUnilateralSortMerger(
            FlatCombineFunction<E> flatCombineFunction,
            MemoryManager memoryManager,
            IOManager iOManager,
            MutableObjectIterator<E> mutableObjectIterator,
            AbstractInvokable abstractInvokable,
            TypeSerializerFactory<E> typeSerializerFactory,
            TypeComparator<E> typeComparator,
            double d,
            int i,
            int i2,
            float f) throws IOException, MemoryAllocationException {
        super(memoryManager, iOManager, mutableObjectIterator, abstractInvokable,
                typeSerializerFactory, typeComparator, d, i, i2, f, false);
        this.combineStub = flatCombineFunction;
    }

    /**
     * Sets the configuration handed to the combine function when it is opened by the spilling
     * thread. If it is left {@code null}, the spilling thread opens the function with a fresh
     * empty {@link Configuration} instead.
     *
     * @param configuration configuration for the combine function, may be {@code null}
     */
    public void setUdfConfiguration(Configuration configuration) {
        udfConfig = configuration;
    }

    /**
     * Supplies the spilling thread for this sort-merger: a {@link CombiningSpillingThread}.
     *
     * <p>The serializer is obtained from {@code typeSerializerFactory}; all other arguments are
     * passed through unchanged.
     *
     * @return a new, not yet started {@link CombiningSpillingThread}
     */
    @Override
    protected UnilateralSortMerger.ThreadBase<E> getSpillingThread(
            ExceptionHandler<IOException> exceptionHandler,
            UnilateralSortMerger.CircularQueues<E> circularQueues,
            AbstractInvokable abstractInvokable,
            MemoryManager memoryManager,
            IOManager iOManager,
            TypeSerializerFactory<E> typeSerializerFactory,
            TypeComparator<E> typeComparator,
            List<MemorySegment> list,
            List<MemorySegment> list2,
            int i) {
        final TypeSerializer<E> serializer = typeSerializerFactory.getSerializer();
        return new CombiningSpillingThread(exceptionHandler, circularQueues, abstractInvokable,
                memoryManager, iOManager, serializer, typeComparator, list, list2, i);
    }
}
