/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub;

/**
 * NiFi processor that publishes the content of each incoming FlowFile as a
 * message to a Google Cloud Pub/Sub topic.
 *
 * <p>Every dynamic property configured on the processor is evaluated against
 * the FlowFile and attached to the outgoing message as a Pub/Sub attribute.
 * On success the FlowFile receives the {@code gcp.pubsub.messageId} and
 * {@code gcp.pubsub.topic} attributes and is routed to {@code REL_SUCCESS}.
 * A publish that fails with a {@link DeadlineExceededException} cause is routed
 * to {@link #REL_RETRY}; any other publish failure is routed to
 * {@code REL_FAILURE}.
 */
@SeeAlso(value={ConsumeGCPubSub.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
@CapabilityDescription(value="Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties. If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
@DynamicProperty(name="Attribute name", value="Value to be set to the attribute", description="Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes(value={@WritesAttribute(attribute="gcp.pubsub.messageId", description="ID of the pubsub message published to the configured Google Cloud PubSub topic"), @WritesAttribute(attribute="gcp.pubsub.topic", description="Name of the Google Cloud PubSub topic the message was published to")})
public class PublishGCPubSub
extends AbstractGCPubSubProcessor {
    /** Name of the topic to publish to; either a bare topic name or a full path containing '/'. */
    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder().name("gcp-pubsub-topic").displayName("Topic Name").description("Name of the Google Cloud PubSub Topic").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    /** Destination for FlowFiles whose publish failed in a way that may succeed when attempted again. */
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.").build();
    /** Publisher built in {@link #onScheduled}; {@code null} while unscheduled or when creation failed. */
    private Publisher publisher = null;
    /** Failure recorded by the most recent {@link #onScheduled} call, or {@code null} if it succeeded. */
    private final AtomicReference<Exception> storedException = new AtomicReference<>();

    /**
     * Returns the fixed (non-dynamic) properties supported by this processor.
     *
     * @return an immutable list of the supported property descriptors
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.of(PROJECT_ID, GCP_CREDENTIALS_PROVIDER_SERVICE, TOPIC_NAME, BATCH_SIZE);
    }

    /**
     * Builds the descriptor for a user-defined (dynamic) property. Dynamic
     * properties become attributes of the published message.
     *
     * @param propertyDescriptorName the name the user gave the property
     * @return an optional, dynamic descriptor that supports FlowFile-attribute expression language
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName).displayName(propertyDescriptorName).addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).dynamic(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    }

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return an unmodifiable set containing success, failure and retry
     */
    @Override
    public Set<Relationship> getRelationships() {
        return Collections.unmodifiableSet(new HashSet<Relationship>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY)));
    }

    /**
     * Creates the Pub/Sub publisher for the configured topic. A creation
     * failure is logged and remembered so that {@link #onTrigger} can report
     * it; it is not rethrown.
     *
     * @param context the process context supplying the configuration
     */
    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        // Forget any failure from an earlier scheduling attempt so a stale error is never reported.
        this.storedException.set(null);
        try {
            this.publisher = this.getPublisherBuilder(context).build();
        }
        catch (IOException e) {
            // Make sure a publisher left over from a previous run is not reused.
            this.publisher = null;
            this.getLogger().error("Failed to create Google Cloud PubSub Publisher due to {}", new Object[]{e});
            this.storedException.set(e);
        }
    }

    /**
     * Publishes up to {@code BATCH_SIZE} FlowFiles, one Pub/Sub message per
     * FlowFile, waiting for each publish to complete before moving on.
     *
     * <p>If no publisher is available the processor yields without pulling
     * any FlowFiles from the session. FlowFiles that were published are
     * transferred to success and a provenance SEND event is reported for each
     * of them, even if a later FlowFile in the batch causes an unexpected
     * exception.
     *
     * @param context the process context supplying the configuration
     * @param session the session providing the FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        // Check the publisher before taking FlowFiles so nothing is pulled that cannot be routed.
        if (this.publisher == null) {
            Exception creationFailure = this.storedException.get();
            if (creationFailure != null) {
                this.getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", new Object[]{creationFailure});
            }
            context.yield();
            return;
        }
        int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
        List<FlowFile> flowFiles = session.get(flowFileCount);
        if (flowFiles.isEmpty()) {
            context.yield();
            return;
        }
        long startNanos = System.nanoTime();
        List<FlowFile> successfulFlowFiles = new ArrayList<>();
        String topicName = this.getTopicName(context).toString();
        // Remember an interruption and restore it only after the batch has been routed, so the
        // remaining FlowFiles are still processed instead of failing immediately on get().
        boolean interrupted = false;
        try {
            for (FlowFile flowFile : flowFiles) {
                try {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    session.exportTo(flowFile, baos);
                    // Copy the raw bytes; a round trip through String would corrupt content that
                    // is not text in the platform default charset.
                    ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray());
                    PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent).setPublishTime(Timestamp.newBuilder().build()).putAllAttributes(this.getDynamicAttributesMap(context, flowFile)).build();
                    ApiFuture<String> messageIdFuture = this.publisher.publish(message);
                    // get() blocks until the publish completes; no polling loop is required.
                    String messageId = messageIdFuture.get();
                    Map<String, String> attributes = new HashMap<>();
                    attributes.put("gcp.pubsub.messageId", messageId);
                    attributes.put("gcp.pubsub.topic", topicName);
                    successfulFlowFiles.add(session.putAllAttributes(flowFile, attributes));
                }
                catch (InterruptedException | ExecutionException e) {
                    if (e instanceof InterruptedException) {
                        interrupted = true;
                    }
                    if (e.getCause() instanceof DeadlineExceededException) {
                        this.getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {} but attempting again may succeed so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, (Throwable)e);
                        session.transfer(flowFile, REL_RETRY);
                    } else {
                        this.getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}", new Object[]{topicName, e});
                        session.transfer(flowFile, REL_FAILURE);
                        context.yield();
                    }
                }
            }
        }
        finally {
            if (!successfulFlowFiles.isEmpty()) {
                session.transfer(successfulFlowFiles, REL_SUCCESS);
                for (FlowFile flowFile : successfulFlowFiles) {
                    long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                    session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
                }
            }
            if (interrupted) {
                // Preserve the interrupt status for the calling framework.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Shuts the publisher down when the processor is stopped.
     */
    @OnStopped
    public void onStopped() {
        this.shutdownPublisher();
    }

    /**
     * Shuts down the current publisher, if any, and drops the reference so a
     * shut-down publisher is never used again. A shutdown failure is logged
     * as a warning and otherwise ignored.
     */
    private void shutdownPublisher() {
        try {
            if (this.publisher != null) {
                this.publisher.shutdown();
            }
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher due to {}", new Object[]{e});
        }
        finally {
            this.publisher = null;
        }
    }

    /**
     * Resolves the configured topic. A value containing '/' is parsed as a
     * full topic path; otherwise it is combined with the configured project id.
     *
     * @param context the process context supplying the configuration
     * @return the resolved topic name
     */
    private ProjectTopicName getTopicName(ProcessContext context) {
        String topic = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
        String projectId = context.getProperty(PROJECT_ID).getValue();
        if (topic.contains("/")) {
            return ProjectTopicName.parse(topic);
        }
        return ProjectTopicName.of(projectId, topic);
    }

    /**
     * Evaluates every dynamic property against the given FlowFile.
     *
     * @param context  the process context supplying the configured properties
     * @param flowFile the FlowFile whose attributes feed the expression language
     * @return a map from dynamic property name to its evaluated value
     */
    private Map<String, String> getDynamicAttributesMap(ProcessContext context, FlowFile flowFile) {
        Map<String, String> attributes = new HashMap<>();
        for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (!descriptor.isDynamic()) {
                continue;
            }
            String value = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
            attributes.put(descriptor.getName(), value);
        }
        return attributes;
    }

    /**
     * Prepares a publisher builder for the configured topic and credentials,
     * with batching enabled and the element-count threshold set to the
     * configured batch size.
     *
     * @param context the process context supplying the configuration
     * @return a builder ready to have {@code build()} called on it
     */
    private Publisher.Builder getPublisherBuilder(ProcessContext context) {
        Long batchSize = context.getProperty(BATCH_SIZE).asLong();
        return Publisher.newBuilder((TopicName)this.getTopicName(context)).setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.getGoogleCredentials(context))).setBatchingSettings(BatchingSettings.newBuilder().setElementCountThreshold(batchSize).setIsEnabled(Boolean.valueOf(true)).build());
    }
}

