/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsub.v1.MessageDispatcher;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.StatusUtil;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

final class StreamingSubscriberConnection
extends AbstractApiService
implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100L);
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10L);
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    private final SubscriberStub stub;
    private final int channelAffinity;
    private final String subscription;
    private final ScheduledExecutorService systemExecutor;
    private final MessageDispatcher messageDispatcher;
    private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
    private final Lock lock = new ReentrantLock();
    private ClientStream<StreamingPullRequest> clientStream;

    public StreamingSubscriberConnection(String subscription, MessageReceiver receiver, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, FlowController flowController, Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, ApiClock clock) {
        this.subscription = subscription;
        this.systemExecutor = systemExecutor;
        this.stub = stub;
        this.channelAffinity = channelAffinity;
        this.messageDispatcher = new MessageDispatcher(receiver, this, ackExpirationPadding, maxAckExtensionPeriod, ackLatencyDistribution, flowController, outstandingMessageBatches, executor, systemExecutor, clock);
    }

    @Override
    protected void doStart() {
        logger.config("Starting subscriber.");
        this.messageDispatcher.start();
        this.initialize();
        this.notifyStarted();
    }

    @Override
    protected void doStop() {
        this.messageDispatcher.stop();
        this.lock.lock();
        try {
            this.clientStream.closeSendWithError(Status.CANCELLED.asException());
        }
        finally {
            this.lock.unlock();
            this.notifyStopped();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initialize() {
        SettableApiFuture<Void> errorFuture = SettableApiFuture.create();
        StreamingPullResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture);
        ClientStream<StreamingPullRequest> initClientStream = this.stub.streamingPullCallable().splitCall(responseObserver, GrpcCallContext.createDefault().withChannelAffinity(this.channelAffinity));
        logger.log(Level.FINER, "Initializing stream to subscription {0}", this.subscription);
        initClientStream.send(StreamingPullRequest.newBuilder().setSubscription(this.subscription).setStreamAckDeadlineSeconds(60).build());
        this.lock.lock();
        try {
            this.clientStream = initClientStream;
        }
        finally {
            this.lock.unlock();
        }
        ApiFutures.addCallback(errorFuture, new ApiFutureCallback<Void>(){

            @Override
            public void onSuccess(@Nullable Void result) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    return;
                }
                StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
                StreamingSubscriberConnection.this.initialize();
            }

            @Override
            public void onFailure(Throwable cause) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    logger.log(Level.FINE, "pull failure after service no longer running", cause);
                    return;
                }
                if (!StatusUtil.isRetryable(cause)) {
                    ApiException gaxException = ApiExceptionFactory.createException(cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false);
                    logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
                    StreamingSubscriberConnection.this.notifyFailed(gaxException);
                    return;
                }
                logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", cause);
                long backoffMillis = StreamingSubscriberConnection.this.channelReconnectBackoffMillis.get();
                long newBackoffMillis = Math.min(backoffMillis * 2L, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
                StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(newBackoffMillis);
                StreamingSubscriberConnection.this.systemExecutor.schedule(new Runnable(){

                    @Override
                    public void run() {
                        StreamingSubscriberConnection.this.initialize();
                    }
                }, backoffMillis, TimeUnit.MILLISECONDS);
            }
        }, MoreExecutors.directExecutor());
    }

    private boolean isAlive() {
        ApiService.State state = this.state();
        return state == ApiService.State.RUNNING || state == ApiService.State.STARTING;
    }

    @Override
    public void sendAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions) {
        ApiFutureCallback<Empty> loggingCallback = new ApiFutureCallback<Empty>(){

            @Override
            public void onSuccess(Empty empty) {
            }

            @Override
            public void onFailure(Throwable t) {
                Level level = StreamingSubscriberConnection.this.isAlive() ? Level.WARNING : Level.FINER;
                logger.log(level, "failed to send operations", t);
            }
        };
        for (MessageDispatcher.PendingModifyAckDeadline pendingModifyAckDeadline : ackDeadlineExtensions) {
            for (List<String> idChunk : Lists.partition(pendingModifyAckDeadline.ackIds, 1000)) {
                ApiFuture<Empty> future = this.stub.modifyAckDeadlineCallable().futureCall(ModifyAckDeadlineRequest.newBuilder().setSubscription(this.subscription).addAllAckIds(idChunk).setAckDeadlineSeconds(pendingModifyAckDeadline.deadlineExtensionSeconds).build());
                ApiFutures.addCallback(future, loggingCallback);
            }
        }
        for (List list : Lists.partition(acksToSend, 1000)) {
            ApiFuture<Empty> future = this.stub.acknowledgeCallable().futureCall(AcknowledgeRequest.newBuilder().setSubscription(this.subscription).addAllAckIds(list).build());
            ApiFutures.addCallback(future, loggingCallback);
        }
    }

    @InternalApi
    static List<StreamingPullRequest> partitionAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions, int size) {
        int numExtensions = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modify : ackDeadlineExtensions) {
            numExtensions += modify.ackIds.size();
        }
        int numChanges = Math.max(numExtensions, acksToSend.size());
        int numRequests = numChanges / size + (numChanges % size == 0 ? 0 : 1);
        ArrayList<StreamingPullRequest.Builder> requests = new ArrayList<StreamingPullRequest.Builder>(numRequests);
        for (int i = 0; i < numRequests; ++i) {
            requests.add(StreamingPullRequest.newBuilder());
        }
        int reqCount = 0;
        for (List<String> list : Lists.partition(acksToSend, size)) {
            ((StreamingPullRequest.Builder)requests.get(reqCount)).addAllAckIds(list);
            ++reqCount;
        }
        reqCount = 0;
        int ackCount = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modify : ackDeadlineExtensions) {
            for (String ackId : modify.ackIds) {
                ((StreamingPullRequest.Builder)requests.get(reqCount)).addModifyDeadlineSeconds(modify.deadlineExtensionSeconds).addModifyDeadlineAckIds(ackId);
                if (++ackCount != size) continue;
                ++reqCount;
                ackCount = 0;
            }
        }
        ArrayList<StreamingPullRequest> arrayList = new ArrayList<StreamingPullRequest>(requests.size());
        for (StreamingPullRequest.Builder builder : requests) {
            arrayList.add(builder.build());
        }
        return arrayList;
    }

    private class StreamingPullResponseObserver
    implements ResponseObserver<StreamingPullResponse> {
        final SettableApiFuture<Void> errorFuture;
        StreamController thisController;

        StreamingPullResponseObserver(SettableApiFuture<Void> errorFuture) {
            this.errorFuture = errorFuture;
        }

        @Override
        public void onStart(StreamController controller) {
            this.thisController = controller;
            this.thisController.disableAutoInboundFlowControl();
            this.thisController.request(1);
        }

        @Override
        public void onResponse(StreamingPullResponse response) {
            StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
            StreamingSubscriberConnection.this.messageDispatcher.processReceivedMessages(response.getReceivedMessagesList(), new Runnable(){

                @Override
                public void run() {
                    if (StreamingSubscriberConnection.this.isAlive() && !StreamingPullResponseObserver.this.errorFuture.isDone()) {
                        StreamingSubscriberConnection.this.lock.lock();
                        try {
                            StreamingPullResponseObserver.this.thisController.request(1);
                        }
                        catch (Exception e) {
                            logger.log(Level.WARNING, "cannot request more messages", e);
                        }
                        finally {
                            StreamingSubscriberConnection.this.lock.unlock();
                        }
                    }
                }
            });
        }

        @Override
        public void onError(Throwable t) {
            this.errorFuture.setException(t);
        }

        @Override
        public void onComplete() {
            logger.fine("Streaming pull terminated successfully!");
            this.errorFuture.set(null);
        }
    }
}

