/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSourceTask<OT>
extends AbstractInvokable {
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceTask.class);
    private List<RecordWriter<?>> eventualOutputs;
    private Collector<OT> output;
    private InputFormat<OT, InputSplit> format;
    private TypeSerializerFactory<OT> serializerFactory;
    private TaskConfig config;
    private ArrayList<ChainedDriver<?, ?>> chainedTasks;
    private volatile boolean taskCanceled = false;
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture();

    public DataSourceTask(Environment environment) {
        super(environment);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void invoke() throws Exception {
        SimpleCounter tmpNumRecordsOut;
        this.initInputFormat();
        LOG.debug(this.getLogString("Start registering input and output"));
        try {
            this.initOutputs(this.getEnvironment().getUserCodeClassLoader());
        }
        catch (Exception ex) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + ex.getMessage(), ex);
        }
        LOG.debug(this.getLogString("Finished registering input and output"));
        LOG.debug(this.getLogString("Starting data source operator"));
        DistributedRuntimeUDFContext ctx = this.createRuntimeContext();
        try {
            OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup)ctx.getMetricGroup()).getIOMetricGroup();
            ioMetricGroup.reuseInputMetricsForTask();
            if (this.config.getNumberOfChainedStubs() == 0) {
                ioMetricGroup.reuseOutputMetricsForTask();
            }
            tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
        }
        catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
            tmpNumRecordsOut = new SimpleCounter();
        }
        SimpleCounter numRecordsOut = tmpNumRecordsOut;
        Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
        if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
            ((RichInputFormat)this.format).setRuntimeContext((RuntimeContext)ctx);
            LOG.debug(this.getLogString("Rich Source detected. Initializing runtime context."));
            ((RichInputFormat)this.format).openInputFormat();
            LOG.debug(this.getLogString("Rich Source detected. Opening the InputFormat."));
        }
        ExecutionConfig executionConfig = this.getExecutionConfig();
        boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
        LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
        TypeSerializer serializer = this.serializerFactory.getSerializer();
        try {
            BatchTask.openChainedTasks(this.chainedTasks, this);
            Iterator<InputSplit> splitIterator = this.getInputSplits();
            while (!this.taskCanceled && splitIterator.hasNext()) {
                InputSplit split = splitIterator.next();
                LOG.debug(this.getLogString("Opening input split " + split.toString()));
                InputFormat<OT, InputSplit> format = this.format;
                format.open(split);
                LOG.debug(this.getLogString("Starting to read input from split " + split.toString()));
                try {
                    CountingCollector<OT> output = new CountingCollector<OT>(this.output, (Counter)numRecordsOut);
                    if (objectReuseEnabled) {
                        Object reuse = serializer.createInstance();
                        while (!this.taskCanceled && !format.reachedEnd()) {
                            Object returned = format.nextRecord(reuse);
                            if (returned == null) continue;
                            output.collect(returned);
                        }
                    } else {
                        while (!this.taskCanceled && !format.reachedEnd()) {
                            Object returned = format.nextRecord(serializer.createInstance());
                            if (returned == null) continue;
                            output.collect(returned);
                        }
                    }
                    if (LOG.isDebugEnabled() && !this.taskCanceled) {
                        LOG.debug(this.getLogString("Closing input split " + split.toString()));
                    }
                }
                finally {
                    format.close();
                }
                completedSplitsCounter.inc();
            }
            BatchTask.closeChainedTasks(this.chainedTasks, this);
            this.output.close();
        }
        catch (Exception ex) {
            try {
                this.format.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            BatchTask.cancelChainedTasks(this.chainedTasks);
            ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
            if (ex instanceof CancelTaskException) {
                throw ex;
            }
            if (!this.taskCanceled) {
                BatchTask.logAndThrowException(ex, this);
            }
        }
        finally {
            BatchTask.clearWriters(this.eventualOutputs);
            if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
                ((RichInputFormat)this.format).closeInputFormat();
                LOG.debug(this.getLogString("Rich Source detected. Closing the InputFormat."));
            }
            this.terminationFuture.complete(null);
        }
        if (!this.taskCanceled) {
            LOG.debug(this.getLogString("Finished data source operator"));
        } else {
            LOG.debug(this.getLogString("Data source operator cancelled"));
        }
    }

    @Override
    public Future<Void> cancel() throws Exception {
        this.taskCanceled = true;
        LOG.debug(this.getLogString("Cancelling data source operator"));
        return this.terminationFuture;
    }

    private void initInputFormat() {
        Pair operatorIdAndInputFormat;
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        Configuration taskConf = this.getTaskConfiguration();
        this.config = new TaskConfig(taskConf);
        InputOutputFormatContainer formatContainer = new InputOutputFormatContainer(this.config, userCodeClassLoader);
        try {
            operatorIdAndInputFormat = formatContainer.getUniqueInputFormat();
            this.format = (InputFormat)operatorIdAndInputFormat.getValue();
            if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + InputFormat.class.getName() + "' as is required.");
            }
        }
        catch (ClassCastException ccex) {
            throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(), ccex);
        }
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(userCodeClassLoader);
            this.format.configure(formatContainer.getParameters((OperatorID)((Object)operatorIdAndInputFormat.getKey())));
        }
        catch (Throwable t) {
            throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
        }
        finally {
            thread.setContextClassLoader(original);
        }
        this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);
    }

    private void initOutputs(UserCodeClassLoader cl) throws Exception {
        this.chainedTasks = new ArrayList();
        this.eventualOutputs = new ArrayList();
        this.output = BatchTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, this.getExecutionConfig(), this.getEnvironment().getAccumulatorRegistry().getUserMap());
    }

    private String getLogString(String message) {
        return this.getLogString(message, this.getEnvironment().getTaskInfo().getTaskName());
    }

    private String getLogString(String message, String taskName) {
        return BatchTask.constructLogString(message, taskName, this);
    }

    private Iterator<InputSplit> getInputSplits() {
        final InputSplitProvider provider = this.getEnvironment().getInputSplitProvider();
        return new Iterator<InputSplit>(){
            private InputSplit nextSplit;
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                InputSplit split;
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                try {
                    split = provider.getNextInputSplit(DataSourceTask.this.getUserCodeClassLoader());
                }
                catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }
                if (split != null) {
                    this.nextSplit = split;
                    return true;
                }
                this.exhausted = true;
                return false;
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !this.hasNext()) {
                    throw new NoSuchElementException();
                }
                InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    public DistributedRuntimeUDFContext createRuntimeContext() {
        Environment env = this.getEnvironment();
        String sourceName = this.getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
        sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
        return new DistributedRuntimeUDFContext(env.getTaskInfo(), env.getUserCodeClassLoader(), this.getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), this.getEnvironment().getMetricGroup().getOrAddOperator(sourceName), env.getExternalResourceInfoProvider());
    }
}

