/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TraversableOnceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CombiningUnilateralSortMerger<E>
extends UnilateralSortMerger<E> {
    private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class);
    private final GroupCombineFunction<E, E> combineStub;
    private Configuration udfConfig;

    /**
     * Creates a combining sort-merger without an explicit number of sort buffers.
     *
     * <p>Delegates to the 13-argument constructor, passing {@code -1} as {@code numSortBuffers}.
     * NOTE(review): {@code -1} presumably lets the superclass choose the number of sort buffers;
     * confirm against {@code UnilateralSortMerger}.
     *
     * @param combineStub the combine function applied while spilling and merging
     * @param memoryManager forwarded to the full constructor
     * @param ioManager forwarded to the full constructor
     * @param input forwarded to the full constructor
     * @param parentTask forwarded to the full constructor
     * @param serializerFactory forwarded to the full constructor
     * @param comparator forwarded to the full constructor
     * @param memoryFraction forwarded to the full constructor
     * @param maxNumFileHandles forwarded to the full constructor
     * @param startSpillingFraction forwarded to the full constructor
     * @param handleLargeRecords forwarded to the full constructor
     * @param objectReuseEnabled forwarded to the full constructor
     * @throws IOException declared by the delegated constructor
     * @throws MemoryAllocationException declared by the delegated constructor
     */
    public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException {
        this(combineStub, memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, -1, maxNumFileHandles, startSpillingFraction, handleLargeRecords, objectReuseEnabled);
    }

    /**
     * Creates a combining sort-merger with an explicit number of sort buffers.
     *
     * <p>All arguments except {@code combineStub} are handed to the superclass constructor;
     * {@code combineStub} is stored for use by the spilling thread.
     * NOTE(review): the literal {@code false} passed to the superclass between
     * {@code startSpillingFraction} and {@code handleLargeRecords} is a superclass flag whose
     * meaning is not visible in this file - confirm against {@code UnilateralSortMerger}.
     *
     * @param combineStub the combine function applied while spilling and merging
     * @param memoryManager forwarded to the superclass constructor
     * @param ioManager forwarded to the superclass constructor
     * @param input forwarded to the superclass constructor
     * @param parentTask forwarded to the superclass constructor
     * @param serializerFactory forwarded to the superclass constructor
     * @param comparator forwarded to the superclass constructor
     * @param memoryFraction forwarded to the superclass constructor
     * @param numSortBuffers forwarded to the superclass constructor
     * @param maxNumFileHandles forwarded to the superclass constructor
     * @param startSpillingFraction forwarded to the superclass constructor
     * @param handleLargeRecords forwarded to the superclass constructor
     * @param objectReuseEnabled forwarded to the superclass constructor
     * @throws IOException declared by the superclass constructor
     * @throws MemoryAllocationException declared by the superclass constructor
     */
    public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException {
        super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords, objectReuseEnabled);
        this.combineStub = combineStub;
    }

    /**
     * Sets the configuration that the spilling thread passes to the combine function when it
     * opens it. If this is never called (or {@code null} is set), the spilling thread opens the
     * combine function with a fresh, empty {@link Configuration}.
     *
     * @param config the configuration for the combine function; may be {@code null}
     */
    public void setUdfConfiguration(Configuration config) {
        this.udfConfig = config;
    }

    /**
     * Builds the spilling thread for this sorter: a {@link CombiningSpillingThread} that receives
     * the serializer obtained from {@code serializerFactory} and all other arguments unchanged.
     */
    @Override
    protected UnilateralSortMerger.ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles) {
        final TypeSerializer<E> spillSerializer = serializerFactory.getSerializer();
        return new CombiningSpillingThread(
                exceptionHandler,
                queues,
                parentTask,
                memoryManager,
                ioManager,
                spillSerializer,
                comparator,
                sortReadMemory,
                writeMemory,
                maxFileHandles);
    }

    /**
     * A {@link Collector} that serializes every collected record into a
     * {@link ChannelWriterOutputView} using the given serializer.
     *
     * <p>{@link #close()} is a no-op; this class never closes the output view.
     */
    private static final class WriterCollector<E>
    implements Collector<E> {
        /** Target view that receives the serialized records. */
        private final ChannelWriterOutputView output;
        /** Serializer used to write each record to {@link #output}. */
        private final TypeSerializer<E> serializer;

        private WriterCollector(ChannelWriterOutputView output, TypeSerializer<E> serializer) {
            this.serializer = serializer;
            this.output = output;
        }

        /**
         * Serializes {@code record} to the output view.
         *
         * @throws RuntimeException wrapping the {@link IOException} if serialization fails
         */
        public void collect(E record) {
            final ChannelWriterOutputView target = this.output;
            try {
                this.serializer.serialize(record, target);
            }
            catch (IOException e) {
                throw new RuntimeException("An error occurred forwarding the record to the writer.", e);
            }
        }

        /** Does nothing. */
        public void close() {
            // intentionally empty
        }
    }

    private static final class CombineValueIterator<E>
    implements Iterator<E>,
    Iterable<E> {
        private final InMemorySorter<E> buffer;
        private E record;
        private int last;
        private int position;
        private boolean iteratorAvailable;

        public CombineValueIterator(InMemorySorter<E> buffer, E instance) {
            this.buffer = buffer;
            this.record = instance;
        }

        public void set(int first, int last) {
            this.last = last;
            this.position = first;
            this.iteratorAvailable = true;
        }

        @Override
        public boolean hasNext() {
            return this.position <= this.last;
        }

        @Override
        public E next() {
            if (this.position <= this.last) {
                try {
                    this.record = this.buffer.getRecord(this.record, this.position);
                    ++this.position;
                    return this.record;
                }
                catch (IOException ioex) {
                    LOG.error("Error retrieving a value from a buffer.", (Throwable)ioex);
                    throw new RuntimeException("Could not load the next value: " + ioex.getMessage(), ioex);
                }
            }
            throw new NoSuchElementException();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Iterator<E> iterator() {
            if (this.iteratorAvailable) {
                this.iteratorAvailable = false;
                return this;
            }
            throw new TraversableOnceException();
        }
    }

    protected class CombiningSpillingThread
    extends UnilateralSortMerger.SpillingThread {
        private final TypeComparator<E> comparator2;

        public CombiningSpillingThread(ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager, TypeSerializer<E> serializer, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles) {
            super(CombiningUnilateralSortMerger.this, exceptionHandler, queues, parentTask, memManager, ioManager, serializer, comparator, sortReadMemory, writeMemory, maxNumFileHandles);
            this.comparator2 = comparator.duplicate();
        }

        /*
         * WARNING - void declaration
         */
        @Override
        public void go() throws IOException {
            void var6_12;
            UnilateralSortMerger.CircularElement element;
            ArrayDeque cache = new ArrayDeque();
            boolean cacheOnly = false;
            while (this.isRunning()) {
                try {
                    element = this.queues.spill.take();
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (element == UnilateralSortMerger.spillingMarker()) break;
                if (element == UnilateralSortMerger.endMarker()) {
                    cacheOnly = true;
                    break;
                }
                cache.add(element);
            }
            if (!this.isRunning()) {
                return;
            }
            if (cacheOnly) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Initiating in memory merge.");
                }
                ArrayList iterators = new ArrayList(cache.size());
                for (UnilateralSortMerger.CircularElement circularElement : cache) {
                    iterators.add(circularElement.buffer.getIterator());
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Releasing unused sort-buffer memory.");
                }
                this.disposeSortBuffers(true);
                MergeIterator resIter = iterators.isEmpty() ? EmptyMutableObjectIterator.get() : (iterators.size() == 1 ? (MutableObjectIterator)iterators.get(0) : new MergeIterator(iterators, this.comparator));
                CombiningUnilateralSortMerger.this.setResultIterator(resIter);
                return;
            }
            GroupCombineFunction combineStub = CombiningUnilateralSortMerger.this.combineStub;
            try {
                Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
                FunctionUtils.openFunction((Function)combineStub, (Configuration)(conf == null ? new Configuration() : conf));
            }
            catch (Throwable t) {
                throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
            }
            FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
            ArrayList<UnilateralSortMerger.ChannelWithBlockCount> arrayList = new ArrayList<UnilateralSortMerger.ChannelWithBlockCount>();
            while (this.isRunning()) {
                int i;
                try {
                    element = this.takeNext(this.queues.spill, cache);
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (!this.isRunning()) {
                    return;
                }
                if (element == UnilateralSortMerger.endMarker()) break;
                FileIOChannel.ID channel = enumerator.next();
                this.registerChannelToBeRemovedAtShudown(channel);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Creating temp file " + channel.toString() + '.');
                }
                BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
                this.registerOpenChannelToBeRemovedAtShudown(writer);
                ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory, this.memManager.getPageSize());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Combining buffer " + element.id + '.');
                }
                InMemorySorter buffer = element.buffer;
                CombineValueIterator<Object> iter = new CombineValueIterator<Object>(buffer, this.serializer.createInstance());
                WriterCollector collector = new WriterCollector(output, this.serializer);
                int stop2 = buffer.size() - 1;
                try {
                    for (i = 0; i < stop2; ++i) {
                        int seqStart = i;
                        while (i < stop2 && 0 == buffer.compare(i, i + 1)) {
                            ++i;
                        }
                        if (i == seqStart) {
                            buffer.writeToOutput(output, seqStart, 1);
                            continue;
                        }
                        iter.set(seqStart, i);
                        combineStub.combine(iter, collector);
                    }
                }
                catch (Exception ex) {
                    throw new IOException("An error occurred in the combiner user code.", ex);
                }
                if (i == stop2) {
                    buffer.writeToOutput(output, stop2, 1);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Combined and spilled buffer " + element.id + ".");
                }
                output.close();
                this.unregisterOpenChannelToBeRemovedAtShudown(writer);
                arrayList.add(new UnilateralSortMerger.ChannelWithBlockCount(channel, output.getBlockCount()));
                element.buffer.reset();
                this.queues.empty.add(element);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Spilling done.");
                LOG.debug("Releasing sort-buffer memory.");
            }
            this.disposeSortBuffers(false);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing combiner user code.");
            }
            try {
                FunctionUtils.closeFunction((Function)combineStub);
            }
            catch (Throwable t) {
                throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("User code closed.");
            }
            while (this.isRunning() && var6_12.size() > this.maxFanIn) {
                List<UnilateralSortMerger.ChannelWithBlockCount> list = this.mergeChannelList((List<UnilateralSortMerger.ChannelWithBlockCount>)var6_12, this.mergeReadMemory, this.writeMemory);
            }
            this.memManager.release(this.writeMemory);
            this.writeMemory.clear();
            if (var6_12.isEmpty()) {
                CombiningUnilateralSortMerger.this.setResultIterator(EmptyMutableObjectIterator.get());
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Beginning final merge.");
                }
                ArrayList<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(var6_12.size());
                this.getSegmentsForReaders(readBuffers, this.mergeReadMemory, var6_12.size());
                MergeIterator mergeIterator = this.getMergingIterator((List<UnilateralSortMerger.ChannelWithBlockCount>)var6_12, readBuffers, new ArrayList<FileIOChannel>(var6_12.size()), null);
                CombiningUnilateralSortMerger.this.setResultIterator(mergeIterator);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Spilling and merging thread done.");
            }
        }

        @Override
        protected UnilateralSortMerger.ChannelWithBlockCount mergeChannels(List<UnilateralSortMerger.ChannelWithBlockCount> channelIDs, List<List<MemorySegment>> readBuffers, List<MemorySegment> writeBuffers) throws IOException {
            ArrayList<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
            MergeIterator mergeIterator = this.getMergingIterator(channelIDs, readBuffers, channelAccesses, null);
            ReusingKeyGroupedIterator groupedIter = new ReusingKeyGroupedIterator(mergeIterator, this.serializer, this.comparator2);
            FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
            this.registerChannelToBeRemovedAtShudown(mergedChannelID);
            BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
            this.registerOpenChannelToBeRemovedAtShudown(writer);
            ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, this.memManager.getPageSize());
            WriterCollector collector = new WriterCollector(output, this.serializer);
            GroupCombineFunction combineStub = CombiningUnilateralSortMerger.this.combineStub;
            try {
                while (groupedIter.nextKey()) {
                    combineStub.combine((Iterable)groupedIter.getValues(), collector);
                }
            }
            catch (Exception e) {
                throw new IOException("An error occurred in the combiner user code.");
            }
            output.close();
            int numBlocksWritten = output.getBlockCount();
            this.unregisterOpenChannelToBeRemovedAtShudown(writer);
            for (int i = 0; i < channelAccesses.size(); ++i) {
                FileIOChannel access = (FileIOChannel)channelAccesses.get(i);
                access.closeAndDelete();
                this.unregisterOpenChannelToBeRemovedAtShudown(access);
            }
            return new UnilateralSortMerger.ChannelWithBlockCount(mergedChannelID, numBlocksWritten);
        }
    }
}

