package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.joda.time.Duration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.class */
/**
 * A {@link Service} that receives messages for one subscription over a bidirectional
 * streaming-pull gRPC call and sends acks / ack-deadline modifications back on the same stream.
 *
 * <p>Received messages are handed to a {@link MessageDispatcher}; this class is the dispatcher's
 * {@link MessageDispatcher.AckProcessor}. When the stream terminates while the service is still
 * alive it is re-opened: immediately after a clean completion, and after a capped exponential
 * backoff following a retryable error. A non-retryable error fails the service.
 */
public final class StreamingSubscriberConnection extends AbstractService implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());

    /** Delay before the first reconnect attempt after a retryable stream failure (100 ms). */
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = new Duration(100);

    /**
     * Upper bound for the reconnect delay (10 s). Without a cap the delay doubles on every
     * consecutive failure and grows without limit.
     */
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = new Duration(10000L);

    /** Maximum number of ack ids (and of deadline modifications) put into a single request. */
    private static final int MAX_PER_REQUEST_CHANGES = 10000;

    /** Delay to use for the next reconnect attempt; reset after a clean stream completion. */
    private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;

    private final Channel channel;
    private final Credentials credentials;
    private final String subscription;
    private final ScheduledExecutorService executor;
    private final MessageDispatcher messageDispatcher;

    /**
     * Request side of the currently open stream; assigned in
     * {@link StreamingPullResponseObserver#beforeStart} each time a stream is opened.
     * NOTE(review): this field is not synchronized or volatile; confirm which threads call
     * {@link #sendAckOperations} and {@link #updateStreamAckDeadline}.
     */
    private ClientCallStreamObserver<StreamingPullRequest> requestObserver;

    /**
     * Response observer for one streaming-pull call. It forwards received messages to the
     * dispatcher, requests responses one at a time, and reports the termination of the stream
     * through {@link #errorFuture}.
     */
    public class StreamingPullResponseObserver implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {
        /** Completed with {@code null} on clean completion, or with the error that ended the stream. */
        final SettableFuture<Void> errorFuture;

        StreamingPullResponseObserver(SettableFuture<Void> settableFuture) {
            this.errorFuture = settableFuture;
        }

        /**
         * Publishes the request observer of the new stream to the enclosing connection and
         * switches the call to manual inbound flow control.
         */
        @Override
        public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> clientCallStreamObserver) {
            requestObserver = clientCallStreamObserver;
            clientCallStreamObserver.disableAutoInboundFlowControl();
        }

        /** Dispatches the received messages, then asks for one more response if still alive. */
        @Override
        public void onNext(StreamingPullResponse streamingPullResponse) {
            messageDispatcher.processReceivedMessages(streamingPullResponse.getReceivedMessagesList());
            if (isAlive()) {
                requestObserver.request(1);
            }
        }

        @Override
        public void onError(Throwable th) {
            logger.log(Level.WARNING, "Terminated streaming with exception", th);
            errorFuture.setException(th);
        }

        @Override
        public void onCompleted() {
            logger.fine("Streaming pull terminated successfully!");
            errorFuture.set(null);
        }
    }

    /**
     * Creates the connection and its {@link MessageDispatcher}; no stream is opened until the
     * service is started.
     *
     * @param str name of the subscription to pull from
     * @param credentials credentials attached to every streaming call
     * @param messageReceiver passed through to the {@link MessageDispatcher}
     * @param duration passed through to the {@link MessageDispatcher}
     * @param i initial message (stream ack) deadline, in seconds
     * @param distribution passed through to the {@link MessageDispatcher}
     * @param channel gRPC channel on which streaming calls are created
     * @param flowController passed through to the {@link MessageDispatcher}
     * @param scheduledExecutorService executor used for stream-termination callbacks and delayed
     *     reconnects; also passed to the {@link MessageDispatcher}
     * @param clock passed through to the {@link MessageDispatcher}
     */
    public StreamingSubscriberConnection(String str, Credentials credentials, MessageReceiver messageReceiver, Duration duration, int i, Distribution distribution, Channel channel, FlowController flowController, ScheduledExecutorService scheduledExecutorService, Clock clock) {
        this.subscription = str;
        this.executor = scheduledExecutorService;
        this.credentials = credentials;
        this.channel = channel;
        this.messageDispatcher = new MessageDispatcher(messageReceiver, this, duration, distribution, flowController, scheduledExecutorService, clock);
        this.messageDispatcher.setMessageDeadlineSeconds(i);
    }

    /** Opens the first stream and marks the service as started. */
    @Override
    protected void doStart() {
        logger.config("Starting subscriber.");
        initialize();
        notifyStarted();
    }

    /**
     * Stops the dispatcher, marks the service as stopped and then terminates the request stream
     * with {@code CANCELLED}. The service is marked stopped first so that the resulting stream
     * failure is seen as "no longer alive" and does not trigger a reconnect.
     */
    @Override
    protected void doStop() {
        messageDispatcher.stop();
        notifyStopped();
        requestObserver.onError(Status.CANCELLED.asException());
    }

    /**
     * Opens a new streaming-pull call, sends the initial request (subscription and stream ack
     * deadline), requests the first response and registers the reconnect logic that runs when
     * the stream terminates.
     */
    public void initialize() {
        final SettableFuture<Void> streamTermination = SettableFuture.create();
        // asyncBidiStreamingCall is declared to return StreamObserver; flow-control calls need
        // the ClientCallStreamObserver subtype.
        ClientCallStreamObserver<StreamingPullRequest> streamObserver =
                (ClientCallStreamObserver<StreamingPullRequest>)
                        ClientCalls.asyncBidiStreamingCall(
                                channel.newCall(
                                        SubscriberGrpc.METHOD_STREAMING_PULL,
                                        CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))),
                                new StreamingPullResponseObserver(streamTermination));
        logger.log(
                Level.FINER,
                "Initializing stream to subscription {0} with deadline {1}",
                new Object[] {subscription, messageDispatcher.getMessageDeadlineSeconds()});
        streamObserver.onNext(
                StreamingPullRequest.newBuilder()
                        .setSubscription(subscription)
                        .setStreamAckDeadlineSeconds(messageDispatcher.getMessageDeadlineSeconds())
                        .build());
        streamObserver.request(1);

        Futures.addCallback(
                streamTermination,
                new FutureCallback<Void>() {
                    @Override
                    public void onSuccess(@Nullable Void r4) {
                        if (!isAlive()) {
                            // The service has been stopped; do not open a new stream.
                            return;
                        }
                        // Clean completion: reset the backoff and re-open right away.
                        channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
                        initialize();
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        if (!isAlive()) {
                            logger.log(Level.FINE, "pull failure after service no longer running", th);
                            return;
                        }
                        if (!StatusUtil.isRetryable(th)) {
                            notifyFailed(th);
                            return;
                        }
                        // Wait for the current backoff, and double it (up to the cap) for the
                        // next consecutive failure.
                        long backoffMillis = channelReconnectBackoff.getMillis();
                        long nextBackoffMillis =
                                Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.getMillis());
                        channelReconnectBackoff = new Duration(nextBackoffMillis);
                        executor.schedule(
                                new Runnable() {
                                    @Override
                                    public void run() {
                                        initialize();
                                    }
                                },
                                backoffMillis,
                                TimeUnit.MILLISECONDS);
                    }
                },
                executor);
    }

    /**
     * Returns whether the service is starting or running. The state is read once so that a
     * transition between two reads cannot produce a wrong answer.
     */
    public boolean isAlive() {
        Service.State current = state();
        return current == Service.State.RUNNING || current == Service.State.STARTING;
    }

    /**
     * Sends the given acks and ack-deadline modifications on the current stream, split into as
     * many requests as needed to respect {@link #MAX_PER_REQUEST_CHANGES}.
     *
     * @param list ack ids to acknowledge
     * @param list2 pending ack-deadline modifications
     */
    @Override // com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor
    public void sendAckOperations(List<String> list, List<MessageDispatcher.PendingModifyAckDeadline> list2) {
        for (StreamingPullRequest request : partitionAckOperations(list, list2, MAX_PER_REQUEST_CHANGES)) {
            requestObserver.onNext(request);
        }
    }

    /**
     * Distributes acks and deadline modifications over the smallest number of requests such that
     * no request carries more than {@code i} ack ids or more than {@code i} deadline
     * modifications. Acks and modifications share requests: the k-th request holds the k-th chunk
     * of each.
     *
     * @param list ack ids to acknowledge
     * @param list2 pending ack-deadline modifications; each entry applies one deadline extension
     *     to all of its ack ids
     * @param i maximum number of entries of each kind per request; must be positive
     * @return the built requests, empty when there is nothing to send
     */
    @VisibleForTesting
    static List<StreamingPullRequest> partitionAckOperations(List<String> list, List<MessageDispatcher.PendingModifyAckDeadline> list2, int i) {
        int totalModifyAckIds = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modifyAckDeadline : list2) {
            totalModifyAckIds += modifyAckDeadline.ackIds.size();
        }

        // The larger of the two kinds decides how many requests are needed (ceiling division).
        int largestCount = Math.max(totalModifyAckIds, list.size());
        int requestCount = largestCount / i + (largestCount % i == 0 ? 0 : 1);

        List<StreamingPullRequest.Builder> builders = new ArrayList<StreamingPullRequest.Builder>(requestCount);
        for (int n = 0; n < requestCount; n++) {
            builders.add(StreamingPullRequest.newBuilder());
        }

        // Acks: one chunk of at most i ids per request.
        int ackBuilderIndex = 0;
        for (List<String> ackChunk : Lists.partition(list, i)) {
            builders.get(ackBuilderIndex).addAllAckIds(ackChunk);
            ackBuilderIndex++;
        }

        // Deadline modifications: fill each request with up to i (deadline, ackId) pairs before
        // moving on to the next one.
        int modifyBuilderIndex = 0;
        int modifiesInCurrentRequest = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modifyAckDeadline : list2) {
            for (String ackId : modifyAckDeadline.ackIds) {
                builders.get(modifyBuilderIndex)
                        .addModifyDeadlineSeconds(modifyAckDeadline.deadlineExtensionSeconds)
                        .addModifyDeadlineAckIds(ackId);
                modifiesInCurrentRequest++;
                if (modifiesInCurrentRequest == i) {
                    modifyBuilderIndex++;
                    modifiesInCurrentRequest = 0;
                }
            }
        }

        List<StreamingPullRequest> requests = new ArrayList<StreamingPullRequest>(builders.size());
        for (StreamingPullRequest.Builder builder : builders) {
            requests.add(builder.build());
        }
        return requests;
    }

    /**
     * Updates the message deadline on the dispatcher and sends the new stream ack deadline on
     * the current stream.
     *
     * @param i new deadline, in seconds
     */
    public void updateStreamAckDeadline(int i) {
        messageDispatcher.setMessageDeadlineSeconds(i);
        requestObserver.onNext(StreamingPullRequest.newBuilder().setStreamAckDeadlineSeconds(i).build());
    }
}
