package com.hazelcast.jet.impl.connector;

import com.hazelcast.config.WanBatchPublisherConfig;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.ReflectionUtils;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.security.impl.function.SecuredFunctions;
import com.hazelcast.security.permission.ActionConstants;
import com.hazelcast.security.permission.ConnectorPermission;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamFilesP.class */
public class StreamFilesP<R> extends AbstractProcessor {
    private static final int LINES_IN_ONE_BATCH = 64;
    private static final String SENSITIVITY_MODIFIER_CLASS_NAME = "com.sun.nio.file.SensitivityWatchEventModifier";
    private static final WatchEvent.Kind[] WATCH_EVENT_KINDS = {StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE};
    private static final WatchEvent.Modifier[] WATCH_EVENT_MODIFIERS = getHighSensitivityModifiers();
    private final Path watchedDirectory;
    private final Charset charset;
    private final PathMatcher glob;
    private final boolean sharedFileSystem;
    private final BiFunctionEx<? super String, ? super String, ? extends R> mapOutputFn;
    private WatchService watcher;
    private R pendingItem;
    private Path currentFile;
    private String currentFileName;
    private FileInputStream currentInputStream;
    private Reader currentReader;
    private int parallelism;
    private int processorIndex;
    final Map<Path, FileOffset> fileOffsets = new HashMap();
    private final Queue<Path> eventQueue = new ArrayDeque();
    private StringBuilder lineBuilder = new StringBuilder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamFilesP$FileOffset.class */
    public static final class FileOffset {
        private static final FileOffset ZERO = new FileOffset(0, WanBatchPublisherConfig.DEFAULT_TARGET_ENDPOINTS);
        private final long offset;
        private final String pendingLine;

        private FileOffset(long j, @Nonnull String str) {
            this.offset = j;
            this.pendingLine = str;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long positiveOffset() {
            return this.offset >= 0 ? this.offset : (-this.offset) - 1;
        }

        public String toString() {
            return "FileOffset{offset=" + this.offset + ", pendingLine='" + this.pendingLine + "'}";
        }
    }

    public StreamFilesP(@Nonnull String str, @Nonnull Charset charset, @Nonnull String str2, boolean z, @Nonnull BiFunctionEx<? super String, ? super String, ? extends R> biFunctionEx) {
        this.watchedDirectory = Paths.get(str, new String[0]);
        this.charset = charset;
        this.glob = FileSystems.getDefault().getPathMatcher("glob:" + str2);
        this.sharedFileSystem = z;
        this.mapOutputFn = biFunctionEx;
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean isCooperative() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) throws Exception {
        this.processorIndex = this.sharedFileSystem ? context.globalProcessorIndex() : context.localProcessorIndex();
        this.parallelism = this.sharedFileSystem ? context.totalParallelism() : context.localParallelism();
        DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(this.watchedDirectory);
        Throwable th = null;
        try {
            try {
                for (Path path : newDirectoryStream) {
                    if (Files.isRegularFile(path, new LinkOption[0])) {
                        this.fileOffsets.put(path, new FileOffset(-Files.size(path), WanBatchPublisherConfig.DEFAULT_TARGET_ENDPOINTS));
                    }
                }
                if (newDirectoryStream != null) {
                    if (0 != 0) {
                        try {
                            newDirectoryStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        newDirectoryStream.close();
                    }
                }
                this.watcher = FileSystems.getDefault().newWatchService();
                this.watchedDirectory.register(this.watcher, WATCH_EVENT_KINDS, WATCH_EVENT_MODIFIERS);
                getLogger().info("Started to watch directory: " + this.watchedDirectory);
            } finally {
            }
        } catch (Throwable th3) {
            if (newDirectoryStream != null) {
                if (th != null) {
                    try {
                        newDirectoryStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newDirectoryStream.close();
                }
            }
            throw th3;
        }
    }

    @Override // com.hazelcast.jet.core.Processor
    public void close() {
        try {
            closeCurrentFile();
            getLogger().fine("Closing StreamFilesP");
            this.watcher.close();
        } catch (IOException e) {
            getLogger().severe("Failed to close StreamFilesP", e);
        } finally {
            this.watcher = null;
        }
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        if (!drainWatcherEvents()) {
            return true;
        }
        if (this.currentFile == null) {
            this.currentFile = this.eventQueue.poll();
            this.currentFileName = this.currentFile != null ? String.valueOf(this.currentFile.getFileName()) : null;
        }
        if (this.currentFile == null) {
            return false;
        }
        processFile();
        return false;
    }

    private boolean drainWatcherEvents() {
        ILogger logger = getLogger();
        try {
            WatchKey poll = (this.currentFile == null && this.eventQueue.isEmpty()) ? this.watcher.poll(1L, TimeUnit.SECONDS) : this.watcher.poll();
            if (poll == null) {
                if (Files.exists(this.watchedDirectory, new LinkOption[0])) {
                    return true;
                }
                logger.info("Directory " + this.watchedDirectory + " does not exist, stopped watching");
                return false;
            }
            for (WatchEvent<?> watchEvent : poll.pollEvents()) {
                WatchEvent.Kind<?> kind = watchEvent.kind();
                Path path = (Path) watchEvent.context();
                Path resolve = this.watchedDirectory.resolve(path);
                if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
                    if (this.glob.matches(path) && belongsToThisProcessor(path) && !Files.isDirectory(resolve, new LinkOption[0])) {
                        LoggingUtil.logFine(logger, "Will open file to read new content: %s", resolve);
                        this.eventQueue.add(resolve);
                    }
                } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
                    LoggingUtil.logFinest(logger, "File was deleted: %s", resolve);
                    this.fileOffsets.remove(resolve);
                } else {
                    if (kind != StandardWatchEventKinds.OVERFLOW) {
                        throw new JetException("Unknown kind of WatchEvent: " + kind);
                    }
                    logger.warning("Detected OVERFLOW in " + this.watchedDirectory);
                }
            }
            if (poll.reset()) {
                return true;
            }
            logger.info("Watch key is invalid. Stopping watcher.");
            return false;
        } catch (InterruptedException e) {
            return false;
        }
    }

    private boolean belongsToThisProcessor(Path path) {
        return (path.hashCode() & Integer.MAX_VALUE) % this.parallelism == this.processorIndex;
    }

    /* JADX WARN: Code restructure failed: missing block: B:25:0x0041, code lost:
    
        r9.fileOffsets.put(r9.currentFile, new com.hazelcast.jet.impl.connector.StreamFilesP.FileOffset(r9.currentInputStream.getChannel().position(), r9.lineBuilder.toString(), null));
        r9.lineBuilder.setLength(0);
        closeCurrentFile();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void processFile() {
        /*
            r9 = this;
            r0 = r9
            boolean r0 = r0.ensureFileOpen()     // Catch: java.io.IOException -> L90
            if (r0 != 0) goto L8
            return
        L8:
            r0 = 0
            r10 = r0
        La:
            r0 = r10
            r1 = 64
            if (r0 >= r1) goto L8d
            r0 = r9
            R r0 = r0.pendingItem     // Catch: java.io.IOException -> L90
            if (r0 != 0) goto L3a
            r0 = r9
            r1 = r9
            java.io.Reader r1 = r1.currentReader     // Catch: java.io.IOException -> L90
            java.lang.String r0 = r0.readCompleteLine(r1)     // Catch: java.io.IOException -> L90
            r11 = r0
            r0 = r9
            r1 = r11
            if (r1 == 0) goto L36
            r1 = r9
            com.hazelcast.function.BiFunctionEx<? super java.lang.String, ? super java.lang.String, ? extends R> r1 = r1.mapOutputFn     // Catch: java.io.IOException -> L90
            r2 = r9
            java.lang.String r2 = r2.currentFileName     // Catch: java.io.IOException -> L90
            r3 = r11
            java.lang.Object r1 = r1.apply(r2, r3)     // Catch: java.io.IOException -> L90
            goto L37
        L36:
            r1 = 0
        L37:
            r0.pendingItem = r1     // Catch: java.io.IOException -> L90
        L3a:
            r0 = r9
            R r0 = r0.pendingItem     // Catch: java.io.IOException -> L90
            if (r0 != 0) goto L77
            r0 = r9
            java.util.Map<java.nio.file.Path, com.hazelcast.jet.impl.connector.StreamFilesP$FileOffset> r0 = r0.fileOffsets     // Catch: java.io.IOException -> L90
            r1 = r9
            java.nio.file.Path r1 = r1.currentFile     // Catch: java.io.IOException -> L90
            com.hazelcast.jet.impl.connector.StreamFilesP$FileOffset r2 = new com.hazelcast.jet.impl.connector.StreamFilesP$FileOffset     // Catch: java.io.IOException -> L90
            r3 = r2
            r4 = r9
            java.io.FileInputStream r4 = r4.currentInputStream     // Catch: java.io.IOException -> L90
            java.nio.channels.FileChannel r4 = r4.getChannel()     // Catch: java.io.IOException -> L90
            long r4 = r4.position()     // Catch: java.io.IOException -> L90
            r5 = r9
            java.lang.StringBuilder r5 = r5.lineBuilder     // Catch: java.io.IOException -> L90
            java.lang.String r5 = r5.toString()     // Catch: java.io.IOException -> L90
            r6 = 0
            r3.<init>(r4, r5)     // Catch: java.io.IOException -> L90
            java.lang.Object r0 = r0.put(r1, r2)     // Catch: java.io.IOException -> L90
            r0 = r9
            java.lang.StringBuilder r0 = r0.lineBuilder     // Catch: java.io.IOException -> L90
            r1 = 0
            r0.setLength(r1)     // Catch: java.io.IOException -> L90
            r0 = r9
            r0.closeCurrentFile()     // Catch: java.io.IOException -> L90
            goto L8d
        L77:
            r0 = r9
            r1 = r9
            R r1 = r1.pendingItem     // Catch: java.io.IOException -> L90
            boolean r0 = r0.tryEmit(r1)     // Catch: java.io.IOException -> L90
            if (r0 == 0) goto L8d
            r0 = r9
            r1 = 0
            r0.pendingItem = r1     // Catch: java.io.IOException -> L90
            int r10 = r10 + 1
            goto La
        L8d:
            goto L96
        L90:
            r10 = move-exception
            r0 = r10
            java.lang.RuntimeException r0 = com.hazelcast.jet.impl.util.ExceptionUtil.sneakyThrow(r0)
            throw r0
        L96:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: com.hazelcast.jet.impl.connector.StreamFilesP.processFile():void");
    }

    private boolean ensureFileOpen() throws IOException {
        if (this.currentReader != null) {
            return true;
        }
        FileOffset orDefault = this.fileOffsets.getOrDefault(this.currentFile, FileOffset.ZERO);
        LoggingUtil.logFine(getLogger(), "Processing file %s, previous offset: %s", this.currentFile, orDefault);
        try {
            FileInputStream fileInputStream = new FileInputStream(this.currentFile.toFile());
            fileInputStream.getChannel().position(orDefault.positiveOffset());
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileInputStream, this.charset));
            if (orDefault.offset < 0 && !findEndOfLine(bufferedReader)) {
                closeCurrentFile();
                return false;
            }
            this.currentReader = bufferedReader;
            this.currentInputStream = fileInputStream;
            this.lineBuilder.append(orDefault.pendingLine);
            return true;
        } catch (FileNotFoundException e) {
            closeCurrentFile();
            return false;
        }
    }

    private static boolean findEndOfLine(Reader reader) throws IOException {
        int read;
        do {
            read = reader.read();
            if (read >= 0) {
                if (read == 10) {
                    break;
                }
            } else {
                return false;
            }
        } while (read != 13);
        maybeSkipLF(reader, read);
        return true;
    }

    String readCompleteLine(Reader reader) throws IOException {
        int read;
        while (true) {
            read = reader.read();
            if (read < 0) {
                return null;
            }
            if (read == 13 || read == 10) {
                break;
            }
            this.lineBuilder.append((char) read);
        }
        maybeSkipLF(reader, read);
        try {
            String sb = this.lineBuilder.toString();
            this.lineBuilder.setLength(0);
            return sb;
        } catch (Throwable th) {
            this.lineBuilder.setLength(0);
            throw th;
        }
    }

    private static void maybeSkipLF(Reader reader, int i) throws IOException {
        if (i == 13) {
            reader.mark(1);
            if (reader.read() != 10) {
                reader.reset();
            }
        }
    }

    private void closeCurrentFile() {
        if (this.currentReader != null) {
            try {
                this.currentReader.close();
            } catch (IOException e) {
                throw ExceptionUtil.sneakyThrow(e);
            }
        }
        this.currentFile = null;
        this.currentFileName = null;
        this.currentReader = null;
        this.currentInputStream = null;
    }

    @Nonnull
    public static ProcessorMetaSupplier metaSupplier(@Nonnull String str, @Nonnull String str2, @Nonnull String str3, boolean z, @Nonnull BiFunctionEx<? super String, ? super String, ?> biFunctionEx) {
        Util.checkSerializable(biFunctionEx, "mapOutputFn");
        return ProcessorMetaSupplier.of(2, ConnectorPermission.file(str, ActionConstants.ACTION_READ), ProcessorSupplier.of(SecuredFunctions.streamFileProcessorFn(str, str2, str3, z, biFunctionEx)));
    }

    private static WatchEvent.Modifier[] getHighSensitivityModifiers() {
        Object readStaticFieldOrNull = ReflectionUtils.readStaticFieldOrNull(SENSITIVITY_MODIFIER_CLASS_NAME, "HIGH");
        return readStaticFieldOrNull instanceof WatchEvent.Modifier ? new WatchEvent.Modifier[]{(WatchEvent.Modifier) readStaticFieldOrNull} : new WatchEvent.Modifier[0];
    }
}
