package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.BufferWriter;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.shipping.OutputCollector;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.RecordOutputCollector;
import org.apache.flink.runtime.operators.shipping.RecordOutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.udf.RuntimeUDFContext;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.RecordReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;

/* loaded from: input_file:org/apache/flink/runtime/operators/RegularPactTask.class */
public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {
    protected static final Log LOG = LogFactory.getLog(RegularPactTask.class);
    protected volatile PactDriver<S, OT> driver;
    protected S stub;
    protected RuntimeUDFContext runtimeUdfContext;
    protected Collector<OT> output;
    protected List<BufferWriter> eventualOutputs;
    protected MutableReader<?>[] inputReaders;
    protected MutableReader<?>[] broadcastInputReaders;
    protected MutableObjectIterator<?>[] inputIterators;
    protected MutableObjectIterator<?>[] broadcastInputIterators;
    protected int[] iterativeInputs;
    protected int[] iterativeBroadcastInputs;
    protected volatile CloseableInputProvider<?>[] localStrategies;
    protected volatile TempBarrier<?>[] tempBarriers;
    protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs;
    protected MutableObjectIterator<?>[] inputs;
    protected TypeSerializerFactory<?>[] inputSerializers;
    protected TypeSerializerFactory<?>[] broadcastInputSerializers;
    protected TypeComparator<?>[] inputComparators;
    protected TaskConfig config;
    protected ClassLoader userCodeClassLoader;
    protected ArrayList<ChainedDriver<?, ?>> chainedTasks;
    private boolean[] excludeFromReset;
    private boolean[] inputIsCached;
    private boolean[] inputIsAsyncMaterialized;
    private int[] materializationMemory;
    protected volatile boolean running = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.operators.RegularPactTask$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/operators/RegularPactTask$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$operators$util$LocalStrategy = new int[LocalStrategy.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$operators$util$LocalStrategy[LocalStrategy.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$operators$util$LocalStrategy[LocalStrategy.SORT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$operators$util$LocalStrategy[LocalStrategy.COMBININGSORT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public void registerInputOutput() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(formatLogString("Start registering input and output."));
        }
        if (this.userCodeClassLoader == null) {
            try {
                this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
            } catch (IOException e) {
                throw new RuntimeException("The ClassLoader for the user code could not be instantiated from the library cache.", e);
            }
        }
        Configuration taskConfiguration = getTaskConfiguration();
        taskConfiguration.setClassLoader(this.userCodeClassLoader);
        this.config = new TaskConfig(taskConfiguration);
        this.driver = (PactDriver) InstantiationUtil.instantiate(this.config.getDriver(), PactDriver.class);
        try {
            initInputReaders();
            initBroadcastInputReaders();
            try {
                initOutputs();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(formatLogString("Finished registering input and output."));
                }
            } catch (Exception e2) {
                throw new RuntimeException(new StringBuilder().append("Initializing the output handlers failed").append(e2.getMessage()).toString() == null ? "." : ": " + e2.getMessage(), e2);
            }
        } catch (Exception e3) {
            throw new RuntimeException("Initializing the input streams failed in Task " + getEnvironment().getTaskName() + (e3.getMessage() == null ? "." : ": " + e3.getMessage()), e3);
        }
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public void invoke() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(formatLogString("Start task code."));
        }
        try {
            initOutputWriters(this.eventualOutputs);
            try {
                int numberOfInputs = this.driver.getNumberOfInputs();
                int numBroadcastInputs = this.config.getNumBroadcastInputs();
                initInputsSerializersAndComparators(numberOfInputs);
                initBroadcastInputsSerializers(numBroadcastInputs);
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < numberOfInputs; i++) {
                    int numberOfEventsUntilInterruptInIterativeGate = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i);
                    if (numberOfEventsUntilInterruptInIterativeGate < 0) {
                        throw new IllegalArgumentException();
                    }
                    if (numberOfEventsUntilInterruptInIterativeGate > 0) {
                        this.inputReaders[i].setIterative(numberOfEventsUntilInterruptInIterativeGate);
                        arrayList.add(Integer.valueOf(i));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(formatLogString("Input [" + i + "] reads in supersteps with [" + numberOfEventsUntilInterruptInIterativeGate + "] event(s) till next superstep."));
                        }
                    }
                }
                this.iterativeInputs = asArray(arrayList);
                ArrayList arrayList2 = new ArrayList();
                for (int i2 = 0; i2 < numBroadcastInputs; i2++) {
                    int numberOfEventsUntilInterruptInIterativeBroadcastGate = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeBroadcastGate(i2);
                    if (numberOfEventsUntilInterruptInIterativeBroadcastGate < 0) {
                        throw new IllegalArgumentException();
                    }
                    if (numberOfEventsUntilInterruptInIterativeBroadcastGate > 0) {
                        this.broadcastInputReaders[i2].setIterative(numberOfEventsUntilInterruptInIterativeBroadcastGate);
                        arrayList2.add(Integer.valueOf(i2));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(formatLogString("Broadcast input [" + i2 + "] reads in supersteps with [" + numberOfEventsUntilInterruptInIterativeBroadcastGate + "] event(s) till next superstep."));
                        }
                    }
                }
                this.iterativeBroadcastInputs = asArray(arrayList2);
                initLocalStrategies(numberOfInputs);
                if (!this.running) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(formatLogString("Task cancelled before task code was started."));
                    }
                    return;
                }
                initialize();
                for (int i3 = 0; i3 < this.config.getNumBroadcastInputs(); i3++) {
                    readAndSetBroadcastInput(i3, this.config.getBroadcastInputName(i3), this.runtimeUdfContext);
                }
                run();
                closeLocalStrategiesAndCaches();
                if (this.running) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(formatLogString("Finished task code."));
                    }
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug(formatLogString("Task code cancelled."));
                }
            } catch (Exception e) {
                throw new RuntimeException(new StringBuilder().append("Initializing the input processing failed").append(e.getMessage()).toString() == null ? "." : ": " + e.getMessage(), e);
            }
        } finally {
            closeLocalStrategiesAndCaches();
        }
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public void cancel() throws Exception {
        this.running = false;
        if (LOG.isDebugEnabled()) {
            LOG.debug(formatLogString("Cancelling task code"));
        }
        try {
            if (this.driver != null) {
                this.driver.cancel();
            }
        } finally {
            closeLocalStrategiesAndCaches();
        }
    }

    public void setUserCodeClassLoader(ClassLoader classLoader) {
        this.userCodeClassLoader = classLoader;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initialize() throws Exception {
        try {
            this.driver.setup(this);
            this.runtimeUdfContext = createRuntimeContext(getEnvironment().getTaskName());
            try {
                Class<? super S> stubType = this.driver.getStubType();
                if (stubType != null) {
                    this.stub = initStub(stubType);
                }
            } catch (Exception e) {
                throw new RuntimeException(new StringBuilder().append("Initializing the UDF").append(e.getMessage()).toString() == null ? "." : ": " + e.getMessage(), e);
            }
        } catch (Throwable th) {
            throw new Exception("The driver setup for '" + getEnvironment().getTaskName() + "' , caused an error: " + th.getMessage(), th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <X> void readAndSetBroadcastInput(int i, String str, RuntimeUDFContext runtimeUDFContext) throws IOException {
        Object next;
        MutableObjectIterator<?> mutableObjectIterator = this.broadcastInputIterators[i];
        TypeSerializer serializer = this.broadcastInputSerializers[i].getSerializer();
        ArrayList arrayList = new ArrayList();
        Object createInstance = serializer.createInstance();
        while (true) {
            Object obj = createInstance;
            if (!this.running || (next = mutableObjectIterator.next(obj)) == null) {
                break;
            }
            arrayList.add(next);
            createInstance = serializer.createInstance();
        }
        runtimeUDFContext.setBroadcastVariable(str, arrayList);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void run() throws Exception {
        if (this.running) {
            try {
                try {
                    try {
                        this.driver.prepare();
                        if (!this.running) {
                            this.driver.cleanup();
                            return;
                        }
                        openChainedTasks(this.chainedTasks, this);
                        if (this.stub != null) {
                            try {
                                FunctionUtils.openFunction(this.stub, this.config.getStubParameters());
                            } catch (Throwable th) {
                                throw new Exception("The user defined 'open()' method caused an exception: " + th.getMessage(), th);
                            }
                        }
                        this.driver.run();
                        if (this.running && this.stub != null) {
                            FunctionUtils.closeFunction(this.stub);
                        }
                        this.output.close();
                        closeChainedTasks(this.chainedTasks, this);
                        if (this.stub != null && FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
                            reportAndClearAccumulators(getEnvironment(), FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(), this.chainedTasks);
                        }
                        this.driver.cleanup();
                    } catch (Exception e) {
                        if (0 != 0) {
                            try {
                                FunctionUtils.closeFunction(this.stub);
                            } catch (Throwable th2) {
                            }
                        }
                        if (this.driver instanceof ResettablePactDriver) {
                            try {
                                ((ResettablePactDriver) this.driver).teardown();
                            } catch (Throwable th3) {
                                throw new Exception("Error while shutting down an iterative operator: " + th3.getMessage(), th3);
                            }
                        }
                        cancelChainedTasks(this.chainedTasks);
                        Exception exceptionUnwrap = ExceptionInChainedStubException.exceptionUnwrap(e);
                        if (exceptionUnwrap instanceof CancelTaskException) {
                            throw exceptionUnwrap;
                        }
                        if (this.running) {
                            logAndThrowException(exceptionUnwrap, this);
                        }
                        this.driver.cleanup();
                    }
                } catch (Throwable th4) {
                    throw new Exception("The data preparation for task '" + getEnvironment().getTaskName() + "' , caused an error: " + th4.getMessage(), th4);
                }
            } catch (Throwable th5) {
                this.driver.cleanup();
                throw th5;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void reportAndClearAccumulators(Environment environment, Map<String, Accumulator<?, ?>> map, ArrayList<ChainedDriver<?, ?>> arrayList) {
        Iterator<ChainedDriver<?, ?>> it = arrayList.iterator();
        while (it.hasNext()) {
            ChainedDriver<?, ?> next = it.next();
            if (FunctionUtils.getFunctionRuntimeContext(next.mo108getStub(), (RuntimeContext) null) != null) {
                AccumulatorHelper.mergeInto(map, FunctionUtils.getFunctionRuntimeContext(next.mo108getStub(), (RuntimeContext) null).getAllAccumulators());
            }
        }
        if (map.size() == 0) {
            return;
        }
        synchronized (environment.getAccumulatorProtocolProxy()) {
            try {
                environment.getAccumulatorProtocolProxy().reportAccumulatorResult(new AccumulatorEvent(environment.getJobID(), map));
            } catch (IOException e) {
                throw new RuntimeException("Communication with JobManager is broken. Could not send accumulators.", e);
            }
        }
        AccumulatorHelper.resetAndClearAccumulators(map);
        Iterator<ChainedDriver<?, ?>> it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ChainedDriver<?, ?> next2 = it2.next();
            if (FunctionUtils.getFunctionRuntimeContext(next2.mo108getStub(), (RuntimeContext) null) != null) {
                AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(next2.mo108getStub(), (RuntimeContext) null).getAllAccumulators());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeLocalStrategiesAndCaches() {
        if (this.localStrategies != null) {
            for (int i = 0; i < this.localStrategies.length; i++) {
                if (this.localStrategies[i] != null) {
                    try {
                        this.localStrategies[i].close();
                    } catch (Throwable th) {
                        LOG.error("Error closing local strategy for input " + i, th);
                    }
                }
            }
        }
        if (this.tempBarriers != null) {
            for (int i2 = 0; i2 < this.tempBarriers.length; i2++) {
                if (this.tempBarriers[i2] != null) {
                    try {
                        this.tempBarriers[i2].close();
                    } catch (Throwable th2) {
                        LOG.error("Error closing temp barrier for input " + i2, th2);
                    }
                }
            }
        }
        if (this.resettableInputs != null) {
            for (int i3 = 0; i3 < this.resettableInputs.length; i3++) {
                if (this.resettableInputs[i3] != null) {
                    try {
                        this.resettableInputs[i3].close();
                    } catch (Throwable th3) {
                        LOG.error("Error closing cache for input " + i3, th3);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collector<OT> getLastOutputCollector() {
        int size = this.chainedTasks.size();
        return size == 0 ? this.output : (Collector<OT>) this.chainedTasks.get(size - 1).getOutputCollector();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setLastOutputCollector(Collector<OT> collector) {
        int size = this.chainedTasks.size();
        if (size == 0) {
            this.output = collector;
        } else {
            this.chainedTasks.get(size - 1).setOutputCollector(collector);
        }
    }

    public TaskConfig getLastTasksConfig() {
        int size = this.chainedTasks.size();
        return size == 0 ? this.config : this.chainedTasks.get(size - 1).getTaskConfig();
    }

    protected S initStub(Class<? super S> cls) throws Exception {
        try {
            S s = (S) this.config.getStubWrapper(this.userCodeClassLoader).getUserCodeObject(cls, this.userCodeClassLoader);
            if (cls != null && !cls.isAssignableFrom(s.getClass())) {
                throw new RuntimeException("The class '" + s.getClass().getName() + "' is not a subclass of '" + cls.getName() + "' as is required.");
            }
            FunctionUtils.setFunctionRuntimeContext(s, this.runtimeUdfContext);
            return s;
        } catch (ClassCastException e) {
            throw new Exception("The stub class is not a proper subclass of " + cls.getName(), e);
        }
    }

    protected void initInputReaders() throws Exception {
        int numTaskInputs = getNumTaskInputs();
        MutableReader<?>[] mutableReaderArr = new MutableReader[numTaskInputs];
        int i = 0;
        for (int i2 = 0; i2 < numTaskInputs; i2++) {
            int groupSize = this.config.getGroupSize(i2);
            i += groupSize;
            if (groupSize == 1) {
                mutableReaderArr[i2] = new MutableRecordReader(this);
            } else {
                if (groupSize <= 1) {
                    throw new Exception("Illegal input group size in task configuration: " + groupSize);
                }
                MutableRecordReader[] mutableRecordReaderArr = new MutableRecordReader[groupSize];
                for (int i3 = 0; i3 < groupSize; i3++) {
                    mutableRecordReaderArr[i3] = new MutableRecordReader(this);
                }
                mutableReaderArr[i2] = new MutableUnionRecordReader(mutableRecordReaderArr);
            }
        }
        this.inputReaders = mutableReaderArr;
        if (i != this.config.getNumInputs()) {
            throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
        }
    }

    protected void initBroadcastInputReaders() throws Exception {
        MutableReader<?>[] mutableReaderArr = new MutableReader[this.config.getNumBroadcastInputs()];
        for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
            int broadcastGroupSize = this.config.getBroadcastGroupSize(i);
            if (broadcastGroupSize == 1) {
                mutableReaderArr[i] = new MutableRecordReader(this);
            } else {
                if (broadcastGroupSize <= 1) {
                    throw new Exception("Illegal input group size in task configuration: " + broadcastGroupSize);
                }
                MutableRecordReader[] mutableRecordReaderArr = new MutableRecordReader[broadcastGroupSize];
                for (int i2 = 0; i2 < broadcastGroupSize; i2++) {
                    mutableRecordReaderArr[i2] = new MutableRecordReader(this);
                }
                mutableReaderArr[i] = new MutableUnionRecordReader(mutableRecordReaderArr);
            }
        }
        this.broadcastInputReaders = mutableReaderArr;
    }

    protected void initInputsSerializersAndComparators(int i) throws Exception {
        this.inputSerializers = new TypeSerializerFactory[i];
        this.inputComparators = this.driver.requiresComparatorOnInput() ? new TypeComparator[i] : null;
        this.inputIterators = new MutableObjectIterator[i];
        for (int i2 = 0; i2 < i; i2++) {
            this.inputSerializers[i2] = this.config.getInputSerializer(i2, this.userCodeClassLoader);
            if (this.inputComparators != null) {
                this.inputComparators[i2] = this.config.getDriverComparator(i2, this.userCodeClassLoader).createComparator();
            }
            this.inputIterators[i2] = createInputIterator(this.inputReaders[i2], this.inputSerializers[i2]);
        }
    }

    protected void initBroadcastInputsSerializers(int i) throws Exception {
        this.broadcastInputSerializers = new TypeSerializerFactory[i];
        this.broadcastInputIterators = new MutableObjectIterator[i];
        for (int i2 = 0; i2 < i; i2++) {
            this.broadcastInputSerializers[i2] = this.config.getBroadcastInputSerializer(i2, this.userCodeClassLoader);
            this.broadcastInputIterators[i2] = createInputIterator(this.broadcastInputReaders[i2], this.broadcastInputSerializers[i2]);
        }
    }

    protected void initLocalStrategies(int i) throws Exception {
        int computeNumberOfPages;
        MemoryManager memoryManager = getMemoryManager();
        IOManager iOManager = getIOManager();
        this.localStrategies = new CloseableInputProvider[i];
        this.inputs = new MutableObjectIterator[i];
        this.excludeFromReset = new boolean[i];
        this.inputIsCached = new boolean[i];
        this.inputIsAsyncMaterialized = new boolean[i];
        this.materializationMemory = new int[i];
        for (int i2 = 0; i2 < i; i2++) {
            initInputLocalStrategy(i2);
        }
        this.resettableInputs = new SpillingResettableMutableObjectIterator[i];
        this.tempBarriers = new TempBarrier[i];
        for (int i3 = 0; i3 < i; i3++) {
            boolean isInputAsynchronouslyMaterialized = this.config.isInputAsynchronouslyMaterialized(i3);
            boolean isInputCached = this.config.isInputCached(i3);
            this.inputIsAsyncMaterialized[i3] = isInputAsynchronouslyMaterialized;
            this.inputIsCached[i3] = isInputCached;
            if (isInputAsynchronouslyMaterialized || isInputCached) {
                computeNumberOfPages = memoryManager.computeNumberOfPages(this.config.getRelativeInputMaterializationMemory(i3));
                if (computeNumberOfPages <= 0) {
                    throw new Exception("Input marked as materialized/cached, but no memory for materialization provided.");
                }
                this.materializationMemory[i3] = computeNumberOfPages;
            } else {
                computeNumberOfPages = 0;
            }
            if (isInputAsynchronouslyMaterialized) {
                TempBarrier<?> tempBarrier = new TempBarrier<>(this, getInput(i3), this.inputSerializers[i3], memoryManager, iOManager, computeNumberOfPages);
                tempBarrier.startReading();
                this.tempBarriers[i3] = tempBarrier;
                this.inputs[i3] = null;
            } else if (isInputCached) {
                SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>((MutableObjectIterator<?>) getInput(i3), (TypeSerializer<?>) this.inputSerializers[i3].getSerializer(), getMemoryManager(), getIOManager(), computeNumberOfPages, (AbstractInvokable) this);
                this.resettableInputs[i3] = spillingResettableMutableObjectIterator;
                this.inputs[i3] = spillingResettableMutableObjectIterator;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetAllInputs() throws Exception {
        for (int i = 0; i < this.localStrategies.length; i++) {
            if (this.localStrategies[i] != null) {
                this.localStrategies[i].close();
                this.localStrategies[i] = null;
            }
        }
        MemoryManager memoryManager = getMemoryManager();
        IOManager iOManager = getIOManager();
        for (int i2 = 0; i2 < this.inputs.length; i2++) {
            if (!this.excludeFromReset[i2]) {
                this.inputs[i2] = null;
                if (!this.inputIsCached[i2]) {
                    if (this.tempBarriers[i2] != null) {
                        this.tempBarriers[i2].close();
                    }
                    initInputLocalStrategy(i2);
                    if (this.inputIsAsyncMaterialized[i2]) {
                        TempBarrier<?> tempBarrier = new TempBarrier<>(this, getInput(i2), this.inputSerializers[i2], memoryManager, iOManager, this.materializationMemory[i2]);
                        tempBarrier.startReading();
                        this.tempBarriers[i2] = tempBarrier;
                        this.inputs[i2] = null;
                    }
                } else if (this.tempBarriers[i2] != null) {
                    this.inputs[i2] = this.tempBarriers[i2].getIterator();
                } else {
                    if (this.resettableInputs[i2] == null) {
                        throw new RuntimeException("Found a resettable input, but no temp barrier and no resettable iterator.");
                    }
                    this.resettableInputs[i2].consumeAndCacheRemainingData();
                    this.resettableInputs[i2].reset();
                    this.inputs[i2] = this.resettableInputs[i2];
                }
            } else if (this.tempBarriers[i2] != null) {
                this.tempBarriers[i2].close();
                this.tempBarriers[i2] = null;
            } else if (this.resettableInputs[i2] != null) {
                this.resettableInputs[i2].close();
                this.resettableInputs[i2] = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void excludeFromReset(int i) {
        this.excludeFromReset[i] = true;
    }

    private void initInputLocalStrategy(int i) throws Exception {
        if (this.localStrategies[i] != null) {
            throw new IllegalStateException();
        }
        LocalStrategy inputLocalStrategy = this.config.getInputLocalStrategy(i);
        if (inputLocalStrategy == null) {
            this.inputs[i] = this.inputIterators[i];
            return;
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$runtime$operators$util$LocalStrategy[inputLocalStrategy.ordinal()]) {
            case 1:
                this.inputs[i] = this.inputIterators[i];
                return;
            case ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL /* 2 */:
                UnilateralSortMerger unilateralSortMerger = new UnilateralSortMerger(getMemoryManager(), getIOManager(), this.inputIterators[i], this, this.inputSerializers[i], getLocalStrategyComparator(i), this.config.getRelativeMemoryInput(i), this.config.getFilehandlesInput(i), this.config.getSpillingThresholdInput(i));
                this.inputs[i] = null;
                this.localStrategies[i] = unilateralSortMerger;
                return;
            case 3:
                if (i != 0) {
                    throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
                }
                Class<? super S> stubType = this.driver.getStubType();
                if (stubType == null) {
                    throw new IllegalStateException("Performing combining sort outside a reduce task!");
                }
                try {
                    FlatCombineFunction initStub = initStub(stubType);
                    if (!(initStub instanceof FlatCombineFunction)) {
                        throw new IllegalStateException("Performing combining sort outside a reduce task!");
                    }
                    CombiningUnilateralSortMerger combiningUnilateralSortMerger = new CombiningUnilateralSortMerger(initStub, getMemoryManager(), getIOManager(), this.inputIterators[i], this, this.inputSerializers[i], getLocalStrategyComparator(i), this.config.getRelativeMemoryInput(i), this.config.getFilehandlesInput(i), this.config.getSpillingThresholdInput(i));
                    combiningUnilateralSortMerger.setUdfConfiguration(this.config.getStubParameters());
                    this.inputs[i] = null;
                    this.localStrategies[i] = combiningUnilateralSortMerger;
                    return;
                } catch (Exception e) {
                    throw new RuntimeException(new StringBuilder().append("Initializing the user code and the configuration failed").append(e.getMessage()).toString() == null ? "." : ": " + e.getMessage(), e);
                }
            default:
                throw new Exception("Unrecognized local strategy provided: " + inputLocalStrategy.name());
        }
    }

    private <T> TypeComparator<T> getLocalStrategyComparator(int i) throws Exception {
        TypeComparatorFactory<T> inputComparator = this.config.getInputComparator(i, this.userCodeClassLoader);
        if (inputComparator == null) {
            throw new Exception("Missing comparator factory for local strategy on input " + i);
        }
        return inputComparator.createComparator();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MutableObjectIterator<?> createInputIterator(MutableReader<?> mutableReader, TypeSerializerFactory<?> typeSerializerFactory) {
        return typeSerializerFactory.getDataType().equals(Record.class) ? new RecordReaderIterator(mutableReader) : new ReaderIterator(mutableReader, typeSerializerFactory.getSerializer());
    }

    protected int getNumTaskInputs() {
        return this.driver.getNumberOfInputs();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initOutputs() throws Exception {
        this.chainedTasks = new ArrayList<>();
        this.eventualOutputs = new ArrayList();
        this.output = initOutputs(this, this.userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs);
    }

    public RuntimeUDFContext createRuntimeContext(String str) {
        Environment environment = getEnvironment();
        return new RuntimeUDFContext(str, environment.getCurrentNumberOfSubtasks(), environment.getIndexInSubtaskGroup(), environment.getCopyTask());
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public TaskConfig getTaskConfig() {
        return this.config;
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public ClassLoader getUserCodeClassLoader() {
        return this.userCodeClassLoader;
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public MemoryManager getMemoryManager() {
        return getEnvironment().getMemoryManager();
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public IOManager getIOManager() {
        return getEnvironment().getIOManager();
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public S getStub() {
        return this.stub;
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public Collector<OT> getOutputCollector() {
        return this.output;
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public AbstractInvokable getOwningNepheleTask() {
        return this;
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public String formatLogString(String str) {
        return constructLogString(str, getEnvironment().getTaskName(), this);
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public <X> MutableObjectIterator<X> getInput(int i) {
        MutableObjectIterator<?> iterator;
        if (i < 0 || i > this.driver.getNumberOfInputs()) {
            throw new IndexOutOfBoundsException();
        }
        if (this.inputs[i] != null) {
            return (MutableObjectIterator<X>) this.inputs[i];
        }
        try {
            if (this.tempBarriers[i] != null) {
                iterator = this.tempBarriers[i].getIterator();
            } else {
                if (this.localStrategies[i] == null) {
                    throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
                }
                iterator = this.localStrategies[i].getIterator();
            }
            this.inputs[i] = iterator;
            return (MutableObjectIterator<X>) iterator;
        } catch (IOException e) {
            throw new RuntimeException("An I/O Exception occurred whily obaining input " + i + ".");
        } catch (InterruptedException e2) {
            throw new RuntimeException("Interrupted while waiting for input " + i + " to become available.");
        }
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public <X> TypeSerializerFactory<X> getInputSerializer(int i) {
        if (i < 0 || i >= this.driver.getNumberOfInputs()) {
            throw new IndexOutOfBoundsException();
        }
        return (TypeSerializerFactory<X>) this.inputSerializers[i];
    }

    @Override // org.apache.flink.runtime.operators.PactTaskContext
    public <X> TypeComparator<X> getInputComparator(int i) {
        if (this.inputComparators == null) {
            throw new IllegalStateException("Comparators have not been created!");
        }
        if (i < 0 || i >= this.driver.getNumberOfInputs()) {
            throw new IndexOutOfBoundsException();
        }
        return (TypeComparator<X>) this.inputComparators[i];
    }

    public static String constructLogString(String str, String str2, AbstractInvokable abstractInvokable) {
        return str + ":  " + str2 + " (" + (abstractInvokable.getEnvironment().getIndexInSubtaskGroup() + 1) + '/' + abstractInvokable.getEnvironment().getCurrentNumberOfSubtasks() + ')';
    }

    /* JADX WARN: Removed duplicated region for block: B:8:0x0035  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public static void logAndThrowException(java.lang.Exception r5, org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable r6) throws java.lang.Exception {
        /*
            r0 = r5
            boolean r0 = r0 instanceof org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
            if (r0 == 0) goto L20
        L7:
            r0 = r5
            org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException r0 = (org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException) r0
            r8 = r0
            r0 = r8
            java.lang.String r0 = r0.getTaskName()
            r7 = r0
            r0 = r8
            java.lang.Exception r0 = r0.getWrappedException()
            r5 = r0
            r0 = r5
            boolean r0 = r0 instanceof org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
            if (r0 != 0) goto L7
            goto L2a
        L20:
            r0 = r6
            org.apache.flink.runtime.execution.Environment r0 = r0.getEnvironment()
            java.lang.String r0 = r0.getTaskName()
            r7 = r0
        L2a:
            org.apache.commons.logging.Log r0 = org.apache.flink.runtime.operators.RegularPactTask.LOG
            boolean r0 = r0.isErrorEnabled()
            if (r0 == 0) goto L46
            org.apache.commons.logging.Log r0 = org.apache.flink.runtime.operators.RegularPactTask.LOG
            java.lang.String r1 = "Error in task code"
            r2 = r7
            r3 = r6
            java.lang.String r1 = constructLogString(r1, r2, r3)
            r2 = r5
            r0.error(r1, r2)
        L46:
            r0 = r5
            throw r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.operators.RegularPactTask.logAndThrowException(java.lang.Exception, org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable):void");
    }

    public static <T> Collector<T> getOutputCollector(AbstractInvokable abstractInvokable, TaskConfig taskConfig, ClassLoader classLoader, List<BufferWriter> list, int i) throws Exception {
        RecordOutputEmitter recordOutputEmitter;
        if (i <= 0) {
            throw new Exception("BUG: The task must have at least one output");
        }
        TypeSerializerFactory<T> outputSerializer = taskConfig.getOutputSerializer(classLoader);
        if (!outputSerializer.getDataType().equals(Record.class)) {
            ArrayList arrayList = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2++) {
                ShipStrategyType outputShipStrategy = taskConfig.getOutputShipStrategy(i2);
                TypeComparatorFactory<T> outputComparator = taskConfig.getOutputComparator(i2, classLoader);
                DataDistribution outputDataDistribution = taskConfig.getOutputDataDistribution(i2, classLoader);
                arrayList.add(new RecordWriter(abstractInvokable, outputComparator == null ? new OutputEmitter(outputShipStrategy) : outputDataDistribution == null ? new OutputEmitter(outputShipStrategy, outputComparator.createComparator()) : new OutputEmitter(outputShipStrategy, outputComparator.createComparator(), outputDataDistribution)));
            }
            if (list != null) {
                list.addAll(arrayList);
            }
            return new OutputCollector(arrayList, outputSerializer.getSerializer());
        }
        ArrayList arrayList2 = new ArrayList(i);
        for (int i3 = 0; i3 < i; i3++) {
            ShipStrategyType outputShipStrategy2 = taskConfig.getOutputShipStrategy(i3);
            TypeComparatorFactory<T> outputComparator2 = taskConfig.getOutputComparator(i3, classLoader);
            if (outputComparator2 == null) {
                recordOutputEmitter = new RecordOutputEmitter(outputShipStrategy2);
            } else {
                TypeComparator createComparator = outputComparator2.createComparator();
                if (!createComparator.supportsCompareAgainstReference()) {
                    throw new Exception("Incompatibe serializer-/comparator factories.");
                }
                recordOutputEmitter = new RecordOutputEmitter(outputShipStrategy2, createComparator, taskConfig.getOutputDataDistribution(i3, classLoader));
            }
            arrayList2.add(new RecordWriter(abstractInvokable, recordOutputEmitter));
        }
        if (list != null) {
            list.addAll(arrayList2);
        }
        return new RecordOutputCollector(arrayList2);
    }

    public static <T> Collector<T> initOutputs(AbstractInvokable abstractInvokable, ClassLoader classLoader, TaskConfig taskConfig, List<ChainedDriver<?, ?>> list, List<BufferWriter> list2) throws Exception {
        int numOutputs = taskConfig.getNumOutputs();
        int numberOfChainedStubs = taskConfig.getNumberOfChainedStubs();
        if (numberOfChainedStubs <= 0) {
            return getOutputCollector(abstractInvokable, taskConfig, classLoader, list2, numOutputs);
        }
        if (numOutputs != 1 || taskConfig.getOutputShipStrategy(0) != ShipStrategyType.FORWARD) {
            throw new RuntimeException("Plan Generation Bug: Found a chained stub that is not connected via an only forward connection.");
        }
        Collector<?> collector = null;
        for (int i = numberOfChainedStubs - 1; i >= 0; i--) {
            try {
                ChainedDriver<?, ?> newInstance = taskConfig.getChainedTask(i).newInstance();
                TaskConfig chainedStubConfig = taskConfig.getChainedStubConfig(i);
                String chainedTaskName = taskConfig.getChainedTaskName(i);
                if (i == numberOfChainedStubs - 1) {
                    collector = getOutputCollector(abstractInvokable, chainedStubConfig, classLoader, list2, chainedStubConfig.getNumOutputs());
                }
                newInstance.setup(chainedStubConfig, chainedTaskName, collector, abstractInvokable, classLoader);
                list.add(0, newInstance);
                collector = newInstance;
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate chained task driver.", e);
            }
        }
        return (Collector<T>) collector;
    }

    public static void initOutputWriters(List<BufferWriter> list) {
        Iterator<BufferWriter> it = list.iterator();
        while (it.hasNext()) {
            ((RecordWriter) it.next()).initializeSerializers();
        }
    }

    public static void openUserCode(Function function, Configuration configuration) throws Exception {
        try {
            FunctionUtils.openFunction(function, configuration);
        } catch (Throwable th) {
            throw new Exception("The user defined 'open(Configuration)' method in " + function.getClass().toString() + " caused an exception: " + th.getMessage(), th);
        }
    }

    public static void closeUserCode(Function function) throws Exception {
        try {
            FunctionUtils.closeFunction(function);
        } catch (Throwable th) {
            throw new Exception("The user defined 'close()' method caused an exception: " + th.getMessage(), th);
        }
    }

    public static void openChainedTasks(List<ChainedDriver<?, ?>> list, AbstractInvokable abstractInvokable) throws Exception {
        for (int i = 0; i < list.size(); i++) {
            ChainedDriver<?, ?> chainedDriver = list.get(i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(constructLogString("Start task code", chainedDriver.getTaskName(), abstractInvokable));
            }
            chainedDriver.openTask();
        }
    }

    public static void closeChainedTasks(List<ChainedDriver<?, ?>> list, AbstractInvokable abstractInvokable) throws Exception {
        for (int i = 0; i < list.size(); i++) {
            ChainedDriver<?, ?> chainedDriver = list.get(i);
            chainedDriver.closeTask();
            if (LOG.isDebugEnabled()) {
                LOG.debug(constructLogString("Finished task code", chainedDriver.getTaskName(), abstractInvokable));
            }
        }
    }

    public static void cancelChainedTasks(List<ChainedDriver<?, ?>> list) {
        for (int i = 0; i < list.size(); i++) {
            try {
                list.get(i).cancelTask();
            } catch (Throwable th) {
            }
        }
    }

    public static <T> T instantiateUserCode(TaskConfig taskConfig, ClassLoader classLoader, Class<? super T> cls) {
        try {
            T t = (T) taskConfig.getStubWrapper(classLoader).getUserCodeObject(cls, classLoader);
            if (cls == null || cls.isAssignableFrom(t.getClass())) {
                return t;
            }
            throw new RuntimeException("The class '" + t.getClass().getName() + "' is not a subclass of '" + cls.getName() + "' as is required.");
        } catch (ClassCastException e) {
            throw new RuntimeException("The UDF class is not a proper subclass of " + cls.getName(), e);
        }
    }

    private static int[] asArray(List<Integer> list) {
        int[] iArr = new int[list.size()];
        int i = 0;
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            iArr[i2] = it.next().intValue();
        }
        return iArr;
    }
}
