/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.storage;

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.storage.AbstractGCSProcessor;
import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
import org.apache.nifi.processors.gcp.storage.FetchGCSObject;
import org.apache.nifi.processors.gcp.storage.PutGCSObject;

@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"google cloud", "google", "storage", "gcs", "list"})
@CapabilityDescription(value="Retrieves a listing of objects from an GCS bucket. For each object that is listed, creates a FlowFile that represents the object so that it can be fetched in conjunction with FetchGCSObject. This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating all of the data.")
@Stateful(scopes={Scope.CLUSTER}, description="After performing a listing of keys, the timestamp of the newest key is stored, along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@SeeAlso(value={PutGCSObject.class, DeleteGCSObject.class, FetchGCSObject.class})
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The name of the file"), @WritesAttribute(attribute="gcs.bucket", description="Bucket of the object."), @WritesAttribute(attribute="gcs.key", description="Name of the object."), @WritesAttribute(attribute="gcs.size", description="Size of the object."), @WritesAttribute(attribute="gcs.cache.control", description="Data cache control of the object."), @WritesAttribute(attribute="gcs.component.count", description="The number of components which make up the object."), @WritesAttribute(attribute="gcs.content.disposition", description="The data content disposition of the object."), @WritesAttribute(attribute="gcs.content.encoding", description="The content encoding of the object."), @WritesAttribute(attribute="gcs.content.language", description="The content language of the object."), @WritesAttribute(attribute="mime.type", description="The MIME/Content-Type of the object"), @WritesAttribute(attribute="gcs.crc32c", description="The CRC32C checksum of object's data, encoded in base64 in big-endian order."), @WritesAttribute(attribute="gcs.create.time", description="The creation time of the object (milliseconds)"), @WritesAttribute(attribute="gcs.update.time", description="The last modification time of the object (milliseconds)"), @WritesAttribute(attribute="gcs.encryption.algorithm", description="The algorithm used to encrypt the object."), @WritesAttribute(attribute="gcs.encryption.sha256", description="The SHA256 hash of the key used to encrypt the object"), @WritesAttribute(attribute="gcs.etag", description="The HTTP 1.1 Entity tag for the object."), @WritesAttribute(attribute="gcs.generated.id", description="The service-generated for the object"), @WritesAttribute(attribute="gcs.generation", description="The data generation of the object."), @WritesAttribute(attribute="gcs.md5", description="The MD5 hash of the object's data encoded in base64."), @WritesAttribute(attribute="gcs.media.link", description="The media download link to the object."), @WritesAttribute(attribute="gcs.metageneration", description="The metageneration of the object."), @WritesAttribute(attribute="gcs.owner", description="The owner (uploader) of the object."), @WritesAttribute(attribute="gcs.owner.type", description="The ACL entity type of the uploader of the object."), @WritesAttribute(attribute="gcs.uri", description="The URI of the object as a string.")})
public class ListGCSBucket
extends AbstractGCSProcessor {
    public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder().name("gcs-bucket").displayName("Bucket").description("Bucket of the object.").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("gcs-prefix").displayName("Prefix").description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor USE_GENERATIONS = new PropertyDescriptor.Builder().name("gcs-use-generations").displayName("Use Generations").expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("false").description("Specifies whether to use GCS Generations, if applicable.  If false, only the latest version of each object will be returned.").build();
    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<Relationship>(Collections.singletonList(REL_SUCCESS)));
    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    public static final String CURRENT_KEY_PREFIX = "key-";
    protected long currentTimestamp = 0L;
    protected Set<String> currentKeys;

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.builder().addAll(super.getSupportedPropertyDescriptors()).add((Object)BUCKET).add((Object)PREFIX).add((Object)USE_GENERATIONS).build();
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    private Set<String> extractKeys(StateMap stateMap) {
        return stateMap.toMap().entrySet().parallelStream().filter(x -> ((String)x.getKey()).startsWith(CURRENT_KEY_PREFIX)).map(Map.Entry::getValue).collect(Collectors.toSet());
    }

    void restoreState(ProcessContext context) throws IOException {
        StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
        if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get("key-0") == null) {
            this.currentTimestamp = 0L;
            this.currentKeys = new HashSet<String>();
        } else {
            this.currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
            this.currentKeys = this.extractKeys(stateMap);
        }
    }

    void persistState(ProcessContext context) {
        HashMap<String, String> state = new HashMap<String, String>();
        state.put(CURRENT_TIMESTAMP, String.valueOf(this.currentTimestamp));
        int i = 0;
        for (String key : this.currentKeys) {
            state.put(CURRENT_KEY_PREFIX + i, key);
            ++i;
        }
        try {
            context.getStateManager().setState(state, Scope.CLUSTER);
        }
        catch (IOException ioe) {
            this.getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", (Throwable)ioe);
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            this.restoreState(context);
        }
        catch (IOException e) {
            this.getLogger().error("Failed to restore processor state; yielding", (Throwable)e);
            context.yield();
            return;
        }
        long startNanos = System.nanoTime();
        String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
        String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
        boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
        ArrayList<Storage.BlobListOption> listOptions = new ArrayList<Storage.BlobListOption>();
        if (prefix != null) {
            listOptions.add(Storage.BlobListOption.prefix((String)prefix));
        }
        if (useGenerations) {
            listOptions.add(Storage.BlobListOption.versions((boolean)true));
        }
        Storage storage = (Storage)this.getCloudService();
        int listCount = 0;
        long maxTimestamp = 0L;
        Page blobPages = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
        do {
            for (Blob blob : blobPages.getValues()) {
                long lastModified = blob.getUpdateTime();
                if (lastModified < this.currentTimestamp || lastModified == this.currentTimestamp && this.currentKeys.contains(blob.getName())) continue;
                HashMap<String, String> attributes = new HashMap<String, String>();
                attributes.put("gcs.bucket", blob.getBucket());
                attributes.put("gcs.key", blob.getName());
                if (blob.getSize() != null) {
                    attributes.put("gcs.size", String.valueOf(blob.getSize()));
                }
                if (blob.getCacheControl() != null) {
                    attributes.put("gcs.cache.control", blob.getCacheControl());
                }
                if (blob.getComponentCount() != null) {
                    attributes.put("gcs.component.count", String.valueOf(blob.getComponentCount()));
                }
                if (blob.getContentDisposition() != null) {
                    attributes.put("gcs.content.disposition", blob.getContentDisposition());
                }
                if (blob.getContentEncoding() != null) {
                    attributes.put("gcs.content.encoding", blob.getContentEncoding());
                }
                if (blob.getContentLanguage() != null) {
                    attributes.put("gcs.content.language", blob.getContentLanguage());
                }
                if (blob.getContentType() != null) {
                    attributes.put(CoreAttributes.MIME_TYPE.key(), blob.getContentType());
                }
                if (blob.getCrc32c() != null) {
                    attributes.put("gcs.crc32c", blob.getCrc32c());
                }
                if (blob.getCustomerEncryption() != null) {
                    BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
                    attributes.put("gcs.encryption.algorithm", encryption.getEncryptionAlgorithm());
                    attributes.put("gcs.encryption.sha256", encryption.getKeySha256());
                }
                if (blob.getEtag() != null) {
                    attributes.put("gcs.etag", blob.getEtag());
                }
                if (blob.getGeneratedId() != null) {
                    attributes.put("gcs.generated.id", blob.getGeneratedId());
                }
                if (blob.getGeneration() != null) {
                    attributes.put("gcs.generation", String.valueOf(blob.getGeneration()));
                }
                if (blob.getMd5() != null) {
                    attributes.put("gcs.md5", blob.getMd5());
                }
                if (blob.getMediaLink() != null) {
                    attributes.put("gcs.media.link", blob.getMediaLink());
                }
                if (blob.getMetageneration() != null) {
                    attributes.put("gcs.metageneration", String.valueOf(blob.getMetageneration()));
                }
                if (blob.getOwner() != null) {
                    Acl.Entity entity = blob.getOwner();
                    if (entity instanceof Acl.User) {
                        attributes.put("gcs.owner", ((Acl.User)entity).getEmail());
                        attributes.put("gcs.owner.type", "user");
                    } else if (entity instanceof Acl.Group) {
                        attributes.put("gcs.owner", ((Acl.Group)entity).getEmail());
                        attributes.put("gcs.owner.type", "group");
                    } else if (entity instanceof Acl.Domain) {
                        attributes.put("gcs.owner", ((Acl.Domain)entity).getDomain());
                        attributes.put("gcs.owner.type", "domain");
                    } else if (entity instanceof Acl.Project) {
                        attributes.put("gcs.owner", ((Acl.Project)entity).getProjectId());
                        attributes.put("gcs.owner.type", "project");
                    }
                }
                if (blob.getSelfLink() != null) {
                    attributes.put("gcs.uri", blob.getSelfLink());
                }
                attributes.put(CoreAttributes.FILENAME.key(), blob.getName());
                if (blob.getCreateTime() != null) {
                    attributes.put("gcs.create.time", String.valueOf(blob.getCreateTime()));
                }
                if (blob.getUpdateTime() != null) {
                    attributes.put("gcs.update.time", String.valueOf(blob.getUpdateTime()));
                }
                FlowFile flowFile = session.create();
                flowFile = session.putAllAttributes(flowFile, attributes);
                session.transfer(flowFile, REL_SUCCESS);
                if (lastModified > maxTimestamp) {
                    maxTimestamp = lastModified;
                    this.currentKeys.clear();
                }
                if (lastModified == maxTimestamp) {
                    this.currentKeys.add(blob.getName());
                }
                ++listCount;
            }
            blobPages = blobPages.getNextPage();
            this.commit(context, session, listCount);
            listCount = 0;
        } while (blobPages != null);
        this.currentTimestamp = maxTimestamp;
        long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        this.getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
        if (!this.commit(context, session, listCount)) {
            if (this.currentTimestamp > 0L) {
                this.persistState(context);
            }
            this.getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
            context.yield();
        }
    }

    private boolean commit(ProcessContext context, ProcessSession session, int listCount) {
        boolean willCommit;
        boolean bl = willCommit = listCount > 0;
        if (willCommit) {
            this.getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[]{listCount});
            session.commit();
            this.persistState(context);
        }
        return willCommit;
    }
}

