/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.file.impl.FileProcessorMetaSupplier;
import com.hazelcast.jet.pipeline.file.impl.FileTraverser;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

public final class ReadFilesP<T>
extends AbstractProcessor {
    private static final int DEFAULT_LOCAL_PARALLELISM = 4;
    private final String directory;
    private final String glob;
    private final boolean sharedFileSystem;
    private final boolean ignoreFileNotFound;
    private final FunctionEx<? super Path, ? extends Stream<T>> readFileFn;
    private LocalFileTraverser<T> traverser;

    private ReadFilesP(@Nonnull String directory, @Nonnull String glob, boolean sharedFileSystem, boolean ignoreFileNotFound, @Nonnull FunctionEx<? super Path, ? extends Stream<T>> readFileFn) {
        this.directory = directory;
        this.glob = glob;
        this.sharedFileSystem = sharedFileSystem;
        this.ignoreFileNotFound = ignoreFileNotFound;
        this.readFileFn = readFileFn;
    }

    @Override
    protected void init(@Nonnull Processor.Context context) {
        ILogger logger = context.logger();
        int processorIndex = this.sharedFileSystem ? context.globalProcessorIndex() : context.localProcessorIndex();
        int parallelism = this.sharedFileSystem ? context.totalParallelism() : context.localParallelism();
        this.traverser = new LocalFileTraverser(logger, this.directory, this.glob, this.ignoreFileNotFound, path -> ReadFilesP.shouldProcessEvent(path, parallelism, processorIndex), this.readFileFn);
    }

    private static boolean shouldProcessEvent(Path path, int parallelism, int processorIndex) {
        int hashCode = path.hashCode();
        return (hashCode & Integer.MAX_VALUE) % parallelism == processorIndex;
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    @Override
    public boolean complete() {
        return this.emitFromTraverser(this.traverser);
    }

    @Override
    public void close() throws IOException {
        if (this.traverser != null) {
            this.traverser.close();
        }
    }

    public static <T> ProcessorMetaSupplier metaSupplier(@Nonnull String directory, @Nonnull String glob, boolean sharedFileSystem, boolean ignoreFileNotFound, @Nonnull FunctionEx<? super Path, ? extends Stream<T>> readFileFn) {
        Util.checkSerializable(readFileFn, "readFileFn");
        return new MetaSupplier(4, directory, glob, sharedFileSystem, ignoreFileNotFound, readFileFn);
    }

    private static final class LocalFileTraverser<T>
    implements FileTraverser<T> {
        private final ILogger logger;
        private final Path directory;
        private final String glob;
        private final boolean ignoreFileNotFound;
        private final FunctionEx<? super Path, ? extends Stream<T>> readFileFn;
        private final Traverser<T> delegate;
        private DirectoryStream<Path> directoryStream;
        private Stream<T> fileStream;
        private boolean hasResults;

        private LocalFileTraverser(ILogger logger, String directory, String glob, boolean ignoreFileNotFound, Predicate<Path> pathFilterFn, FunctionEx<? super Path, ? extends Stream<T>> readFileFn) {
            this.logger = logger;
            this.directory = Paths.get(directory, new String[0]);
            this.glob = glob;
            this.ignoreFileNotFound = ignoreFileNotFound;
            this.readFileFn = readFileFn;
            this.delegate = Traversers.traverseIterator(Util.uncheckCall(this::paths)).filter(path -> !Files.isDirectory(path, new LinkOption[0])).peek(path -> {
                this.hasResults = true;
            }).filter(path -> pathFilterFn.test((Path)path)).flatMap(this::processFile);
        }

        private Iterator<Path> paths() throws IOException {
            File file = this.directory.toFile();
            if (!file.exists()) {
                throw new JetException("The directory '" + this.directory + "' does not exist.");
            }
            if (!file.isDirectory()) {
                throw new JetException("The given path (" + this.directory + ") must point to a directory, not a file.");
            }
            this.directoryStream = Files.newDirectoryStream(this.directory, this.glob);
            return this.directoryStream.iterator();
        }

        private Traverser<T> processFile(Path file) {
            this.logger.finest("Processing file " + file);
            assert (this.fileStream == null) : "fileStream != null";
            this.fileStream = this.readFileFn.apply(file);
            return Traversers.traverseStream(this.fileStream).onFirstNull(() -> {
                this.fileStream.close();
                this.fileStream = null;
            });
        }

        @Override
        public T next() {
            T next = this.delegate.next();
            if (next == null && !this.hasResults && !this.ignoreFileNotFound) {
                throw new JetException("The glob " + this.glob + " matches no files in directory " + this.directory);
            }
            return next;
        }

        @Override
        public void close() throws IOException {
            IOException exception = null;
            if (this.directoryStream != null) {
                try {
                    this.directoryStream.close();
                }
                catch (IOException ioe) {
                    exception = ioe;
                }
            }
            if (this.fileStream != null) {
                this.fileStream.close();
            }
            if (exception != null) {
                throw exception;
            }
        }
    }

    private static final class MetaSupplier<T>
    implements FileProcessorMetaSupplier<T> {
        private static final ILogger LOGGER = Logger.getLogger(MetaSupplier.class);
        private final int localParallelism;
        private final String directory;
        private final String glob;
        private final boolean sharedFileSystem;
        private final boolean ignoreFileNotFound;
        private final FunctionEx<? super Path, ? extends Stream<T>> readFileFn;

        private MetaSupplier(int localParallelism, String directory, String glob, boolean sharedFileSystem, boolean ignoreFileNotFound, FunctionEx<? super Path, ? extends Stream<T>> readFileFn) {
            this.localParallelism = localParallelism;
            this.directory = directory;
            this.glob = glob;
            this.sharedFileSystem = sharedFileSystem;
            this.ignoreFileNotFound = ignoreFileNotFound;
            this.readFileFn = readFileFn;
        }

        @Override
        @Nonnull
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            return address -> ProcessorSupplier.of(() -> new ReadFilesP(this.directory, this.glob, this.sharedFileSystem, this.ignoreFileNotFound, this.readFileFn));
        }

        @Override
        public int preferredLocalParallelism() {
            return this.localParallelism;
        }

        @Override
        public FileTraverser<T> traverser() {
            return new LocalFileTraverser(LOGGER, this.directory, this.glob, this.ignoreFileNotFound, path -> true, this.readFileFn);
        }
    }
}

