package org.apache.nifi.processors.standard;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.EntityListing;
import org.apache.nifi.processors.standard.util.ListableEntity;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

@TriggerSerially
/* loaded from: input_file:org/apache/nifi/processors/standard/AbstractListProcessor.class */
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
    /**
     * Optional controller service used to share listing state between nodes.
     * When it is not set, state is only kept in memory and in the local
     * persistence file.
     */
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
            .required(false)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build();

    /** The single relationship: every FlowFile created for a listed entity is routed here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are received are routed to success")
            .build();

    /**
     * Latest entity timestamp that has been listed so far. {@code null} means
     * that no state has been established yet (initial value, or a property
     * change requested a reset), which makes the next trigger consult the
     * distributed cache and the local persistence file.
     */
    private volatile Long lastListingTime = null;

    /** Identifiers of the entities whose timestamp equals the latest listed timestamp. */
    private volatile Set<String> latestIdentifiersListed = new HashSet<>();

    /**
     * Set when this node is elected primary; cleared once the state has been
     * re-read from the distributed cache.
     */
    private volatile boolean electedPrimaryNode = false;

    /**
     * UTF-8 string serializer/deserializer, used for both the keys and the
     * values exchanged with the distributed map cache.
     */
    public static class StringSerDe implements Serializer<String>, Deserializer<String> {
        private StringSerDe() {
        }

        /**
         * Decodes a cache value.
         *
         * The method must be named {@code deserialize} to satisfy
         * {@code Deserializer<String>}; the decompiler-generated name
         * ({@code m2deserialize}) left the interface method unimplemented.
         *
         * @param bArr the raw bytes read from the cache, may be {@code null}
         * @return the UTF-8 decoded string, or {@code null} when {@code bArr} is {@code null}
         */
        public String deserialize(byte[] bArr) throws DeserializationException, IOException {
            if (bArr == null) {
                return null;
            }
            return new String(bArr, StandardCharsets.UTF_8);
        }

        /**
         * Writes the UTF-8 encoding of {@code str} to {@code outputStream}.
         *
         * @param str the value to encode; must not be {@code null}
         * @param outputStream the stream that receives the encoded bytes
         */
        public void serialize(String str, OutputStream outputStream) throws SerializationException, IOException {
            outputStream.write(str.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Locates the file in which this processor keeps its local listing state.
     *
     * @return a file named after the processor identifier under {@code conf/state/}
     */
    protected File getPersistenceFile() {
        final String location = "conf/state/" + getIdentifier();
        return new File(location);
    }

    /**
     * Lists the properties supported by this base class.
     *
     * @return a new, mutable list containing only {@link #DISTRIBUTED_CACHE_SERVICE}
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        return descriptors;
    }

    /**
     * Discards the in-memory listing state when the modified property is one
     * for which {@link #isListingResetNecessary(PropertyDescriptor)} returns
     * {@code true}.
     *
     * @param propertyDescriptor the property that was modified
     * @param str second callback argument; not used here
     * @param str2 third callback argument; not used here
     */
    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String str, String str2) {
        if (isListingResetNecessary(propertyDescriptor)) {
            // A null timestamp makes the next trigger recover state from the cache / local file.
            this.lastListingTime = null;
            this.latestIdentifiersListed = new HashSet<>();
        }
    }

    /**
     * Lists the relationships of this processor.
     *
     * @return a new, mutable set containing only {@link #REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        return relationships;
    }

    /**
     * Builds the distributed-cache key under which the listing state is stored.
     *
     * @param str the suffix of the key (callers in this class pass the value of {@code getPath})
     * @return the processor identifier, followed by {@code .lastListingTime.} and {@code str}
     */
    protected String getKey(String str) {
        final String prefix = getIdentifier() + ".lastListingTime.";
        return prefix + str;
    }

    /**
     * Remembers that this node has just been elected primary, so that the next
     * timestamp lookup re-reads the state from the distributed cache. Any other
     * state change is ignored.
     *
     * @param primaryNodeState the new primary-node state of this node
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState primaryNodeState) {
        if (primaryNodeState != PrimaryNodeState.ELECTED_PRIMARY_NODE) {
            return;
        }
        this.electedPrimaryNode = true;
    }

    /**
     * Parses serialized listing state (JSON) into an {@link EntityListing}.
     *
     * @param str the JSON text to parse
     * @return the listing described by {@code str}
     * @throws IOException if the text cannot be read or mapped
     */
    private EntityListing deserialize(String str) throws JsonParseException, JsonMappingException, IOException {
        final ObjectMapper mapper = new ObjectMapper();
        // Parse to a tree first, then bind the tree to the target type.
        return mapper.readValue(mapper.readTree(str), EntityListing.class);
    }

    /**
     * Determines the timestamp from which the next listing should continue.
     *
     * When the in-memory timestamp is known and this node was not just elected
     * primary, it is returned directly. Otherwise the state is recovered from
     * the distributed cache (if configured) and then compared with the local
     * persistence file.
     *
     * @param str the key identifying the listing state (the value of {@code getPath})
     * @param distributedMapCacheClient the cache client, or {@code null} if none is configured
     * @return the latest known listing timestamp, or {@code null} if there is none
     * @throws IOException if the distributed cache cannot be read or its content cannot be parsed
     */
    private Long getMinTimestamp(String str, DistributedMapCacheClient distributedMapCacheClient) throws IOException {
        Long minTimestamp = this.lastListingTime;
        if (minTimestamp != null && !this.electedPrimaryNode) {
            return minTimestamp;
        }

        // First source of truth: the distributed cache. Failures propagate to the caller.
        if (distributedMapCacheClient != null) {
            final StringSerDe serde = new StringSerDe();
            final String cachedState = distributedMapCacheClient.get(getKey(str), serde, serde);
            if (cachedState == null || cachedState.isEmpty()) {
                minTimestamp = null;
                this.latestIdentifiersListed = Collections.emptySet();
            } else {
                final EntityListing cachedListing = deserialize(cachedState);
                minTimestamp = Long.valueOf(cachedListing.getLatestTimestamp().getTime());
                this.latestIdentifiersListed = new HashSet<>(cachedListing.getMatchingIdentifiers());
            }
            this.lastListingTime = minTimestamp;
            // The cache has been consulted; no need to do so again until the next election.
            this.electedPrimaryNode = false;
        }

        // Second source: the local persistence file. Failures here are only logged.
        try {
            final File persistenceFile = getPersistenceFile();
            if (persistenceFile.exists()) {
                // try-with-resources guarantees the stream is closed even when loading or parsing fails.
                try (FileInputStream localStateStream = new FileInputStream(persistenceFile)) {
                    final Properties properties = new Properties();
                    properties.load(localStateStream);
                    final String localState = properties.getProperty(str);
                    if (localState != null) {
                        final long localTimestamp = deserialize(localState).getLatestTimestamp().getTime();
                        // NOTE(review): the local timestamp is only adopted when a cache client is
                        // configured; without one it is read but ignored. Confirm this is intended.
                        if (distributedMapCacheClient != null && (minTimestamp == null || localTimestamp > minTimestamp.longValue())) {
                            minTimestamp = Long.valueOf(localTimestamp);
                            // Local state is newer than the cache: bring the cache up to date.
                            try {
                                final StringSerDe serde = new StringSerDe();
                                distributedMapCacheClient.put(getKey(str), localState, serde, serde);
                            } catch (IOException e) {
                                getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed state due to {}. If a new node performs Listing, data duplication may occur", new Object[]{str, localState, e});
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", e);
        }
        return minTimestamp;
    }

    /**
     * Serializes the state that results from a listing: the latest timestamp
     * in {@code list} and the identifiers of all entities carrying exactly that
     * timestamp. As a side effect, those identifiers replace
     * {@code latestIdentifiersListed}.
     *
     * @param list the entities returned by the listing
     * @return the state as JSON, or {@code null} if {@code list} is empty
     * @throws IOException if the state cannot be written as JSON
     */
    private String serializeState(List<T> list) throws JsonGenerationException, JsonMappingException, IOException {
        if (list.isEmpty()) {
            return null;
        }

        // A linear scan for the maximum is enough; sorting the whole listing is unnecessary.
        long latestTimestamp = Long.MIN_VALUE;
        for (final T entity : list) {
            latestTimestamp = Math.max(latestTimestamp, entity.getTimestamp());
        }

        final Set<String> latestIdentifiers = new HashSet<>();
        for (final T entity : list) {
            if (entity.getTimestamp() == latestTimestamp) {
                latestIdentifiers.add(entity.getIdentifier());
            }
        }
        this.latestIdentifiersListed = latestIdentifiers;

        // The serialized listing gets its own copy, independent of the field's set.
        final Set<String> persistedIdentifiers = new HashSet<>(latestIdentifiers);
        final EntityListing listing = new EntityListing();
        listing.setLatestTimestamp(new Date(latestTimestamp));
        listing.setMatchingIdentifiers(persistedIdentifiers);
        return new ObjectMapper().writerWithType(EntityListing.class).writeValueAsString(listing);
    }

    /**
     * Stores {@code str2} under the key {@code str} in the local persistence
     * file, keeping all other entries already present in that file.
     *
     * @param str the property key (callers in this class pass the value of {@code getPath})
     * @param str2 the serialized state to store
     * @throws IOException if the state directory cannot be created, or the file cannot be read or written
     */
    protected void persistLocalState(String str, String str2) throws IOException {
        final File persistenceFile = getPersistenceFile();
        final File stateDirectory = persistenceFile.getParentFile();
        // getParentFile() is null for a bare file name; there is nothing to create in that case.
        if (stateDirectory != null && !stateDirectory.exists() && !stateDirectory.mkdirs()) {
            throw new IOException("Could not create directory " + stateDirectory.getAbsolutePath() + " in order to save local state");
        }

        // Load the existing entries first so that state stored under other keys is preserved.
        final Properties properties = new Properties();
        if (persistenceFile.exists()) {
            try (FileInputStream in = new FileInputStream(persistenceFile)) {
                properties.load(in);
            }
        }

        properties.setProperty(str, str2);
        try (FileOutputStream out = new FileOutputStream(persistenceFile)) {
            properties.store(out, null);
        }
    }

    /**
     * Performs one listing pass: determines the timestamp to continue from,
     * asks the subclass for the current listing, creates one FlowFile per entity
     * that has not been listed before, and then saves the new state locally and
     * (if configured) in the distributed cache.
     *
     * An entity counts as new when no previous timestamp is known, when its
     * timestamp is later than the previous one, or when its timestamp equals the
     * previous one and its identifier was not among those listed for it.
     *
     * @param processContext the context of this invocation
     * @param processSession the session used to create and transfer FlowFiles
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final String path = getPath(processContext);
        final DistributedMapCacheClient cacheClient = processContext.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);

        final Long minTimestamp;
        try {
            minTimestamp = getMinTimestamp(path, cacheClient);
        } catch (IOException e) {
            // Pass the exception along so the cause of the failure is not lost.
            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.", e);
            processContext.yield();
            return;
        }

        final List<T> entities;
        try {
            entities = performListing(processContext, minTimestamp);
        } catch (IOException e) {
            getLogger().error("Failed to perform listing on remote host due to {}", e);
            processContext.yield();
            return;
        }
        if (entities == null) {
            processContext.yield();
            return;
        }

        int listedCount = 0;
        Long latestListedTimestamp = null;
        for (final T entity : entities) {
            final long timestamp = entity.getTimestamp();
            final boolean isNew = minTimestamp == null
                    || timestamp > minTimestamp.longValue()
                    || (timestamp == minTimestamp.longValue() && !this.latestIdentifiersListed.contains(entity.getIdentifier()));
            if (!isNew) {
                continue;
            }
            processSession.transfer(processSession.putAllAttributes(processSession.create(), createAttributes(entity, processContext)), REL_SUCCESS);
            listedCount++;
            if (latestListedTimestamp == null || timestamp > latestListedTimestamp.longValue()) {
                latestListedTimestamp = Long.valueOf(timestamp);
            }
        }

        if (listedCount <= 0) {
            getLogger().debug("There is no data to list. Yielding.");
            processContext.yield();
            // A non-null value stops the next trigger from recovering state again.
            if (this.lastListingTime == null) {
                this.lastListingTime = Long.valueOf(0L);
            }
            return;
        }

        getLogger().info("Successfully created listing with {} new objects", new Object[]{Integer.valueOf(listedCount)});
        // Commit before saving state: a failure in between can duplicate data but not lose it.
        processSession.commit();

        String serializedState = null;
        try {
            serializedState = serializeState(entities);
        } catch (Exception e) {
            getLogger().error("Failed to serialize state due to {}", new Object[]{e});
        }

        if (serializedState != null) {
            try {
                persistLocalState(path, serializedState);
            } catch (IOException e) {
                getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", e);
            }
            if (cacheClient != null) {
                try {
                    final StringSerDe serde = new StringSerDe();
                    cacheClient.put(getKey(path), serializedState, serde, serde);
                } catch (IOException e) {
                    getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", e);
                }
            }
        }
        this.lastListingTime = latestListedTimestamp;
    }

    /**
     * Produces the attributes for the FlowFile that is created for a listed
     * entity; onTrigger passes the returned map to putAllAttributes on a newly
     * created FlowFile.
     *
     * @param t the entity being listed
     * @param processContext the context of the current trigger
     * @return the attributes to put on the FlowFile
     */
    protected abstract Map<String, String> createAttributes(T t, ProcessContext processContext);

    /**
     * Returns the string under which the listing state is keyed: it is used as
     * the property key in the local persistence file and as the suffix of the
     * distributed-cache key.
     * NOTE(review): presumably the remote path/directory being listed - confirm with implementations.
     *
     * @param processContext the context of the current trigger
     * @return the state key for the current configuration
     */
    protected abstract String getPath(ProcessContext processContext);

    /**
     * Retrieves the current listing. onTrigger filters the result against the
     * given timestamp itself, yields when {@code null} is returned, and logs an
     * error and yields when an IOException is thrown.
     *
     * @param processContext the context of the current trigger
     * @param l the latest timestamp already listed, or {@code null} if none is known
     * @return the listed entities, or {@code null}
     * @throws IOException if the listing cannot be performed
     */
    protected abstract List<T> performListing(ProcessContext processContext, Long l) throws IOException;

    /**
     * Decides whether a change to the given property invalidates the listing
     * state; when this returns {@code true}, onPropertyModified clears the
     * in-memory timestamp and identifier set.
     *
     * @param propertyDescriptor the property that was modified
     * @return {@code true} if the listing state must be reset
     */
    protected abstract boolean isListingResetNecessary(PropertyDescriptor propertyDescriptor);
}
