package org.springframework.cloud.gcp.pubsub.core;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.PublisherFactory;
import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory;
import org.springframework.cloud.gcp.pubsub.support.converter.PubSubMessageConverter;
import org.springframework.cloud.gcp.pubsub.support.converter.SimplePubSubMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

/* loaded from: input_file:org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.class */
public class PubSubTemplate implements PubSubOperations, InitializingBean {
    private static final Log LOGGER = LogFactory.getLog(PubSubTemplate.class);
    private PubSubMessageConverter messageConverter = new SimplePubSubMessageConverter();
    private final PublisherFactory publisherFactory;
    private final SubscriberFactory subscriberFactory;
    private final SubscriberStub subscriberStub;

    public PubSubTemplate(PublisherFactory publisherFactory, SubscriberFactory subscriberFactory) {
        this.publisherFactory = publisherFactory;
        this.subscriberFactory = subscriberFactory;
        this.subscriberStub = this.subscriberFactory.createSubscriberStub();
    }

    public PubSubMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(PubSubMessageConverter pubSubMessageConverter) {
        Assert.notNull(pubSubMessageConverter, "A valid Pub/Sub message converter is required.");
        this.messageConverter = pubSubMessageConverter;
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public <T> ListenableFuture<String> publish(String str, T t, Map<String, String> map) {
        return publish(str, this.messageConverter.toPubSubMessage(t, map));
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public <T> ListenableFuture<String> publish(String str, T t) {
        return publish(str, t, null);
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public ListenableFuture<String> publish(final String str, PubsubMessage pubsubMessage) {
        ApiFuture publish = this.publisherFactory.createPublisher(str).publish(pubsubMessage);
        final SettableListenableFuture settableListenableFuture = new SettableListenableFuture();
        ApiFutures.addCallback(publish, new ApiFutureCallback<String>() { // from class: org.springframework.cloud.gcp.pubsub.core.PubSubTemplate.1
            public void onFailure(Throwable th) {
                PubSubTemplate.LOGGER.warn("Publishing to " + str + " topic failed.", th);
                settableListenableFuture.setException(th);
            }

            public void onSuccess(String str2) {
                if (PubSubTemplate.LOGGER.isDebugEnabled()) {
                    PubSubTemplate.LOGGER.debug("Publishing to " + str + " was successful. Message ID: " + str2);
                }
                settableListenableFuture.set(str2);
            }
        });
        return settableListenableFuture;
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public Subscriber subscribe(String str, MessageReceiver messageReceiver) {
        Subscriber createSubscriber = this.subscriberFactory.createSubscriber(str, messageReceiver);
        createSubscriber.startAsync();
        return createSubscriber;
    }

    private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
        Assert.notNull(pullRequest, "The pull request cannot be null.");
        return (List) ((PullResponse) this.subscriberStub.pullCallable().call(pullRequest)).getReceivedMessagesList().stream().map(receivedMessage -> {
            return new AcknowledgeablePubsubMessage(receivedMessage.getMessage(), receivedMessage.getAckId(), pullRequest.getSubscription(), this.subscriberStub);
        }).collect(Collectors.toList());
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public List<AcknowledgeablePubsubMessage> pull(String str, Integer num, Boolean bool) {
        return pull(this.subscriberFactory.createPullRequest(str, num, bool));
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public List<PubsubMessage> pullAndAck(String str, Integer num, Boolean bool) {
        List<AcknowledgeablePubsubMessage> pull = pull(this.subscriberFactory.createPullRequest(str, num, bool));
        ack(pull);
        return (List) pull.stream().map((v0) -> {
            return v0.getPubsubMessage();
        }).collect(Collectors.toList());
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public PubsubMessage pullNext(String str) {
        List<PubsubMessage> pullAndAck = pullAndAck(str, 1, true);
        if (pullAndAck.size() > 0) {
            return pullAndAck.get(0);
        }
        return null;
    }

    public void afterPropertiesSet() throws Exception {
    }

    public PublisherFactory getPublisherFactory() {
        return this.publisherFactory;
    }

    public SubscriberFactory getSubscriberFactory() {
        return this.subscriberFactory;
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public void ack(Collection<AcknowledgeablePubsubMessage> collection) {
        Assert.notEmpty(collection, "The acknowledgeablePubsubMessages cannot be null.");
        groupAcknowledgeableMessages(collection).forEach((v1, v2) -> {
            ack(v1, v2);
        });
    }

    @Override // org.springframework.cloud.gcp.pubsub.core.PubSubOperations
    public void nack(Collection<AcknowledgeablePubsubMessage> collection) {
        Assert.notEmpty(collection, "The acknowledgeablePubsubMessages cannot be null.");
        groupAcknowledgeableMessages(collection).forEach((v1, v2) -> {
            nack(v1, v2);
        });
    }

    private Map<String, List<String>> groupAcknowledgeableMessages(Collection<AcknowledgeablePubsubMessage> collection) {
        return (Map) collection.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getSubscriptionName();
        }, Collectors.mapping((v0) -> {
            return v0.getAckId();
        }, Collectors.toList())));
    }

    private void ack(String str, Collection<String> collection) {
        this.subscriberStub.acknowledgeCallable().call(AcknowledgeRequest.newBuilder().addAllAckIds(collection).setSubscription(str).build());
    }

    private void nack(String str, Collection<String> collection) {
        this.subscriberStub.modifyAckDeadlineCallable().call(ModifyAckDeadlineRequest.newBuilder().setAckDeadlineSeconds(0).addAllAckIds(collection).setSubscription(str).build());
    }
}
