/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.EntityListing;
import org.apache.nifi.processors.standard.util.ListableEntity;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

@TriggerSerially
public abstract class AbstractListProcessor<T extends ListableEntity>
extends AbstractProcessor {
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.").required(false).identifiesControllerService(DistributedMapCacheClient.class).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
    private volatile Long lastListingTime = null;
    private volatile Set<String> latestIdentifiersListed = new HashSet<String>();
    private volatile boolean electedPrimaryNode = false;

    protected File getPersistenceFile() {
        return new File("conf/state/" + this.getIdentifier());
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(DISTRIBUTED_CACHE_SERVICE);
        return properties;
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (this.isListingResetNecessary(descriptor)) {
            this.lastListingTime = null;
            this.latestIdentifiersListed = new HashSet<String>();
        }
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        return relationships;
    }

    protected String getKey(String directory) {
        return this.getIdentifier() + ".lastListingTime." + directory;
    }

    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState newState) {
        if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
            this.electedPrimaryNode = true;
        }
    }

    private EntityListing deserialize(String serializedState) throws JsonParseException, JsonMappingException, IOException {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode jsonNode = mapper.readTree(serializedState);
        return (EntityListing)mapper.readValue(jsonNode, EntityListing.class);
    }

    private Long getMinTimestamp(String directory, DistributedMapCacheClient client) throws IOException {
        Long minTimestamp;
        block24: {
            minTimestamp = this.lastListingTime;
            if (minTimestamp == null || this.electedPrimaryNode) {
                if (client != null) {
                    StringSerDe serde = new StringSerDe();
                    String serializedState = (String)client.get((Object)this.getKey(directory), (Serializer)serde, (Deserializer)serde);
                    if (serializedState == null || serializedState.isEmpty()) {
                        minTimestamp = null;
                        this.latestIdentifiersListed = Collections.emptySet();
                    } else {
                        EntityListing listing = this.deserialize(serializedState);
                        this.lastListingTime = listing.getLatestTimestamp().getTime();
                        minTimestamp = listing.getLatestTimestamp().getTime();
                        this.latestIdentifiersListed = new HashSet<String>(listing.getMatchingIdentifiers());
                    }
                    this.lastListingTime = minTimestamp;
                    this.electedPrimaryNode = false;
                }
                try {
                    File persistenceFile = this.getPersistenceFile();
                    if (!persistenceFile.exists()) break block24;
                    try (FileInputStream fis = new FileInputStream(persistenceFile);){
                        Properties props = new Properties();
                        props.load(fis);
                        String locallyPersistedValue = props.getProperty(directory);
                        if (locallyPersistedValue != null) {
                            EntityListing listing = this.deserialize(locallyPersistedValue);
                            long localTimestamp = listing.getLatestTimestamp().getTime();
                            if (client != null && (minTimestamp == null || localTimestamp > minTimestamp)) {
                                minTimestamp = localTimestamp;
                                try {
                                    StringSerDe serde = new StringSerDe();
                                    client.put((Object)this.getKey(directory), (Object)locallyPersistedValue, (Serializer)serde, (Serializer)serde);
                                }
                                catch (IOException ioe) {
                                    this.getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed state due to {}. If a new node performs Listing, data duplication may occur", new Object[]{directory, locallyPersistedValue, ioe});
                                }
                            }
                        }
                    }
                }
                catch (IOException ioe) {
                    this.getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", (Throwable)ioe);
                }
            }
        }
        return minTimestamp;
    }

    private String serializeState(List<T> entities) throws JsonGenerationException, JsonMappingException, IOException {
        if (entities.isEmpty()) {
            return null;
        }
        ArrayList<T> sortedEntities = new ArrayList<T>(entities);
        Collections.sort(sortedEntities, new Comparator<ListableEntity>(){

            @Override
            public int compare(ListableEntity o1, ListableEntity o2) {
                return Long.compare(o1.getTimestamp(), o2.getTimestamp());
            }
        });
        long latestListingModTime = ((ListableEntity)sortedEntities.get(sortedEntities.size() - 1)).getTimestamp();
        HashSet<String> idsWithTimestampEqualToListingTime = new HashSet<String>();
        for (int i = sortedEntities.size() - 1; i >= 0; --i) {
            ListableEntity entity = (ListableEntity)sortedEntities.get(i);
            if (entity.getTimestamp() != latestListingModTime) continue;
            idsWithTimestampEqualToListingTime.add(entity.getIdentifier());
        }
        this.latestIdentifiersListed = idsWithTimestampEqualToListingTime;
        EntityListing listing = new EntityListing();
        listing.setLatestTimestamp(new Date(latestListingModTime));
        HashSet<String> ids = new HashSet<String>();
        for (String id : idsWithTimestampEqualToListingTime) {
            ids.add(id);
        }
        listing.setMatchingIdentifiers(ids);
        ObjectMapper mapper = new ObjectMapper();
        String serializedState = mapper.writerWithType(EntityListing.class).writeValueAsString((Object)listing);
        return serializedState;
    }

    protected void persistLocalState(String path, String serializedState) throws IOException {
        Throwable throwable;
        File persistenceFile = this.getPersistenceFile();
        File dir = persistenceFile.getParentFile();
        if (!dir.exists() && !dir.mkdirs()) {
            throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
        }
        Properties props = new Properties();
        if (persistenceFile.exists()) {
            throwable = null;
            try (FileInputStream fis = new FileInputStream(persistenceFile);){
                props.load(fis);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
        }
        props.setProperty(path, serializedState);
        throwable = null;
        try (FileOutputStream fos = new FileOutputStream(persistenceFile);){
            props.store(fos, null);
        }
        catch (Throwable throwable3) {
            throwable = throwable3;
            throw throwable3;
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        List<T> entityList;
        Long minTimestamp;
        String path = this.getPath(context);
        DistributedMapCacheClient client = (DistributedMapCacheClient)context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        try {
            minTimestamp = this.getMinTimestamp(path, client);
        }
        catch (IOException ioe) {
            this.getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
            context.yield();
            return;
        }
        try {
            entityList = this.performListing(context, minTimestamp);
        }
        catch (IOException e) {
            this.getLogger().error("Failed to perform listing on remote host due to {}", (Throwable)e);
            context.yield();
            return;
        }
        if (entityList == null) {
            context.yield();
            return;
        }
        int listCount = 0;
        Long latestListingTimestamp = null;
        for (ListableEntity entity : entityList) {
            boolean list = minTimestamp == null || entity.getTimestamp() > minTimestamp || entity.getTimestamp() == minTimestamp.longValue() && !this.latestIdentifiersListed.contains(entity.getIdentifier());
            if (!list) continue;
            Map<String, String> attributes = this.createAttributes(entity, context);
            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);
            ++listCount;
            if (latestListingTimestamp != null && entity.getTimestamp() <= latestListingTimestamp) continue;
            latestListingTimestamp = entity.getTimestamp();
        }
        if (listCount > 0) {
            this.getLogger().info("Successfully created listing with {} new objects", new Object[]{listCount});
            session.commit();
            String serializedState = null;
            try {
                serializedState = this.serializeState(entityList);
            }
            catch (Exception e) {
                this.getLogger().error("Failed to serialize state due to {}", new Object[]{e});
            }
            if (serializedState != null) {
                try {
                    this.persistLocalState(path, serializedState);
                }
                catch (IOException ioe) {
                    this.getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", (Throwable)ioe);
                }
                if (client != null) {
                    try {
                        client.put((Object)this.getKey(path), (Object)serializedState, (Serializer)new StringSerDe(), (Serializer)new StringSerDe());
                    }
                    catch (IOException ioe) {
                        this.getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", (Throwable)ioe);
                    }
                }
            }
        } else {
            this.getLogger().debug("There is no data to list. Yielding.");
            context.yield();
            if (this.lastListingTime == null) {
                this.lastListingTime = 0L;
            }
            return;
        }
        this.lastListingTime = latestListingTimestamp;
    }

    protected abstract Map<String, String> createAttributes(T var1, ProcessContext var2);

    protected abstract String getPath(ProcessContext var1);

    protected abstract List<T> performListing(ProcessContext var1, Long var2) throws IOException;

    protected abstract boolean isListingResetNecessary(PropertyDescriptor var1);

    private static class StringSerDe
    implements Serializer<String>,
    Deserializer<String> {
        private StringSerDe() {
        }

        public String deserialize(byte[] value) throws DeserializationException, IOException {
            if (value == null) {
                return null;
            }
            return new String(value, StandardCharsets.UTF_8);
        }

        public void serialize(String value, OutputStream out) throws SerializationException, IOException {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }
}

