/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.googlepubsub.source;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PushConfig;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.googlepubsub.source.GooglePubSubMessageReceiver;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.log4j.Logger;

/**
 * Siddhi {@code source} extension of type {@code googlepubsub}.
 *
 * <p>On {@link #init} the static options ({@code project.id}, {@code topic.id},
 * {@code subscription.id}, {@code credential.path}) are read and the service account
 * credentials are loaded from disk. On {@link #connect} a subscription for the configured
 * topic is requested (an already existing subscription is accepted) and a
 * {@link Subscriber} bound to a {@link GooglePubSubMessageReceiver} is started.
 */
@Extension(
        name = "googlepubsub",
        namespace = "source",
        description = "The GooglePubSub source receives events to be processed by Siddhi from a topic in a GooglePubSub server. Here, a subscriber client creates a subscription to that topic and consumes messages via the subscription. The subscription applications receive only the messages that are published after the subscription is created. A subscription connects a topic to a subscriber application, enabling the application to receive and process messages from that topic. A topic can have multiple subscriptions, but a given subscription belongs only to a single topic.",
        parameters = {
                @Parameter(name = "project.id", description = "The unique ID of the GCP console project within which the topic is created.", type = {DataType.STRING}),
                @Parameter(name = "topic.id", description = "The unique ID of the topic from which the messages are received.", type = {DataType.STRING}),
                @Parameter(name = "subscription.id", description = "The unique ID of the subscription from which messages must be retrieved.", type = {DataType.STRING}),
                @Parameter(name = "credential.path", description = "The file path of the service account credentials.", type = {DataType.STRING})
        },
        examples = {
                @Example(syntax = "@source(type='googlepubsub',@map(type='text'),\ntopic.id='topicA',\nproject.id='sp-path-1547649404768',\ncredential.path = 'src/test/resources/security/sp.json',\nsubscription.id='subA',\n)\ndefine stream OutputStream(message String);", description = "This query shows how to subscribe to a googlepubsub topic. Here, a googlepubsub source subscribes to the 'topicA' topic that resides in the 'sp-path-1547649404768' project within a googlepubsub instance. The events are received in the text format, mapped to a Siddhi event, and then sent to a stream named OutputStream.")
        })
public class GooglePubSubSource
extends Source<State> {
    private static final Logger log = Logger.getLogger(GooglePubSubSource.class);

    /** Acknowledgement deadline, in seconds, passed when the subscription is created. */
    private static final int ACK_DEADLINE_SECONDS = 10;

    private String streamID;
    private String siddhiAppName;
    private GooglePubSubMessageReceiver googlePubSubMessageReceiver;
    private Subscriber subscriber;
    private ProjectTopicName topicName;
    private ProjectSubscriptionName subscriptionName;
    private GoogleCredentials credentials;
    private String projectId;
    private String topicId;

    /**
     * Reads the source options, builds the topic/subscription names and loads the service
     * account credentials from the file given by {@code credential.path}.
     *
     * @param sourceEventListener listener handed to the message receiver; also supplies the stream id
     * @param optionHolder holder of the static options of the {@code @source} annotation
     * @param requestedTransportPropertyNames not used by this source
     * @param configReader not used by this source
     * @param siddhiAppContext supplies the Siddhi app name used in error messages
     * @return always {@code null}; this source does not provide a state factory
     * @throws SiddhiAppCreationException if the credentials file cannot be read or parsed
     */
    public StateFactory<State> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.streamID = sourceEventListener.getStreamDefinition().getId();
        this.siddhiAppName = siddhiAppContext.getName();
        String subscriptionId = optionHolder.validateAndGetStaticValue("subscription.id");
        this.topicId = optionHolder.validateAndGetStaticValue("topic.id");
        this.projectId = optionHolder.validateAndGetStaticValue("project.id");
        String credentialPath = optionHolder.validateAndGetStaticValue("credential.path");
        this.googlePubSubMessageReceiver = new GooglePubSubMessageReceiver(sourceEventListener);
        this.topicName = ProjectTopicName.of(this.projectId, this.topicId);
        this.subscriptionName = ProjectSubscriptionName.of(this.projectId, subscriptionId);
        // try-with-resources: the stream is only needed while the credentials are parsed and
        // must be closed afterwards, on both the success and the failure path.
        try (FileInputStream serviceAccountStream = new FileInputStream(new File(credentialPath))) {
            this.credentials = ServiceAccountCredentials.fromStream(serviceAccountStream);
        }
        catch (IOException e) {
            throw new SiddhiAppCreationException("The file that contains your service account credentials is not found or you are not permitted to make authenticated calls.Check the credential.path '" + credentialPath + "' defined in stream " + this.siddhiAppName + ": " + this.streamID, e);
        }
        return null;
    }

    /**
     * @return the event classes this source emits; only {@link String}
     */
    public Class[] getOutputEventClasses() {
        return new Class[]{String.class};
    }

    /**
     * @return always {@code null}; no service deployment information is exposed
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Makes sure the subscription exists, then builds and starts the subscriber and waits
     * until it is running.
     *
     * @param connectionCallback not used by this source
     * @param state not used by this source
     * @throws ConnectionUnavailableException if the subscription cannot be created for any
     *         reason other than that it already exists
     */
    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        this.ensureSubscriptionExists();
        this.subscriber = Subscriber.newBuilder(this.subscriptionName, (MessageReceiver)this.googlePubSubMessageReceiver)
                .setCredentialsProvider(FixedCredentialsProvider.create(this.credentials))
                .build();
        this.subscriber.startAsync().awaitRunning();
    }

    /**
     * Requests creation of the configured subscription with a short-lived admin client.
     * An {@code ALREADY_EXISTS} response is logged and treated as success.
     *
     * @throws ConnectionUnavailableException on any other API error, or if the admin client
     *         cannot be created
     */
    private void ensureSubscriptionExists() throws ConnectionUnavailableException {
        // Kept local: the client is only needed for this one call and is shut down in finally.
        SubscriptionAdminClient adminClient = null;
        try {
            SubscriptionAdminSettings.Builder settingsBuilder = SubscriptionAdminSettings.newBuilder();
            settingsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(this.credentials));
            adminClient = SubscriptionAdminClient.create(settingsBuilder.build());
            adminClient.createSubscription(this.subscriptionName, this.topicName, PushConfig.getDefaultInstance(), ACK_DEADLINE_SECONDS);
        }
        catch (ApiException e) {
            if (e.getStatusCode().getCode() != StatusCode.Code.ALREADY_EXISTS) {
                log.error("Error in connecting to the resources at " + this.siddhiAppName + ": " + this.streamID);
                throw new ConnectionUnavailableException("An error is caused due to resource " + e.getStatusCode().getCode() + ".Check whether you have provided a proper project.id for '" + this.projectId + "' or existing topic.id for '" + this.topicId + "' defined in stream " + this.siddhiAppName + ": " + this.streamID + " and make sure you have enough access to use all resources in API.", e);
            }
            // The subscription is already there, so it can be reused as is.
            log.info("You have a subscription " + this.subscriptionName + " to the topic " + this.topicName);
        }
        catch (IOException e) {
            throw new ConnectionUnavailableException("Could not create a subscription " + this.subscriptionName + " to pull messages from the google pub sub server defined in stream " + this.siddhiAppName + ": " + this.streamID, e);
        }
        finally {
            if (adminClient != null) {
                adminClient.shutdown();
            }
        }
    }

    /**
     * Stops the subscriber, if one was created, and waits for it to terminate.
     */
    public void disconnect() {
        if (this.subscriber != null) {
            this.subscriber.stopAsync().awaitTerminated();
        }
    }

    /**
     * No-op; this source performs no work on destroy.
     */
    public void destroy() {
    }

    /**
     * Delegates to {@link GooglePubSubMessageReceiver#pause()}.
     */
    public void pause() {
        this.googlePubSubMessageReceiver.pause();
    }

    /**
     * Delegates to {@link GooglePubSubMessageReceiver#resume()}.
     */
    public void resume() {
        this.googlePubSubMessageReceiver.resume();
    }
}

