/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.EntityListing;
import org.apache.nifi.processors.standard.util.ListableEntity;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

/**
 * Base class for processors that list entities from some source and emit one
 * FlowFile per newly discovered entity.
 *
 * <p>The processor keeps two timestamps in the component's state (see
 * {@link #LISTING_TIMESTAMP_KEY} and {@link #PROCESSED_TIMESTAMP_KEY}) and uses
 * them to decide which listed entities are new. State that was stored by the
 * legacy mechanisms (a Distributed Cache Service entry and/or a local
 * persistence file) is migrated into the State Manager when the processor is
 * scheduled.
 *
 * @param <T> the type of entity produced by {@link #performListing}
 */
@TriggerSerially
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. The scope used depends on the implementation.")
public abstract class AbstractListProcessor<T extends ListableEntity>
extends AbstractProcessor {
    /** Optional cache service; in this class it is only consulted when migrating legacy state. */
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.").required(false).identifiesControllerService(DistributedMapCacheClient.class).build();
    /** Relationship that every FlowFile created by {@link #onTrigger} is transferred to. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
    /** Newest entity timestamp seen by the last listing; {@code null} until loaded from state or set by a listing. */
    private volatile Long lastListingTime = null;
    /** Newest entity timestamp for which FlowFiles have been emitted. */
    private volatile Long lastProcessedTime = 0L;
    /** {@link System#nanoTime()} reading taken at the end of the last run that found new entries. */
    private volatile Long lastRunTime = 0L;
    /** Set when this node becomes primary; forces the next trigger to reload the timestamps from state. */
    private volatile boolean justElectedPrimaryNode = false;
    /** Set when a property change requires the stored state to be cleared on the next schedule. */
    private volatile boolean resetState = false;
    /** Minimum time between two runs before entries carrying the newest timestamp are emitted. */
    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
    /** State key holding the newest listed timestamp. */
    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
    /** State key holding the newest processed timestamp. */
    static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";

    /**
     * Returns the legacy local persistence file for this component.
     *
     * @return the file {@code conf/state/<component identifier>}
     */
    protected File getPersistenceFile() {
        return new File("conf/state/" + this.getIdentifier());
    }

    /**
     * @return a new list containing only {@link #DISTRIBUTED_CACHE_SERVICE}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(DISTRIBUTED_CACHE_SERVICE);
        return properties;
    }

    /**
     * Resets the in-memory timestamps and flags the stored state for clearing when a
     * property that affects the listing is modified after the configuration was restored.
     *
     * @param descriptor the modified property
     * @param oldValue the previous value (unused)
     * @param newValue the new value (unused)
     */
    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (this.isConfigurationRestored() && this.isListingResetNecessary(descriptor)) {
            this.resetTimeStates();
            // The stored state itself is cleared later, in updateState().
            this.resetState = true;
        }
    }

    /**
     * @return a new set containing only {@link #REL_SUCCESS}
     */
    @Override
    public Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        return relationships;
    }

    /**
     * Records whether this node has just been elected primary node.
     *
     * @param newState the new primary-node state of this node
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState newState) {
        this.justElectedPrimaryNode = newState == PrimaryNodeState.ELECTED_PRIMARY_NODE;
    }

    /**
     * Prepares the component state when the processor is scheduled: migrates legacy
     * state if the State Manager holds none yet, resets the in-memory timestamps if
     * the stored state was cleared externally, and clears the stored state if a
     * property change requested it.
     *
     * @param context the process context
     * @throws IOException if the state cannot be read, migrated or cleared
     */
    @OnScheduled
    public final void updateState(ProcessContext context) throws IOException {
        String path = this.getPath(context);
        DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        StateMap stateMap = context.getStateManager().getState(this.getStateScope(context));
        // A version of -1 is treated as "no state stored yet", which triggers the migration.
        if (stateMap.getVersion() == -1L) {
            try {
                this.migrateState(path, client, context.getStateManager(), this.getStateScope(context));
            }
            catch (IOException ioe) {
                throw new IOException("Failed to properly migrate state to State Manager", ioe);
            }
        }
        if (this.lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) {
            this.getLogger().info("Detected that state was cleared for this component.  Resetting internal values.");
            this.resetTimeStates();
        }
        if (this.resetState) {
            context.getStateManager().clear(this.getStateScope(context));
            this.resetState = false;
        }
    }

    /**
     * Migrates the latest timestamp known to the legacy storage (Distributed Cache
     * Service entry and local persistence file) into the State Manager. The newer
     * of the two timestamps wins. The legacy sources are removed only after the
     * new state has been stored, so a failure to store it does not lose them.
     *
     * @param path the listed path; used to build the cache key and as the key in the persistence file
     * @param client the cache client, or {@code null} if none is configured
     * @param stateManager the state manager to store the migrated timestamp in
     * @param scope the state scope to store it under
     * @throws IOException if legacy state cannot be read or parsed, or the new state cannot be stored
     */
    private void migrateState(String path, DistributedMapCacheClient client, StateManager stateManager, Scope scope) throws IOException {
        StringSerDe serde = new StringSerDe();
        String cacheKey = this.getKey(path);
        Long minTimestamp = null;

        if (client != null) {
            String serializedState = client.get(cacheKey, serde, serde);
            if (serializedState != null && !serializedState.isEmpty()) {
                EntityListing listing = this.deserialize(serializedState);
                minTimestamp = listing.getLatestTimestamp().getTime();
            }
        }

        File persistenceFile = this.getPersistenceFile();
        if (persistenceFile.exists()) {
            Properties props = new Properties();
            try (FileInputStream fis = new FileInputStream(persistenceFile)) {
                props.load(fis);
            }
            String locallyPersistedValue = props.getProperty(path);
            if (locallyPersistedValue != null) {
                EntityListing listing = this.deserialize(locallyPersistedValue);
                long localTimestamp = listing.getLatestTimestamp().getTime();
                // Prefer the local value only when it is newer than the cached one.
                if (minTimestamp == null || localTimestamp > minTimestamp) {
                    minTimestamp = localTimestamp;
                }
            }
        }

        if (minTimestamp != null) {
            this.persist(minTimestamp, minTimestamp, stateManager, scope);
        }

        // Clean up the legacy sources now that the migrated state (if any) is stored.
        if (persistenceFile.exists() && !persistenceFile.delete()) {
            this.getLogger().warn("Migrated state but failed to delete local persistence file");
        }
        if (client != null) {
            try {
                // Remove under the same key the entry was read with.
                client.remove(cacheKey, serde);
            }
            catch (IOException ioe) {
                // Best effort: the entry is no longer needed, so a failure here is only logged.
                this.getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new State Management service, so the Distributed Cache Service is no longer needed.", ioe);
            }
        }
    }

    /**
     * Stores both timestamps in the State Manager, replacing the current state.
     *
     * @param listingTimestamp value stored under {@link #LISTING_TIMESTAMP_KEY}
     * @param processedTimestamp value stored under {@link #PROCESSED_TIMESTAMP_KEY}
     * @param stateManager the state manager to write to
     * @param scope the state scope to write under
     * @throws IOException if the state cannot be stored
     */
    private void persist(long listingTimestamp, long processedTimestamp, StateManager stateManager, Scope scope) throws IOException {
        Map<String, String> updatedState = new HashMap<>();
        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp));
        updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp));
        stateManager.setState(updatedState, scope);
    }

    /**
     * Builds the key under which the legacy Distributed Cache Service entry is looked up.
     *
     * @param directory the listed path
     * @return {@code <component identifier>.lastListingTime.<directory>}
     */
    protected String getKey(String directory) {
        return this.getIdentifier() + ".lastListingTime." + directory;
    }

    /**
     * Parses a legacy JSON-serialized {@link EntityListing}.
     *
     * @param serializedState the JSON text
     * @return the parsed listing
     * @throws JsonParseException if the text is not valid JSON
     * @throws JsonMappingException if the JSON cannot be mapped to an {@link EntityListing}
     * @throws IOException on any other read failure
     */
    private EntityListing deserialize(String serializedState) throws JsonParseException, JsonMappingException, IOException {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode jsonNode = mapper.readTree(serializedState);
        return (EntityListing)mapper.readValue(jsonNode, EntityListing.class);
    }

    /**
     * Performs one listing and creates a FlowFile for every entity considered new.
     *
     * <p>An entity is new when no minimum timestamp is known, or when its timestamp
     * is at least the minimum timestamp and greater than the last processed
     * timestamp. Entities carrying the newest timestamp of a listing are held back
     * on the run that first sees that timestamp; they are emitted by a later run
     * that sees the same newest timestamp once at least {@link #LISTING_LAG_NANOS}
     * have passed since the previous run.
     *
     * @param context the process context
     * @param session the session used to create and transfer FlowFiles
     * @throws ProcessException declared by the overridden method; not thrown directly here
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        Long minTimestamp = this.lastListingTime;
        if (this.lastListingTime == null || this.lastProcessedTime == null || this.justElectedPrimaryNode) {
            try {
                StateMap stateMap = context.getStateManager().getState(this.getStateScope(context));
                String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
                String lastProcessedString = stateMap.get(PROCESSED_TIMESTAMP_KEY);
                if (lastProcessedString != null) {
                    this.lastProcessedTime = Long.parseLong(lastProcessedString);
                }
                if (listingTimestampString != null) {
                    minTimestamp = Long.parseLong(listingTimestampString);
                    // Compare by value: both operands are boxed Longs, so == would compare references.
                    if (minTimestamp.equals(this.lastListingTime)) {
                        // The state has been reloaded; clear the flag so only this one run is skipped.
                        this.justElectedPrimaryNode = false;
                        context.yield();
                        return;
                    }
                    this.lastListingTime = minTimestamp;
                }
                this.justElectedPrimaryNode = false;
            }
            catch (IOException ioe) {
                this.getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
                context.yield();
                return;
            }
        }

        List<T> entityList;
        try {
            entityList = this.performListing(context, minTimestamp);
        }
        catch (IOException e) {
            this.getLogger().error("Failed to perform listing on remote host due to {}", e);
            context.yield();
            return;
        }
        if (entityList == null || entityList.isEmpty()) {
            context.yield();
            return;
        }

        // Group the new entities by timestamp, ordered from oldest to newest.
        TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
        for (T entity : entityList) {
            long entityTimestamp = entity.getTimestamp();
            boolean newEntry = minTimestamp == null || (entityTimestamp >= minTimestamp && entityTimestamp > this.lastProcessedTime);
            if (!newEntry) {
                continue;
            }
            List<T> entitiesForTimestamp = orderedEntries.get(entityTimestamp);
            if (entitiesForTimestamp == null) {
                entitiesForTimestamp = new ArrayList<>();
                orderedEntries.put(entityTimestamp, entitiesForTimestamp);
            }
            entitiesForTimestamp.add(entity);
        }

        if (orderedEntries.isEmpty()) {
            this.getLogger().debug("There is no data to list. Yielding.");
            context.yield();
            if (this.lastListingTime == null) {
                this.lastListingTime = 0L;
            }
            return;
        }

        Long latestListingTimestamp = orderedEntries.lastKey();
        if (latestListingTimestamp.equals(this.lastListingTime)) {
            // Same newest timestamp as the previous listing: emit it only after the lag has
            // elapsed, and only if it has not been processed already.
            if (System.nanoTime() - this.lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(this.lastProcessedTime)) {
                context.yield();
                return;
            }
        } else {
            // First time this newest timestamp is seen: hold its entities back for a later run.
            orderedEntries.remove(latestListingTimestamp);
        }

        int flowfilesCreated = 0;
        for (List<T> timestampEntities : orderedEntries.values()) {
            for (T entity : timestampEntities) {
                Map<String, String> attributes = this.createAttributes(entity, context);
                FlowFile flowFile = session.create();
                flowFile = session.putAllAttributes(flowFile, attributes);
                session.transfer(flowFile, REL_SUCCESS);
                ++flowfilesCreated;
            }
        }

        boolean processedNewFiles = flowfilesCreated > 0;
        if (processedNewFiles) {
            this.lastProcessedTime = orderedEntries.lastKey();
            this.getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
            session.commit();
        }
        this.lastRunTime = System.nanoTime();
        if (!latestListingTimestamp.equals(this.lastListingTime) || processedNewFiles) {
            try {
                this.lastListingTime = latestListingTimestamp;
                this.persist(latestListingTimestamp, this.lastProcessedTime, context.getStateManager(), this.getStateScope(context));
            }
            catch (IOException ioe) {
                this.getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or if another node begins executing this Processor, data duplication may occur.", ioe);
            }
        }
    }

    /** Resets the in-memory timestamps to their initial values. */
    private void resetTimeStates() {
        this.lastListingTime = null;
        this.lastProcessedTime = 0L;
        this.lastRunTime = 0L;
    }

    /**
     * Creates the attributes to put on the FlowFile that represents the given entity.
     *
     * @param entity the listed entity
     * @param context the process context
     * @return the attributes for the new FlowFile
     */
    protected abstract Map<String, String> createAttributes(T entity, ProcessContext context);

    /**
     * Returns the path being listed. This class uses it only to look up legacy state.
     *
     * @param context the process context
     * @return the path
     */
    protected abstract String getPath(ProcessContext context);

    /**
     * Performs the listing.
     *
     * @param context the process context
     * @param minTimestamp the minimum timestamp currently known to this class, or {@code null} if none is known
     * @return the listed entities; {@code null} or an empty list makes the processor yield
     * @throws IOException if the listing fails
     */
    protected abstract List<T> performListing(ProcessContext context, Long minTimestamp) throws IOException;

    /**
     * Decides whether modifying the given property invalidates the stored listing state.
     *
     * @param descriptor the modified property
     * @return {@code true} if the timestamps and stored state must be reset
     */
    protected abstract boolean isListingResetNecessary(PropertyDescriptor descriptor);

    /**
     * Returns the scope under which this processor reads, writes and clears its state.
     *
     * @param context the process context
     * @return the state scope
     */
    protected abstract Scope getStateScope(ProcessContext context);

    /** UTF-8 string serializer/deserializer used for the legacy Distributed Cache Service entry. */
    private static class StringSerDe
    implements Serializer<String>,
    Deserializer<String> {
        private StringSerDe() {
        }

        /**
         * @param value the raw bytes, possibly {@code null}
         * @return the bytes decoded as UTF-8, or {@code null} if {@code value} is {@code null}
         */
        @Override
        public String deserialize(byte[] value) throws DeserializationException, IOException {
            if (value == null) {
                return null;
            }
            return new String(value, StandardCharsets.UTF_8);
        }

        /**
         * Writes the UTF-8 bytes of {@code value} to {@code out}.
         *
         * @param value the string to write
         * @param out the stream to write to
         */
        @Override
        public void serialize(String value, OutputStream out) throws SerializationException, IOException {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }
}

