package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.sun.nio.file.SensitivityWatchEventModifier;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/ReadFileStreamP.class */
public class ReadFileStreamP extends AbstractProcessor implements Closeable {
    private final WatchType watchType;
    private final int parallelism;
    private final int id;
    private final Path path;
    private final Map<String, Long> fileOffsets = new HashMap();
    private WatchService watcher;

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/ReadFileStreamP$Supplier.class */
    private static class Supplier implements ProcessorSupplier {
        static final long serialVersionUID = 1;
        private final String folderPath;
        private final WatchType watchType;
        private transient ArrayList<ReadFileStreamP> readers;

        Supplier(String str, WatchType watchType) {
            this.folderPath = str;
            this.watchType = watchType;
        }

        @Override // com.hazelcast.jet.ProcessorSupplier
        @Nonnull
        public List<ReadFileStreamP> get(int i) {
            this.readers = new ArrayList<>();
            for (int i2 = 0; i2 < i; i2++) {
                this.readers.add(new ReadFileStreamP(this.folderPath, this.watchType, i, i2));
            }
            return this.readers;
        }

        @Override // com.hazelcast.jet.ProcessorSupplier
        public void complete(Throwable th) {
            this.readers.forEach(readFileStreamP -> {
                readFileStreamP.getClass();
                Util.uncheckRun(readFileStreamP::close);
            });
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/ReadFileStreamP$WatchType.class */
    public enum WatchType {
        NEW,
        REPROCESS,
        APPENDED_ONLY
    }

    ReadFileStreamP(String str, WatchType watchType, int i, int i2) {
        this.watchType = watchType;
        this.parallelism = i;
        this.id = i2;
        this.path = Paths.get(str, new String[0]);
    }

    @Override // com.hazelcast.jet.AbstractProcessor
    protected void init(@Nonnull Processor.Context context) throws Exception {
        this.watcher = FileSystems.getDefault().newWatchService();
        this.path.register(this.watcher, new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE}, SensitivityWatchEventModifier.HIGH);
        getLogger().info("Started to watch the directory: " + this.path);
    }

    @Override // com.hazelcast.jet.Processor
    public boolean complete() {
        try {
            return tryComplete();
        } catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        } catch (InterruptedException e2) {
            return true;
        }
    }

    private boolean tryComplete() throws InterruptedException, IOException {
        WatchKey take = this.watcher.take();
        for (WatchEvent<?> watchEvent : take.pollEvents()) {
            WatchEvent.Kind<?> kind = watchEvent.kind();
            Path resolve = this.path.resolve((Path) watchEvent.context());
            if (kind == StandardWatchEventKinds.ENTRY_DELETE && resolve.equals(this.path)) {
                getLogger().info("Directory " + this.path + " deleted, stopped watching");
                return true;
            }
            if (shouldProcessEvent(resolve)) {
                processEvent(kind, resolve);
            }
        }
        if (take.reset()) {
            return false;
        }
        getLogger().info("Watch key is invalid. Stopping watcher.");
        return true;
    }

    private void processEvent(WatchEvent.Kind<?> kind, Path path) throws IOException {
        if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
            getLogger().info("Deteceted deleted file: " + path);
            this.fileOffsets.remove(path.toString());
        } else if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
            getLogger().info("Detected modified file: " + path);
            long processFile = processFile(path, this.watchType == WatchType.REPROCESS ? 0L : this.fileOffsets.computeIfAbsent(path.toString(), str -> {
                return 0L;
            }).longValue());
            if (this.watchType == WatchType.APPENDED_ONLY) {
                this.fileOffsets.put(path.toString(), Long.valueOf(processFile));
            }
        }
    }

    private boolean shouldProcessEvent(Path path) {
        return (path.toFile().getPath().hashCode() & Integer.MAX_VALUE) % this.parallelism == this.id;
    }

    private long processFile(Path path, long j) throws IOException {
        FileInputStream fileInputStream = new FileInputStream(path.toFile());
        Throwable th = null;
        try {
            fileInputStream.getChannel().position(j);
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileInputStream, "UTF-8"));
            Throwable th2 = null;
            while (true) {
                try {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    }
                    getLogger().finest("line = " + readLine);
                    emit(readLine);
                } catch (Throwable th3) {
                    if (bufferedReader != null) {
                        if (0 != 0) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th4) {
                                th2.addSuppressed(th4);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    throw th3;
                }
            }
            long position = fileInputStream.getChannel().position();
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th5) {
                        th2.addSuppressed(th5);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            return position;
        } finally {
            if (fileInputStream != null) {
                if (0 != 0) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    fileInputStream.close();
                }
            }
        }
    }

    @Override // com.hazelcast.jet.Processor
    public boolean isCooperative() {
        return false;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.watcher != null) {
            getLogger().info("Closing watcher");
            this.watcher.close();
        }
    }

    public static ProcessorSupplier supplier(String str, WatchType watchType) {
        return new Supplier(str, watchType);
    }
}
