/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

/**
 * Parallel source function that reads records from an {@link InputFormat} and emits them to
 * the {@link SourceFunction.SourceContext}.
 *
 * <p>Input splits are requested lazily, one at a time, from the {@link InputSplitProvider}
 * obtained from the runtime context. A {@code null} record returned by the format is treated
 * as the end of the current split; the function then moves on to the next split, or finishes
 * when the provider has no more splits.
 *
 * @param <OUT> type of the records produced by the input format and emitted by this source
 */
@PublicEvolving
public class FileSourceFunction<OUT>
extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;

    /** Type information used in {@link #open(Configuration)} to create the record serializer. */
    private TypeInformation<OUT> typeInfo;

    /** Serializer used to create a fresh record instance for every read; set in {@code open}. */
    private transient TypeSerializer<OUT> serializer;

    /** The input format the records are read from. */
    private InputFormat<OUT, InputSplit> format;

    /** Provider of the input splits; taken from the runtime context in {@code open}. */
    private transient InputSplitProvider provider;

    /** Lazy iterator over the splits handed out by {@link #provider}. */
    private transient Iterator<InputSplit> splitIterator;

    /** Loop flag of {@link #run}; cleared by {@link #cancel()}. */
    private volatile boolean isRunning = true;

    /**
     * Creates a source function that reads from the given input format.
     *
     * @param format the input format to read records from
     * @param typeInfo type information of the produced records
     */
    @SuppressWarnings("unchecked")
    public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        // The split type is only known as a wildcard here; the field works on the InputSplit
        // base type, so the narrowing cast is unchecked but required for the assignment.
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }

    /**
     * Configures the input format, creates the serializer and opens the first input split,
     * if the split provider has one.
     *
     * <p>NOTE(review): when no split is available the format is left unopened, yet
     * {@link #run} will still call {@code nextRecord} on it — confirm the formats in use
     * tolerate that.
     *
     * @param parameters configuration passed on to {@link InputFormat#configure}
     * @throws Exception if creating the serializer or opening the first split fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) this.getRuntimeContext();
        this.provider = context.getInputSplitProvider();
        this.format.configure(parameters);
        this.serializer = this.typeInfo.createSerializer(this.getRuntimeContext().getExecutionConfig());
        this.splitIterator = this.getInputSplits();
        if (this.splitIterator.hasNext()) {
            this.format.open(this.splitIterator.next());
        }
        this.isRunning = true;
    }

    /**
     * Closes the input format.
     *
     * @throws Exception if closing the format fails
     */
    @Override
    public void close() throws Exception {
        this.format.close();
    }

    /**
     * Builds an iterator that fetches splits from {@link #provider} on demand.
     *
     * <p>A fetched split is buffered until {@code next()} hands it out. Once the provider
     * returns {@code null} the iterator is marked exhausted and the provider is not asked
     * again.
     *
     * @return a lazy, non-removable iterator over the input splits
     */
    private Iterator<InputSplit> getInputSplits() {
        return new Iterator<InputSplit>() {
            /** Split fetched by {@code hasNext()} but not yet returned by {@code next()}. */
            private InputSplit nextSplit;

            /** Set once the provider has returned {@code null}. */
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                InputSplit split = FileSourceFunction.this.provider.getNextInputSplit();
                if (split != null) {
                    this.nextSplit = split;
                    return true;
                }
                this.exhausted = true;
                return false;
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !this.hasNext()) {
                    throw new NoSuchElementException();
                }
                InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Reads records from the current split and emits them until the splits are used up or
     * the source is cancelled.
     *
     * <p>A fresh record instance is created through the serializer for every read. When the
     * format returns {@code null}, the finished split is closed and the next one is opened;
     * if no further split exists the loop ends.
     *
     * @param ctx the context the records are emitted to
     * @throws Exception if reading a record, or closing or opening a split fails
     */
    @Override
    public void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
        while (this.isRunning) {
            OUT nextElement = this.format.nextRecord(this.serializer.createInstance());
            if (nextElement != null) {
                ctx.collect(nextElement);
                continue;
            }
            if (!this.splitIterator.hasNext()) {
                break;
            }
            // Release the finished split before opening the next one; otherwise only the
            // last split would ever be closed (by close()) and earlier ones would leak.
            this.format.close();
            this.format.open(this.splitIterator.next());
        }
    }

    /** Requests the {@link #run} loop to stop by clearing the running flag. */
    @Override
    public void cancel() {
        this.isRunning = false;
    }

    /**
     * Returns the input format this source reads from.
     *
     * @return the input format
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return this.format;
    }
}

