/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;

@SeeAlso(value={PublishGCPubSub.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
@CapabilityDescription(value="Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, the configured number of messages will be pulled in a single request, else only one message will be pulled.")
@WritesAttributes(value={@WritesAttribute(attribute="gcp.pubsub.ackId", description="Acknowledgement Id of the consumed Google Cloud PubSub message"), @WritesAttribute(attribute="gcp.pubsub.messageSize", description="Serialized size of the consumed Google Cloud PubSub message"), @WritesAttribute(attribute="gcp.pubsub.attributesCount", description="Number of attributes the consumed PubSub message has, if any"), @WritesAttribute(attribute="gcp.pubsub.publishTime", description="Timestamp value when the message was published"), @WritesAttribute(attribute="Dynamic Attributes", description="Other than the listed attributes, this processor may write zero or more attributes, if the original Google Cloud Publisher client added any attributes to the message while sending")})
public class ConsumeGCPubSub
extends AbstractGCPubSubProcessor {
    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder().name("gcp-pubsub-subscription").displayName("Subscription").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).description("Name of the Google Cloud Pub/Sub Subscription").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    private SubscriberStub subscriber = null;
    private PullRequest pullRequest;
    private AtomicReference<Exception> storedException = new AtomicReference();

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
        this.pullRequest = PullRequest.newBuilder().setMaxMessages(batchSize.intValue()).setReturnImmediately(false).setSubscription(this.getSubscriptionName(context)).build();
        try {
            this.subscriber = this.getSubscriber(context);
        }
        catch (IOException e) {
            this.storedException.set(e);
            this.getLogger().error("Failed to create Google Cloud Subscriber due to {}", new Object[]{e});
        }
    }

    @OnStopped
    public void onStopped() {
        if (this.subscriber != null) {
            this.subscriber.shutdown();
        }
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.of((Object)PROJECT_ID, (Object)GCP_CREDENTIALS_PROVIDER_SERVICE, (Object)SUBSCRIPTION, (Object)BATCH_SIZE);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.subscriber == null) {
            if (this.storedException.get() != null) {
                this.getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", new Object[]{this.storedException.get()});
            } else {
                this.getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
            }
            context.yield();
            return;
        }
        PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call((Object)this.pullRequest);
        ArrayList<String> ackIds = new ArrayList<String>();
        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
            if (!message.hasMessage()) continue;
            FlowFile flowFile = session.create();
            HashMap<String, String> attributes = new HashMap<String, String>();
            ackIds.add(message.getAckId());
            attributes.put("gcp.pubsub.ackId", message.getAckId());
            attributes.put("gcp.pubsub.messageSize", String.valueOf(message.getSerializedSize()));
            attributes.put("gcp.pubsub.messageId", message.getMessage().getMessageId());
            attributes.put("gcp.pubsub.attributesCount", String.valueOf(message.getMessage().getAttributesCount()));
            attributes.put("gcp.pubsub.publishTime", String.valueOf(message.getMessage().getPublishTime().getSeconds()));
            attributes.putAll(message.getMessage().getAttributesMap());
            flowFile = session.putAllAttributes(flowFile, attributes);
            flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().receive(flowFile, this.getSubscriptionName(context));
        }
        if (!ackIds.isEmpty()) {
            AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().addAllAckIds(ackIds).setSubscription(this.getSubscriptionName(context)).build();
            this.subscriber.acknowledgeCallable().call((Object)acknowledgeRequest);
        }
    }

    private String getSubscriptionName(ProcessContext context) {
        String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
        String projectId = context.getProperty(PROJECT_ID).getValue();
        if (subscriptionName.contains("/")) {
            return ProjectSubscriptionName.parse((String)subscriptionName).toString();
        }
        return ProjectSubscriptionName.of((String)projectId, (String)subscriptionName).toString();
    }

    private SubscriberStub getSubscriber(ProcessContext context) throws IOException {
        SubscriberStubSettings subscriberStubSettings = ((SubscriberStubSettings.Builder)SubscriberStubSettings.newBuilder().setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.getGoogleCredentials(context)))).build();
        return GrpcSubscriberStub.create((SubscriberStubSettings)subscriberStubSettings);
    }
}

