/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSourceTask<OT>
extends AbstractInvokable {
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceTask.class);
    private List<RecordWriter<?>> eventualOutputs;
    private Collector<OT> output;
    private InputFormat<OT, InputSplit> format;
    private TypeSerializerFactory<OT> serializerFactory;
    private TaskConfig config;
    private ArrayList<ChainedDriver<?, ?>> chainedTasks;
    private volatile boolean taskCanceled = false;

    @Override
    public void registerInputOutput() {
        this.initInputFormat();
        LOG.debug(this.getLogString("Start registering input and output"));
        try {
            this.initOutputs(this.getUserCodeClassLoader());
        }
        catch (Exception ex) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + ex.getMessage(), ex);
        }
        LOG.debug(this.getLogString("Finished registering input and output"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void invoke() throws Exception {
        ExecutionConfig executionConfig;
        LOG.debug(this.getLogString("Starting data source operator"));
        if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
            ((RichInputFormat)this.format).setRuntimeContext((RuntimeContext)this.createRuntimeContext());
            LOG.debug(this.getLogString("Rich Source detected. Initializing runtime context."));
        }
        try {
            ExecutionConfig c = (ExecutionConfig)InstantiationUtil.readObjectFromConfig((Configuration)this.getJobConfiguration(), (String)"runtime.config", (ClassLoader)this.getUserCodeClassLoader());
            if (c != null) {
                executionConfig = c;
            } else {
                LOG.warn("ExecutionConfig from job configuration is null. Creating empty config");
                executionConfig = new ExecutionConfig();
            }
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: ", e);
        }
        boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
        LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
        TypeSerializer serializer = this.serializerFactory.getSerializer();
        try {
            BatchTask.openChainedTasks(this.chainedTasks, this);
            Iterator<InputSplit> splitIterator = this.getInputSplits();
            while (!this.taskCanceled && splitIterator.hasNext()) {
                InputSplit split = splitIterator.next();
                LOG.debug(this.getLogString("Opening input split " + split.toString()));
                InputFormat<OT, InputSplit> format = this.format;
                format.open(split);
                LOG.debug(this.getLogString("Starting to read input from split " + split.toString()));
                try {
                    Collector<OT> output = this.output;
                    if (objectReuseEnabled) {
                        Object reuse = serializer.createInstance();
                        while (!this.taskCanceled && !format.reachedEnd()) {
                            Object returned = format.nextRecord(reuse);
                            if (returned == null) continue;
                            output.collect(returned);
                        }
                    } else {
                        while (!this.taskCanceled && !format.reachedEnd()) {
                            Object returned = format.nextRecord(serializer.createInstance());
                            if (returned == null) continue;
                            output.collect(returned);
                        }
                    }
                    if (!LOG.isDebugEnabled() || this.taskCanceled) continue;
                    LOG.debug(this.getLogString("Closing input split " + split.toString()));
                }
                finally {
                    format.close();
                }
            }
            this.output.close();
            BatchTask.closeChainedTasks(this.chainedTasks, this);
        }
        catch (Exception ex) {
            try {
                this.format.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            BatchTask.cancelChainedTasks(this.chainedTasks);
            ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
            if (ex instanceof CancelTaskException) {
                throw ex;
            }
            if (!this.taskCanceled) {
                BatchTask.logAndThrowException(ex, this);
            }
        }
        finally {
            BatchTask.clearWriters(this.eventualOutputs);
        }
        if (!this.taskCanceled) {
            LOG.debug(this.getLogString("Finished data source operator"));
        } else {
            LOG.debug(this.getLogString("Data source operator cancelled"));
        }
    }

    @Override
    public void cancel() throws Exception {
        this.taskCanceled = true;
        LOG.debug(this.getLogString("Cancelling data source operator"));
    }

    private void initInputFormat() {
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        Configuration taskConf = this.getTaskConfiguration();
        this.config = new TaskConfig(taskConf);
        try {
            this.format = (InputFormat)this.config.getStubWrapper(userCodeClassLoader).getUserCodeObject(InputFormat.class, userCodeClassLoader);
            if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + InputFormat.class.getName() + "' as is required.");
            }
        }
        catch (ClassCastException ccex) {
            throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(), ccex);
        }
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(userCodeClassLoader);
            this.format.configure(this.config.getStubParameters());
        }
        catch (Throwable t) {
            throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
        }
        finally {
            thread.setContextClassLoader(original);
        }
        this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);
    }

    private void initOutputs(ClassLoader cl) throws Exception {
        this.chainedTasks = new ArrayList();
        this.eventualOutputs = new ArrayList();
        AccumulatorRegistry accumulatorRegistry = this.getEnvironment().getAccumulatorRegistry();
        AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
        this.output = BatchTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, this.getExecutionConfig(), reporter, this.getEnvironment().getAccumulatorRegistry().getUserMap());
    }

    private String getLogString(String message) {
        return this.getLogString(message, this.getEnvironment().getTaskName());
    }

    private String getLogString(String message, String taskName) {
        return BatchTask.constructLogString(message, taskName, this);
    }

    private Iterator<InputSplit> getInputSplits() {
        final InputSplitProvider provider = this.getEnvironment().getInputSplitProvider();
        return new Iterator<InputSplit>(){
            private InputSplit nextSplit;
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                InputSplit split = provider.getNextInputSplit();
                if (split != null) {
                    this.nextSplit = split;
                    return true;
                }
                this.exhausted = true;
                return false;
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !this.hasNext()) {
                    throw new NoSuchElementException();
                }
                InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    public DistributedRuntimeUDFContext createRuntimeContext() {
        Environment env = this.getEnvironment();
        return new DistributedRuntimeUDFContext(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), this.getUserCodeClassLoader(), this.getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap());
    }
}

