/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.provenance.store;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.EventStorePartition;
import org.apache.nifi.provenance.store.RecordReaderFactory;
import org.apache.nifi.provenance.store.RecordWriterFactory;
import org.apache.nifi.provenance.store.RecordWriterLease;
import org.apache.nifi.provenance.store.StorageResult;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.store.iterator.SelectiveRecordReaderEventIterator;
import org.apache.nifi.provenance.store.iterator.SequentialRecordReaderEventIterator;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.provenance.util.NamedThreadFactory;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EventStorePartition} that appends Provenance Events to {@code .prov} event files
 * inside a single partition directory.
 *
 * <p>Events are written through a {@link RecordWriterLease}; when the lease reports that it
 * should roll, a new event file (named after the next event ID) is created and, if the
 * configuration asks for it, the previous file is queued for compression. The partition keeps
 * an in-memory map from the minimum event ID of each event file to that file, which is used
 * to locate the file(s) that may contain a given event ID.
 */
public class WriteAheadStorePartition implements EventStorePartition {
    private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class);

    /** Number of events handed to the Event Index in a single call while re-indexing. */
    private static final int REINDEX_BATCH_SIZE = 1000;
    /** Upper bound on the number of threads used to re-index event files. */
    private static final int MAX_REINDEX_THREADS = 4;

    private final RepositoryConfiguration config;
    private final File partitionDirectory;
    private final String partitionName;
    private final RecordWriterFactory recordWriterFactory;
    private final RecordReaderFactory recordReaderFactory;
    private final BlockingQueue<File> filesToCompress;
    private final AtomicLong idGenerator;
    private final AtomicLong maxEventId = new AtomicLong(-1L);
    private volatile boolean closed = false;
    private final AtomicReference<RecordWriterLease> eventWriterLeaseRef = new AtomicReference<>();

    /** Minimum event ID of each known event file, mapped to that file. Guarded by its own monitor. */
    private final NavigableMap<Long, File> minEventIdToPathMap = new TreeMap<>();

    /**
     * Creates a partition backed by the given directory.
     *
     * @param storageDirectory the directory that holds this partition's event files
     * @param partitionName the name reported in the {@link StorageSummary} of every stored event
     * @param repoConfig the repository configuration
     * @param recordWriterFactory factory used to create a writer for each new event file
     * @param recordReaderFactory factory used to create readers over existing event files
     * @param filesToCompress queue onto which rolled-over event files are offered for compression
     * @param idGenerator generator of event IDs
     * @param eventReporter not used by this class
     */
    public WriteAheadStorePartition(File storageDirectory, String partitionName, RepositoryConfiguration repoConfig, RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory, BlockingQueue<File> filesToCompress, AtomicLong idGenerator, EventReporter eventReporter) {
        this.partitionName = partitionName;
        this.config = repoConfig;
        this.idGenerator = idGenerator;
        this.partitionDirectory = storageDirectory;
        this.recordWriterFactory = recordWriterFactory;
        this.recordReaderFactory = recordReaderFactory;
        this.filesToCompress = filesToCompress;
    }

    /**
     * Marks the partition as closed and closes the current writer lease, if there is one.
     *
     * @throws IOException if closing the lease fails
     */
    @Override
    public void close() throws IOException {
        closed = true;

        final RecordWriterLease lease = eventWriterLeaseRef.get();
        if (lease != null) {
            lease.close();
        }
    }

    /**
     * Prepares the partition for use: creates the partition directory if needed, recovers the
     * largest event ID already stored, populates the min-event-ID-to-file map and advances the
     * ID generator past the recovered ID.
     *
     * @throws IOException if the directory cannot be created or its files cannot be listed
     */
    @Override
    public synchronized void initialize() throws IOException {
        if (!partitionDirectory.exists()) {
            Files.createDirectories(partitionDirectory.toPath());
        }

        // Remove partially compressed files BEFORE listing the directory, so that a file deleted
        // by this cleanup can never be read for ID recovery or recorded in minEventIdToPathMap.
        if (config.isCompressOnRollover()) {
            deletePartiallyCompressedFiles();
        }

        final File[] files = partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
        if (files == null) {
            throw new IOException("Could not access files in the " + partitionDirectory + " directory");
        }

        final List<File> fileList = Arrays.asList(files);
        Collections.sort(fileList, DirectoryUtils.LARGEST_ID_FIRST);

        // Walk the files from the largest ID downwards and stop at the first one that yields an ID.
        long recoveredMaxId = -1L;
        for (final File file : fileList) {
            try (RecordReader reader = recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE)) {
                final long eventId = reader.getMaxEventId();
                if (eventId > recoveredMaxId) {
                    recoveredMaxId = eventId;
                    break;
                }
            } catch (final Exception e) {
                logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, e);
            }
        }

        synchronized (minEventIdToPathMap) {
            for (final File file : fileList) {
                minEventIdToPathMap.put(DirectoryUtils.getMinId(file), file);
            }
        }

        maxEventId.set(recoveredMaxId);

        // Never move the shared generator backwards: other users of it may already be further ahead.
        final long nextPartitionId = recoveredMaxId + 1L;
        final long updatedId = idGenerator.updateAndGet(curVal -> Math.max(curVal, nextPartitionId));
        logger.info("After recovering {}, next Event ID to be generated will be {}", partitionDirectory, updatedId);
    }

    /**
     * For every {@code .prov} file in the partition directory, deletes the {@code .prov.gz}
     * file of the same name if one exists. A failed deletion is logged rather than ignored.
     */
    private void deletePartiallyCompressedFiles() {
        final File[] uncompressedFiles = partitionDirectory.listFiles(f -> f.getName().endsWith(".prov"));
        if (uncompressedFiles == null) {
            return;
        }

        for (final File file : uncompressedFiles) {
            final File compressed = new File(file.getParentFile(), file.getName() + ".gz");
            if (compressed.exists() && !compressed.delete()) {
                logger.warn("Failed to delete partially compressed Provenance Event file {}; this file should be cleaned up manually", compressed);
            }
        }
    }

    /**
     * Writes the given events to the current event file, rolling over to a new file afterwards
     * if the lease indicates that it should roll.
     *
     * @param events the events to store
     * @return the storage location of every event, plus whether a rollover was triggered
     * @throws IOException if the partition is closed or the events cannot be written
     */
    @Override
    public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) throws IOException {
        if (closed) {
            throw new IOException(this + " is closed");
        }

        final RecordWriterLease lease = claimLease();
        final RecordWriter writer = lease.getWriter();

        final Map<ProvenanceEventRecord, StorageSummary> storageMap;
        try {
            storageMap = addEvents(events, writer);
        } finally {
            lease.relinquishClaim();
        }

        // The events are already stored at this point, so a failed rollover is logged, not thrown.
        Integer eventsRolledOver = null;
        final boolean shouldRoll = lease.shouldRoll();
        try {
            if (shouldRoll && tryRollover(lease)) {
                eventsRolledOver = writer.getRecordsWritten();
            }
        } catch (final IOException ioe) {
            logger.error("Updated {} but failed to rollover to a new Event File", this, ioe);
        }

        final Integer rolloverCount = eventsRolledOver;
        return new StorageResult() {
            @Override
            public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() {
                return storageMap;
            }

            @Override
            public boolean triggeredRollover() {
                return rolloverCount != null;
            }

            @Override
            public Integer getEventsRolledOver() {
                return rolloverCount;
            }

            @Override
            public String toString() {
                return getStorageLocations().toString();
            }
        };
    }

    /**
     * Loops until a lease has been claimed, attempting a rollover whenever the current lease
     * cannot be claimed and reports that it should roll.
     *
     * @return a lease on which {@code tryClaim()} returned {@code true}
     * @throws IOException if a rollover fails
     */
    private RecordWriterLease claimLease() throws IOException {
        while (true) {
            final RecordWriterLease lease = getLease();
            if (lease.tryClaim()) {
                return lease;
            }

            if (lease.shouldRoll()) {
                tryRollover(lease);
            }
        }
    }

    /**
     * Returns the current writer lease, creating the first event file if no lease exists yet.
     *
     * @return the current lease
     * @throws IOException if the initial event file cannot be created
     */
    private RecordWriterLease getLease() throws IOException {
        while (true) {
            final RecordWriterLease lease = eventWriterLeaseRef.get();
            if (lease != null) {
                return lease;
            }

            if (tryRollover(null)) {
                return eventWriterLeaseRef.get();
            }
        }
    }

    /**
     * Replaces the given lease with a lease over a newly created event file, provided the given
     * lease is still the current one.
     *
     * @param lease the lease the caller believes to be current; {@code null} if none exists yet
     * @return {@code true} if this call installed a new lease, {@code false} otherwise
     * @throws IOException if the new event file cannot be created, the old lease cannot be
     *         closed, or the thread is interrupted while queueing the old file for compression
     */
    private synchronized boolean tryRollover(final RecordWriterLease lease) throws IOException {
        if (!Objects.equals(lease, eventWriterLeaseRef.get())) {
            return false;
        }

        final long nextEventId = idGenerator.get();
        final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov");
        final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
        try {
            updatedWriter.writeHeader(nextEventId);
        } catch (final Exception e) {
            // The writer will never be published, so release it before propagating the failure.
            closeQuietly(updatedWriter);
            throw e;
        }

        final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
        if (!eventWriterLeaseRef.compareAndSet(lease, updatedLease)) {
            closeQuietly(updatedWriter);
            updatedEventFile.delete();
            return false;
        }

        if (lease != null) {
            lease.close();
        }

        synchronized (minEventIdToPathMap) {
            minEventIdToPathMap.put(nextEventId, updatedEventFile);
        }

        if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) {
            // Keep retrying while the queue is full, but give up once the partition is closed.
            boolean offered = false;
            while (!offered && !closed) {
                try {
                    offered = filesToCompress.offer(lease.getWriter().getFile(), 1L, TimeUnit.SECONDS);
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression", ie);
                }
            }
        }

        return true;
    }

    /**
     * Closes the given writer, logging (rather than propagating) any failure.
     *
     * @param writer the writer to close
     */
    private void closeQuietly(final RecordWriter writer) {
        try {
            writer.close();
        } catch (final Exception e) {
            logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", writer, e);
        }
    }

    /**
     * Writes every event with the given writer, then flushes (and syncs, if configured) and
     * raises the partition's maximum event ID. The writer is marked dirty if anything fails.
     *
     * @param events the events to write
     * @param writer the writer to use
     * @return the storage summary of each written event, tagged with this partition's name
     * @throws IOException if writing, flushing or syncing fails
     */
    private Map<ProvenanceEventRecord, StorageSummary> addEvents(final Iterable<ProvenanceEventRecord> events, final RecordWriter writer) throws IOException {
        final Map<ProvenanceEventRecord, StorageSummary> locationMap = new HashMap<>();

        try {
            long lastId = -1L;
            int numEvents = 0;
            for (final ProvenanceEventRecord nextEvent : events) {
                final StorageSummary writerSummary = writer.writeRecord(nextEvent);
                final StorageSummary summaryWithIndex = new StorageSummary(writerSummary.getEventId(), writerSummary.getStorageLocation(), partitionName,
                    writerSummary.getBlockIndex(), writerSummary.getSerializedLength(), writerSummary.getBytesWritten());
                locationMap.put(nextEvent, summaryWithIndex);
                lastId = summaryWithIndex.getEventId();
                numEvents++;
            }

            if (numEvents == 0) {
                return locationMap;
            }

            writer.flush();

            final long maxIdWritten = lastId;
            maxEventId.getAndUpdate(cur -> maxIdWritten > cur ? maxIdWritten : cur);

            if (config.isAlwaysSync()) {
                writer.sync();
            }
        } catch (final Exception e) {
            writer.markDirty();
            throw e;
        }

        return locationMap;
    }

    /**
     * @return the combined length, in bytes, of the event files currently in the partition directory
     */
    @Override
    public long getSize() {
        return getEventFilesFromDisk().collect(Collectors.summarizingLong(File::length)).getSum();
    }

    /**
     * @return a stream over the event files in the partition directory; empty if they cannot be listed
     */
    private Stream<File> getEventFilesFromDisk() {
        final File[] files = partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
        return files == null ? Stream.empty() : Arrays.stream(files);
    }

    /**
     * @return the largest event ID recovered or written so far, or -1 if there is none
     */
    @Override
    public long getMaxEventId() {
        return maxEventId.get();
    }

    /**
     * Looks up a single event by ID.
     *
     * @param id the ID of the event to fetch
     * @return the event, or an empty Optional if no event file contains an event with that ID
     * @throws IOException if the event file cannot be read
     */
    @Override
    public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
        final Optional<File> option = getPathForEventId(id);
        if (!option.isPresent()) {
            return Optional.empty();
        }

        try (RecordReader reader = recordReaderFactory.newRecordReader(option.get(), Collections.emptyList(), config.getMaxAttributeChars())) {
            final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(id);

            // The reader may hand back an event other than the requested one; report that as absent.
            if (eventOption.isPresent() && eventOption.get().getEventId() != id) {
                return Optional.empty();
            }
            return eventOption;
        }
    }

    /**
     * Returns up to {@code maxEvents} authorized events, starting at {@code firstRecordId}.
     *
     * @param firstRecordId the ID at which to start reading
     * @param maxEvents the maximum number of events to return
     * @param authorizer decides which events may be returned; unauthorized events are skipped
     * @return the authorized events that were read
     * @throws IOException if the events cannot be read
     */
    @Override
    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxEvents, final EventAuthorizer authorizer) throws IOException {
        final List<ProvenanceEventRecord> events = new ArrayList<>(Math.min(maxEvents, 1000));

        try (EventIterator iterator = createEventIterator(firstRecordId)) {
            // Check the limit before pulling from the iterator so no event is read needlessly.
            while (events.size() < maxEvents) {
                final Optional<ProvenanceEventRecord> eventOption = iterator.nextEvent();
                if (!eventOption.isPresent()) {
                    break;
                }

                final ProvenanceEventRecord event = eventOption.get();
                if (authorizer.isAuthorized(event)) {
                    events.add(event);
                }
            }
        }

        return events;
    }

    /**
     * Creates an iterator over the event files that may hold events with an ID of at least
     * {@code minDesiredId}.
     *
     * @param minDesiredId the smallest event ID of interest
     * @return an iterator over the candidate files, or {@link EventIterator#EMPTY} if there are none
     */
    @Override
    public EventIterator createEventIterator(final long minDesiredId) {
        final List<File> filesOfInterest = new ArrayList<>();

        synchronized (minEventIdToPathMap) {
            // The file whose minimum ID is the greatest one not above minDesiredId may still hold
            // the desired events, because only the minimum ID of each file is known here.
            final Map.Entry<Long, File> floor = minEventIdToPathMap.floorEntry(minDesiredId);
            if (floor != null) {
                filesOfInterest.add(floor.getValue());
            }

            // Every file that starts after minDesiredId is of interest, in ascending ID order.
            filesOfInterest.addAll(minEventIdToPathMap.tailMap(minDesiredId, false).values());
        }

        if (filesOfInterest.isEmpty()) {
            return EventIterator.EMPTY;
        }

        return new SequentialRecordReaderEventIterator(filesOfInterest, recordReaderFactory, minDesiredId, config.getMaxAttributeChars());
    }

    /**
     * Creates an iterator that selects the given event IDs from all known event files.
     *
     * @param eventIds the IDs of the events of interest
     * @return an iterator over all known files, or {@link EventIterator#EMPTY} if there are none
     */
    @Override
    public EventIterator createEventIterator(final List<Long> eventIds) {
        final List<File> allFiles;
        synchronized (minEventIdToPathMap) {
            allFiles = new ArrayList<>(minEventIdToPathMap.values());
        }

        if (allFiles.isEmpty()) {
            return EventIterator.EMPTY;
        }

        return new SelectiveRecordReaderEventIterator(allFiles, recordReaderFactory, eventIds, config.getMaxAttributeChars());
    }

    /**
     * Finds the event file whose minimum event ID is the greatest one not above {@code id}.
     *
     * @param id the event ID to locate
     * @return the candidate file, or an empty Optional if every known file starts after {@code id}
     */
    private Optional<File> getPathForEventId(final long id) {
        synchronized (minEventIdToPathMap) {
            final Map.Entry<Long, File> entry = minEventIdToPathMap.floorEntry(id);
            if (entry == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(entry.getValue());
        }
    }

    /**
     * Deletes every event file that was last modified before the given age cutoff.
     *
     * @param olderThan the age beyond which files are deleted
     * @param unit the unit of {@code olderThan}
     */
    @Override
    public void purgeOldEvents(final long olderThan, final TimeUnit unit) {
        final long timeCutoff = System.currentTimeMillis() - unit.toMillis(olderThan);

        getEventFilesFromDisk()
            .filter(file -> file.lastModified() < timeCutoff)
            .sorted(DirectoryUtils.SMALLEST_ID_FIRST)
            .forEach(this::delete);
    }

    /**
     * Deletes the event file with the smallest ID that can be deleted.
     *
     * @return the size in bytes of the deleted file, or 0 if nothing was deleted
     */
    @Override
    public long purgeOldestEvents() {
        final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
        if (eventFiles.isEmpty()) {
            return 0L;
        }

        for (final File eventFile : eventFiles) {
            // Capture the length first; it is no longer available once the file is gone.
            final long fileSize = eventFile.length();

            if (delete(eventFile)) {
                logger.debug("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize((double) fileSize));
                return fileSize;
            }

            logger.warn("{} Failed to delete oldest event file {}. This file should be cleaned up manually.", this, eventFile);
        }

        return 0L;
    }

    /**
     * Removes the file from the min-event-ID map and then deletes it along with its
     * Table-of-Contents file.
     *
     * @param file the event file to delete
     * @return {@code true} if the event file itself was deleted
     */
    private boolean delete(final File file) {
        final long firstEventId = DirectoryUtils.getMinId(file);
        synchronized (minEventIdToPathMap) {
            minEventIdToPathMap.remove(firstEventId);
        }

        if (!file.delete()) {
            logger.warn("Failed to remove Provenance Event file {}; this file should be cleaned up manually", file);
            return false;
        }

        final File tocFile = TocUtil.getTocFile(file);
        if (tocFile.exists() && !tocFile.delete()) {
            logger.warn("Failed to remove Provenance Table-of-Contents file {}; this file should be cleaned up manually", tocFile);
        }

        return true;
    }

    /**
     * Re-indexes the events of this partition, starting at the minimum event ID that the given
     * index asks to have re-indexed. Event files are processed in parallel and the index changes
     * are committed once all files have been handled.
     *
     * @param eventIndex the index to feed the events into
     */
    void reindexLatestEvents(final EventIndex eventIndex) {
        final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
        if (eventFiles.isEmpty()) {
            return;
        }

        final long minEventIdToReindex = eventIndex.getMinimumEventIdToReindex(partitionName);
        final long maxEventId = getMaxEventId();
        final long eventsToReindex = maxEventId - minEventIdToReindex;

        logger.info("The last Provenance Event indexed for partition {} is {}, but the last event written to partition has ID {}. "
            + "Re-indexing up to the last {} events to ensure that the Event Index is accurate and up-to-date",
            partitionName, minEventIdToReindex, maxEventId, eventsToReindex);

        // Start at the last file whose minimum ID is not above the first ID to re-index;
        // if there is no such file, start at the very first file.
        int firstEventFileIndex = 0;
        for (int i = eventFiles.size() - 1; i >= 0; i--) {
            if (DirectoryUtils.getMinId(eventFiles.get(i)) <= minEventIdToReindex) {
                firstEventFileIndex = i;
                break;
            }
        }

        final List<File> eventFilesToReindex = eventFiles.subList(firstEventFileIndex, eventFiles.size());

        final ExecutorService executor = Executors.newFixedThreadPool(Math.min(MAX_REINDEX_THREADS, eventFilesToReindex.size()),
            new NamedThreadFactory("Re-Index Provenance Events", true));
        final AtomicLong reindexedCount = new AtomicLong(0L);
        final long start = System.nanoTime();

        try {
            final List<Future<?>> futures = new ArrayList<>(eventFilesToReindex.size());

            // Only the first file needs to be positioned at the first event to re-index.
            boolean firstFile = true;
            for (final File file : eventFilesToReindex) {
                final boolean skipToEvent = firstFile;
                firstFile = false;

                final Runnable reindexTask = () -> reindexFile(file, skipToEvent, minEventIdToReindex, eventIndex, reindexedCount);
                futures.add(executor.submit(reindexTask));
            }

            for (final Future<?> future : futures) {
                try {
                    future.get();
                } catch (final ExecutionException ee) {
                    logger.error("Failed to re-index some Provenance events. These events may not be query-able via the Provenance interface", ee.getCause());
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.error("Interrupted while waiting for Provenance events to be re-indexed", e);
                    break;
                }
            }

            try {
                eventIndex.commitChanges(partitionName);
            } catch (final IOException e) {
                logger.error("Failed to re-index Provenance Events for partition " + partitionName, e);
            }
        } finally {
            // Release the worker threads even if something above fails unexpectedly.
            executor.shutdown();
        }

        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        final long seconds = millis / 1000L;
        // Zero-pad the remainder so that, for example, 1005 ms is reported as 1.005 rather than 1.5.
        final String millisRemainder = String.format(Locale.ROOT, "%03d", millis % 1000L);
        logger.info("Finished re-indexing {} events across {} files for {} in {}.{} seconds",
            reindexedCount.get(), eventFilesToReindex.size(), partitionDirectory, seconds, millisRemainder);
    }

    /**
     * Reads the given event file and passes its events to the index in batches. Failures are
     * logged and do not propagate.
     *
     * @param file the event file to read
     * @param skipToEvent whether to position the reader at {@code minEventIdToReindex} first
     * @param minEventIdToReindex the first event ID that needs re-indexing
     * @param eventIndex the index to feed the events into
     * @param reindexedCount running total of re-indexed events, shared between files
     */
    private void reindexFile(final File file, final boolean skipToEvent, final long minEventIdToReindex,
                             final EventIndex eventIndex, final AtomicLong reindexedCount) {
        final Map<ProvenanceEventRecord, StorageSummary> storageMap = new HashMap<>(REINDEX_BATCH_SIZE);

        try (RecordReader recordReader = recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE)) {
            if (skipToEvent && !recordReader.skipToEvent(minEventIdToReindex).isPresent()) {
                return;
            }

            while (true) {
                final long startBytesConsumed = recordReader.getBytesConsumed();
                final StandardProvenanceEventRecord event = recordReader.nextRecord();
                if (event == null) {
                    // End of file: hand over whatever is left in the current batch.
                    flushReindexBatch(eventIndex, storageMap, reindexedCount);
                    break;
                }

                final long eventSize = recordReader.getBytesConsumed() - startBytesConsumed;
                storageMap.put(event, new StorageSummary(event.getEventId(), file.getName(), partitionName, recordReader.getBlockIndex(), eventSize, 0L));

                if (storageMap.size() == REINDEX_BATCH_SIZE) {
                    flushReindexBatch(eventIndex, storageMap, reindexedCount);
                }
            }
        } catch (final EOFException eof) {
            logger.warn("Failed to find event with ID {} in Event File {} due to {}", minEventIdToReindex, file, eof.toString());
        } catch (final Exception e) {
            logger.error("Failed to index Provenance Events found in {}", file, e);
        }
    }

    /**
     * Passes the batch to the index, adds its size to the running total and empties it.
     *
     * @param eventIndex the index to feed the events into
     * @param batch the events collected so far; cleared by this call
     * @param reindexedCount running total of re-indexed events
     */
    private static void flushReindexBatch(final EventIndex eventIndex, final Map<ProvenanceEventRecord, StorageSummary> batch, final AtomicLong reindexedCount) {
        eventIndex.reindexEvents(batch);
        reindexedCount.addAndGet(batch.size());
        batch.clear();
    }

    @Override
    public String toString() {
        return "Provenance Event Store Partition[directory=" + partitionDirectory + "]";
    }
}

