/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.googlepubsub.sink;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

@Extension(name="googlepubsub", namespace="sink", description="The GooglePubSub sink publishes messages to a topic in the GooglePubSub server. If the required topic does not exist, GooglePubSub Sink creates the topic and publishes messages to it.", parameters={@Parameter(name="project.id", description="The unique ID of the GCP console project within which the topic is created.", type={DataType.STRING}), @Parameter(name="topic.id", description="The ID of the topic to which the messages that are processed by Siddhi are published. ", type={DataType.STRING}), @Parameter(name="credential.path", description="The file path of the service account credentials.", type={DataType.STRING})}, examples={@Example(syntax="@sink(type = 'googlepubsub', @map(type= 'text'),\nproject.id = 'sp-path-1547649404768', \ncredential.path = 'src/test/resources/security/sp.json',\ntopic.id ='topicA',\n )\ndefine stream InputStream(message string);", description="This query publishes messages to a topic in the GooglePubSub server. Here, the messages are published to'topicA' topic in the 'sp-path-1547649404768' project. If the 'topicA' topic already exists in the 'sp-path-1547649404768' project, messages are directly published to that topic. If it does not exist, a topic with that ID is newly created in the project and then, the messages are published to that topic.")})
public class GooglePubSubSink
extends Sink<State> {
    private static final Logger log = Logger.getLogger(GooglePubSubSink.class);
    private String streamID;
    private String siddhiAppName;
    private GoogleCredentials credentials;
    private TopicAdminClient topicAdminClient;
    private String projectId;
    private ProjectTopicName topic;
    private Publisher publisher;

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{"topic.id", "project.id", "credential.path"};
    }

    protected StateFactory<State> init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppName = siddhiAppContext.getName();
        this.streamID = streamDefinition.getId();
        String topicId = optionHolder.validateAndGetStaticValue("topic.id");
        this.projectId = optionHolder.validateAndGetStaticValue("project.id");
        String credentialPath = optionHolder.validateAndGetStaticValue("credential.path");
        this.topic = ProjectTopicName.of(this.projectId, topicId);
        File credentialsPath = new File(credentialPath);
        try {
            FileInputStream serviceAccountStream = new FileInputStream(credentialsPath);
            this.credentials = ServiceAccountCredentials.fromStream(serviceAccountStream);
        }
        catch (IOException e) {
            throw new SiddhiAppCreationException("The file that contains your service account credentials is not found or you are not permitted to make authenticated calls. Check the credential.path '" + credentialPath + "' defined in stream " + this.siddhiAppName + " : " + this.streamID + ".", (Throwable)e);
        }
        return null;
    }

    public void publish(Object o, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        String message = (String)o;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
        this.publisher.publish(pubsubMessage);
    }

    public void connect() throws ConnectionUnavailableException {
        this.createTopic();
        try {
            this.publisher = Publisher.newBuilder(this.topic).setCredentialsProvider(FixedCredentialsProvider.create(this.credentials)).build();
        }
        catch (IOException e) {
            throw new ConnectionUnavailableException("Could not create a publisher bound to the topic : " + this.topic, (Throwable)e);
        }
    }

    public void disconnect() {
        if (this.publisher != null) {
            try {
                this.publisher.shutdown();
                this.publisher.awaitTermination(1L, TimeUnit.MINUTES);
            }
            catch (Exception e) {
                log.error((Object)String.format("Error in shutting down the publisher %s. Message %s.", this.publisher, e.getMessage()));
            }
        }
    }

    public void destroy() {
    }

    private void createTopic() {
        block9: {
            try {
                TopicAdminSettings topicAdminSettings = ((TopicAdminSettings.Builder)TopicAdminSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(this.credentials))).build();
                this.topicAdminClient = TopicAdminClient.create(topicAdminSettings);
                this.topicAdminClient.createTopic(this.topic);
            }
            catch (ApiException e) {
                if (e.getStatusCode().getCode() == StatusCode.Code.ALREADY_EXISTS) {
                    log.info((Object)("You have the topic '" + this.topic + "' in google pub sub server."));
                    break block9;
                }
                throw new SiddhiAppRuntimeException("An error is caused due to a resource " + (Object)((Object)e.getStatusCode().getCode()) + " in Google Pub Sub server. Check whether you have provided a proper project.id for '" + this.projectId + "' defined in stream " + this.siddhiAppName + ": " + this.streamID + " and make sure you have enough access to use all resources in API.", (Throwable)e);
            }
            catch (IOException e) {
                throw new SiddhiAppRuntimeException("Could not create the topic " + this.topic + "in the google pub sub server, under the project.id '" + this.projectId + "' defined in stream " + this.siddhiAppName + " : " + this.streamID + ".", (Throwable)e);
            }
            finally {
                if (this.topicAdminClient != null) {
                    this.topicAdminClient.shutdown();
                }
            }
        }
    }
}

