/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.impl.util.Util;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/**
 * Source processor that lists the entries of a directory matching a glob
 * pattern and emits the items of the stream that {@code readFileFn} produces
 * for each file assigned to this processor instance.
 * <p>
 * Files are divided among processor instances by the hash code of their
 * {@link Path}: a file is handled by the instance whose index equals the
 * non-negative hash modulo the parallelism. When {@code sharedFileSystem} is
 * {@code true} the global processor index and total parallelism reported by
 * the context are used; otherwise the local index and local parallelism.
 *
 * @param <T> type of the items emitted for each file
 */
public final class ReadFilesP<T> extends AbstractProcessor {

    /** Preferred local parallelism handed to the meta-supplier. */
    private static final int DEFAULT_LOCAL_PARALLELISM = 4;

    private final Path directory;
    private final String glob;
    private final boolean sharedFileSystem;
    private final FunctionEx<? super Path, ? extends Stream<T>> readFileFn;

    // Assigned in init() from the processor context.
    private int processorIndex;
    private int parallelism;
    private DirectoryStream<Path> directoryStream;
    private Traverser<? extends T> outputTraverser;

    /** Stream of the file currently being emitted, or {@code null} when none is open. */
    private Stream<T> currentStream;

    private ReadFilesP(
            @Nonnull String directory,
            @Nonnull String glob,
            boolean sharedFileSystem,
            @Nonnull FunctionEx<? super Path, ? extends Stream<T>> readFileFn
    ) {
        this.directory = Paths.get(directory);
        this.glob = glob;
        this.readFileFn = readFileFn;
        this.sharedFileSystem = sharedFileSystem;
    }

    /**
     * Declares this processor as non-cooperative.
     * NOTE(review): presumably because reading files may block - confirm.
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Captures this instance's index and the parallelism used for file
     * assignment, opens the directory listing and builds the traverser that
     * lazily reads the assigned files.
     *
     * @throws Exception if the directory stream cannot be opened
     */
    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        if (sharedFileSystem) {
            processorIndex = context.globalProcessorIndex();
            parallelism = context.totalParallelism();
        } else {
            processorIndex = context.localProcessorIndex();
            parallelism = context.localParallelism();
        }
        directoryStream = Files.newDirectoryStream(directory, glob);
        outputTraverser = Traversers.traverseIterator(directoryStream.iterator())
                .filter(this::shouldProcessEvent)
                .flatMap(this::processFile);
    }

    /**
     * Emits items from the traverser built in {@link #init}.
     *
     * @return the result of {@code emitFromTraverser} for the output traverser
     */
    @Override
    public boolean complete() {
        return emitFromTraverser(outputTraverser);
    }

    /**
     * Tells whether the given directory entry is handled by this instance.
     * Directories are never processed.
     */
    private boolean shouldProcessEvent(Path file) {
        if (Files.isDirectory(file)) {
            return false;
        }
        // Clear the sign bit so that the modulo result is never negative.
        int nonNegativeHash = file.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % parallelism == processorIndex;
    }

    /**
     * Opens the stream for {@code file} using {@code readFileFn} and returns a
     * traverser over it. The stream is closed and the field cleared when the
     * traverser returns its first {@code null}.
     */
    private Traverser<? extends T> processFile(Path file) {
        if (getLogger().isFinestEnabled()) {
            getLogger().finest("Processing file " + file);
        }
        assert currentStream == null : "currentStream != null";
        currentStream = readFileFn.apply(file);
        return Traversers.traverseStream(currentStream).onFirstNull(() -> {
            currentStream.close();
            currentStream = null;
        });
    }

    /**
     * Closes the directory listing and the stream of the file being read, if
     * any. Both resources are always attempted; the references are cleared so
     * that closed resources are not retained.
     *
     * @throws IOException if closing the directory stream failed and closing
     *                     the current file stream did not throw
     */
    @Override
    public void close() throws IOException {
        IOException directoryFailure = null;
        if (directoryStream != null) {
            try {
                directoryStream.close();
            } catch (IOException e) {
                // Remember the failure, but still try to close the file stream.
                directoryFailure = e;
            } finally {
                directoryStream = null;
            }
        }
        if (currentStream != null) {
            Stream<T> openStream = currentStream;
            currentStream = null;
            try {
                openStream.close();
            } catch (RuntimeException e) {
                // Keep the earlier directory failure visible instead of dropping it.
                if (directoryFailure != null) {
                    e.addSuppressed(directoryFailure);
                }
                throw e;
            }
        }
        if (directoryFailure != null) {
            throw directoryFailure;
        }
    }

    /**
     * Creates a meta-supplier of {@code ReadFilesP} processors with a
     * preferred local parallelism of {@link #DEFAULT_LOCAL_PARALLELISM}.
     * {@code readFileFn} is passed to {@code Util.checkSerializable} first.
     *
     * @param directory        directory whose entries are listed
     * @param glob             glob pattern applied to the directory entries
     * @param sharedFileSystem whether files are divided using the global
     *                         (rather than local) processor index and parallelism
     * @param readFileFn       function producing the stream of items for a file
     * @param <T>              type of the emitted items
     */
    public static <T> ProcessorMetaSupplier metaSupplier(
            @Nonnull String directory,
            @Nonnull String glob,
            boolean sharedFileSystem,
            @Nonnull FunctionEx<? super Path, ? extends Stream<T>> readFileFn
    ) {
        Util.checkSerializable(readFileFn, "readFileFn");
        return ProcessorMetaSupplier.of(
                DEFAULT_LOCAL_PARALLELISM,
                () -> new ReadFilesP<T>(directory, glob, sharedFileSystem, readFileFn));
    }
}

