package com.google.cloud.pubsub.spi.v1;

import com.google.api.core.ApiClock;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher.class */
public class MessageDispatcher {
    // Upper bound (seconds) applied to ExtensionJob.nextExtensionSeconds when it grows.
    private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 600;
    // Executor on which MessageReceiver.receiveMessage calls are submitted.
    private final ScheduledExecutorService executor;
    // Executor used to schedule the ack-deadline-extension and pending-acks alarms.
    private final ScheduledExecutorService alarmsExecutor;
    // Time source for all "now" computations in this class.
    private final ApiClock clock;
    // How long before a job's expiration the extension alarm is scheduled to fire.
    private final Duration ackExpirationPadding;
    // Maximum time after creation that an ExtensionJob keeps being extended.
    private final Duration maxAckExtensionPeriod;
    // User callback that receives each message.
    private final MessageReceiver receiver;
    // Sink for outgoing acks and modify-ack-deadline operations.
    private final AckProcessor ackProcessor;
    // Limits how many messages/bytes are handed to the receiver at once.
    private final FlowController flowController;
    // Tracks the count of pending messages; stop() waits on it.
    private final MessageWaiter messagesWaiter;
    // Extension jobs ordered by expiration; guarded by synchronized (outstandingAckHandlers).
    private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
    // Ack IDs waiting to be sent as acks; guarded by synchronized (pendingAcks).
    private final Set<String> pendingAcks;
    // Ack IDs waiting to be sent as nacks; guarded by synchronized (pendingNacks).
    private final Set<String> pendingNacks;
    // Guards the alarm fields below (ackDeadlineExtensionAlarm, nextAckDeadlineExtensionAlarmTime, pendingAcksAlarm).
    private final Lock alarmsLock;
    // Deadline (seconds) used to compute the first expiration of newly received messages.
    private int messageDeadlineSeconds;
    // Currently scheduled extension alarm, or null when none is scheduled.
    private ScheduledFuture<?> ackDeadlineExtensionAlarm;
    // Fire time of the scheduled extension alarm; Instant.ofEpochMilli(Long.MAX_VALUE) means "none".
    private Instant nextAckDeadlineExtensionAlarmTime;
    // Currently scheduled pending-acks flush alarm, or null when none is scheduled.
    private ScheduledFuture<?> pendingAcksAlarm;
    // Batches whose messages have not all been handed to the receiver yet; guarded by synchronized (outstandingMessageBatches).
    private final Deque<OutstandingMessagesBatch> outstandingMessageBatches;
    // Records ack latency values, in seconds, for acked messages.
    private final Distribution ackLatencyDistribution;
    private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());

    // Delay between the first pending ack/nack and the flush that sends it.
    @VisibleForTesting
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
    // Initial value of ExtensionJob.nextExtensionSeconds for newly received batches.
    private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2;
    // Fallback alarms executor used when the constructor is given a null alarms executor.
    // NOTE(review): the thread count reuses INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, which looks like a
    // decompiler substitution for the literal 2 rather than an intentional coupling — confirm.
    private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(INITIAL_ACK_DEADLINE_EXTENSION_SECONDS).build().getExecutor();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.pubsub.spi.v1.MessageDispatcher$4, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$4.class */
    /**
     * Synthetic lookup table mapping {@link AckReply} ordinals to the case labels used by the
     * switch in {@code AckHandler.onSuccess}: ACK maps to 1 and NACK maps to 2.
     */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$pubsub$spi$v1$MessageDispatcher$AckReply = new int[AckReply.values().length];

        static {
            try {
                $SwitchMap$com$google$cloud$pubsub$spi$v1$MessageDispatcher$AckReply[AckReply.ACK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the table entry for this constant is simply left at 0.
            }
            try {
                // NOTE(review): the constant used here only supplies the value 2 (the NACK case label);
                // it presumably stands in for a literal 2 and has nothing to do with deadlines — confirm.
                $SwitchMap$com$google$cloud$pubsub$spi$v1$MessageDispatcher$AckReply[AckReply.NACK.ordinal()] = MessageDispatcher.INITIAL_ACK_DEADLINE_EXTENSION_SECONDS;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the table entry for this constant is simply left at 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$AckDeadlineAlarm.class */
    /**
     * Alarm task that extends the ack deadline of messages still being processed.
     *
     * <p>When run, it pops every {@link ExtensionJob} expiring at or before a cutover time, drops
     * handlers that were already acked/nacked, extends the rest, sends the resulting
     * modify-ack-deadline operations together with any pending acks/nacks, and reschedules itself
     * for the earliest remaining expiration.
     */
    public class AckDeadlineAlarm implements Runnable {
        private AckDeadlineAlarm() {
        }

        @Override // java.lang.Runnable
        public void run() {
            MessageDispatcher.this.alarmsLock.lock();
            try {
                // This alarm is firing now, so mark "no extension alarm scheduled".
                MessageDispatcher.this.nextAckDeadlineExtensionAlarmTime = Instant.ofEpochMilli(Long.MAX_VALUE);
                MessageDispatcher.this.ackDeadlineExtensionAlarm = null;
                // Pending acks/nacks are flushed below by processOutstandingAckOperations, so the
                // separate flush alarm is no longer needed.
                if (MessageDispatcher.this.pendingAcksAlarm != null) {
                    MessageDispatcher.this.pendingAcksAlarm.cancel(false);
                    MessageDispatcher.this.pendingAcksAlarm = null;
                }
                // ofEpochMilli = now; ofEpochMilli2 = cutover time: now + padding + 500 ms, rounded up
                // to a whole second.
                Instant ofEpochMilli = Instant.ofEpochMilli(MessageDispatcher.this.clock.millisTime());
                Instant ofEpochMilli2 = Instant.ofEpochMilli(((long) Math.ceil(ofEpochMilli.plus(MessageDispatcher.this.ackExpirationPadding).plusMillis(500L).toEpochMilli() / 1000.0d)) * 1000);
                MessageDispatcher.logger.log(Level.FINER, "Running alarm sent outstanding acks, at time: {0}, with cutover time: {1}, padding: {2}", new Object[]{ofEpochMilli, ofEpochMilli2, MessageDispatcher.this.ackExpirationPadding});
                // instant = earliest expiration left in the queue after this pass (null if empty).
                Instant instant = null;
                // arrayList = modify-ack-deadline operations to send; arrayList2 = jobs to re-queue.
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                synchronized (MessageDispatcher.this.outstandingAckHandlers) {
                    // Pop every job whose expiration is at or before the cutover time.
                    while (!MessageDispatcher.this.outstandingAckHandlers.isEmpty() && ((ExtensionJob) MessageDispatcher.this.outstandingAckHandlers.peek()).expiration.compareTo(ofEpochMilli2) <= 0) {
                        ExtensionJob extensionJob = (ExtensionJob) MessageDispatcher.this.outstandingAckHandlers.poll();
                        // Jobs older than a positive maxAckExtensionPeriod are dropped (neither
                        // extended nor re-queued); a non-positive period never drops a job here.
                        if (MessageDispatcher.this.maxAckExtensionPeriod.toMillis() <= 0 || extensionJob.creation.plus(MessageDispatcher.this.maxAckExtensionPeriod).compareTo(ofEpochMilli) > 0) {
                            // Remove handlers that were already acked/nacked by swapping each with
                            // the last element and removing the tail; i is not advanced after a swap
                            // so the swapped-in element is examined too.
                            int i = 0;
                            while (i < extensionJob.ackHandlers.size()) {
                                if (extensionJob.ackHandlers.get(i).acked.get()) {
                                    Collections.swap(extensionJob.ackHandlers, i, extensionJob.ackHandlers.size() - 1);
                                    extensionJob.ackHandlers.remove(extensionJob.ackHandlers.size() - 1);
                                } else {
                                    i++;
                                }
                            }
                            if (!extensionJob.ackHandlers.isEmpty()) {
                                extensionJob.extendExpiration(ofEpochMilli);
                                // The extension sent is the whole number of seconds from now to the
                                // job's new expiration.
                                PendingModifyAckDeadline pendingModifyAckDeadline = new PendingModifyAckDeadline(Ints.saturatedCast(TimeUnit.MILLISECONDS.toSeconds(Duration.between(ofEpochMilli, extensionJob.expiration).toMillis())), new String[0]);
                                Iterator<AckHandler> it = extensionJob.ackHandlers.iterator();
                                while (it.hasNext()) {
                                    pendingModifyAckDeadline.addAckId(it.next().ackId);
                                }
                                arrayList.add(pendingModifyAckDeadline);
                                arrayList2.add(extensionJob);
                            }
                        }
                    }
                    // Re-queue extended jobs only after the scan, so a job is popped at most once
                    // per alarm run.
                    Iterator it2 = arrayList2.iterator();
                    while (it2.hasNext()) {
                        MessageDispatcher.this.outstandingAckHandlers.add((ExtensionJob) it2.next());
                    }
                    if (!MessageDispatcher.this.outstandingAckHandlers.isEmpty()) {
                        instant = ((ExtensionJob) MessageDispatcher.this.outstandingAckHandlers.peek()).expiration;
                    }
                }
                // Sends the extensions along with whatever acks/nacks are pending.
                MessageDispatcher.this.processOutstandingAckOperations(arrayList);
                if (instant != null) {
                    MessageDispatcher.logger.log(Level.FINER, "Scheduling based on outstanding, at time: {0}, next scheduled time: {1}", new Object[]{ofEpochMilli, instant});
                    MessageDispatcher.this.setupNextAckDeadlineExtensionAlarm(instant);
                }
            } finally {
                MessageDispatcher.this.alarmsLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$AckHandler.class */
    public class AckHandler implements FutureCallback<AckReply> {
        private final String ackId;
        private final int outstandingBytes;
        private final AtomicBoolean acked = new AtomicBoolean(false);
        private final Instant receivedTime;

        AckHandler(String str, int i) {
            this.ackId = str;
            this.outstandingBytes = i;
            this.receivedTime = Instant.ofEpochMilli(MessageDispatcher.this.clock.millisTime());
        }

        public void onFailure(Throwable th) {
            MessageDispatcher.logger.log(Level.WARNING, "MessageReceiver failed to processes ack ID: " + this.ackId + ", the message will be nacked.", th);
            this.acked.getAndSet(true);
            synchronized (MessageDispatcher.this.pendingNacks) {
                MessageDispatcher.this.pendingNacks.add(this.ackId);
            }
            MessageDispatcher.this.setupPendingAcksAlarm();
            MessageDispatcher.this.flowController.release(1, this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
            MessageDispatcher.this.processOutstandingBatches();
        }

        public void onSuccess(AckReply ackReply) {
            this.acked.getAndSet(true);
            switch (AnonymousClass4.$SwitchMap$com$google$cloud$pubsub$spi$v1$MessageDispatcher$AckReply[ackReply.ordinal()]) {
                case 1:
                    synchronized (MessageDispatcher.this.pendingAcks) {
                        MessageDispatcher.this.pendingAcks.add(this.ackId);
                    }
                    MessageDispatcher.this.ackLatencyDistribution.record(Ints.saturatedCast((long) Math.ceil((MessageDispatcher.this.clock.millisTime() - TimeUnit.NANOSECONDS.toMillis(this.receivedTime.getNano())) / 1000.0d)));
                    break;
                case MessageDispatcher.INITIAL_ACK_DEADLINE_EXTENSION_SECONDS /* 2 */:
                    synchronized (MessageDispatcher.this.pendingNacks) {
                        MessageDispatcher.this.pendingNacks.add(this.ackId);
                    }
                    break;
                default:
                    throw new IllegalArgumentException(String.format("AckReply: %s not supported", ackReply));
            }
            MessageDispatcher.this.setupPendingAcksAlarm();
            MessageDispatcher.this.flowController.release(1, this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
            MessageDispatcher.this.processOutstandingBatches();
        }
    }

    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$AckProcessor.class */
    /** Sink to which the dispatcher hands its outgoing acknowledgement operations. */
    public interface AckProcessor {
        /**
         * Sends a batch of acknowledgement operations.
         *
         * @param list ack IDs to acknowledge
         * @param list2 modify-ack-deadline operations; the dispatcher passes deadline extensions
         *     and, for nacks, an operation with a 0-second deadline
         */
        void sendAckOperations(List<String> list, List<PendingModifyAckDeadline> list2);
    }

    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$AckReply.class */
    /** Reply a receiver gives for a message, via {@code AckReplyConsumer.ack()} or {@code nack()}. */
    public enum AckReply {
        // The message's ack ID is queued in pendingAcks.
        ACK,
        // The message's ack ID is queued in pendingNacks (sent as a 0-second deadline modification).
        NACK
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$ExtensionJob.class */
    /**
     * A group of ack handlers that share one ack deadline and are extended together.
     *
     * <p>Jobs order themselves by {@link #expiration}, so the dispatcher's priority queue always
     * yields the job that expires first.
     */
    public class ExtensionJob implements Comparable<ExtensionJob> {
        // Time the job was created; the max extension period is measured from here.
        Instant creation;
        // Time at which the current ack deadline runs out.
        Instant expiration;
        // Number of seconds the next extension will add.
        int nextExtensionSeconds;
        // Handlers of the messages covered by this job.
        ArrayList<AckHandler> ackHandlers;

        ExtensionJob(Instant instant, Instant instant2, int i, ArrayList<AckHandler> arrayList) {
            creation = instant;
            expiration = instant2;
            nextExtensionSeconds = i;
            ackHandlers = arrayList;
        }

        /**
         * Moves the expiration to {@code instant + nextExtensionSeconds}, capped at
         * {@code creation + maxAckExtensionPeriod}, then grows the step for the next extension.
         *
         * @param instant the current time
         */
        void extendExpiration(Instant instant) {
            Instant candidate = instant.plus(Duration.ofSeconds(nextExtensionSeconds));
            Instant latestAllowed = creation.plus(MessageDispatcher.this.maxAckExtensionPeriod);
            if (candidate.isBefore(latestAllowed)) {
                expiration = candidate;
            } else {
                expiration = latestAllowed;
            }
            // NOTE(review): the multiplier below is the INITIAL_ACK_DEADLINE_EXTENSION_SECONDS
            // constant (value 2); it presumably stands for a plain doubling factor — confirm.
            int grown = MessageDispatcher.INITIAL_ACK_DEADLINE_EXTENSION_SECONDS * nextExtensionSeconds;
            nextExtensionSeconds = Math.min(grown, MessageDispatcher.MAX_ACK_DEADLINE_EXTENSION_SECS);
        }

        /** Orders jobs by expiration, earliest first. */
        @Override // java.lang.Comparable
        public int compareTo(ExtensionJob extensionJob) {
            return expiration.compareTo(extensionJob.expiration);
        }

        @Override
        public String toString() {
            List<String> ids = new ArrayList<>();
            for (AckHandler handler : ackHandlers) {
                ids.add(handler.ackId);
            }
            return String.format("ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", expiration, nextExtensionSeconds, ids);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$OutstandingMessagesBatch.class */
    /**
     * Messages from one call to {@code processReceivedMessages} that still have to be handed to
     * the receiver, plus the callback to run once the last of them has been dispatched.
     */
    public static class OutstandingMessagesBatch {
        // Messages not yet handed to the receiver, in arrival order.
        private final Deque<OutstandingMessage> messages = new LinkedList<>();
        // Run by the dispatcher after the last message of this batch is dispatched.
        private final Runnable doneCallback;

        /** Pairs a received message with the handler that will ack or nack it. */
        public static class OutstandingMessage {
            private final ReceivedMessage receivedMessage;
            private final AckHandler ackHandler;

            public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
                this.ackHandler = ackHandler;
                this.receivedMessage = receivedMessage;
            }

            /** @return the message as received */
            public ReceivedMessage receivedMessage() {
                return receivedMessage;
            }

            /** @return the handler bound to this message */
            public AckHandler ackHandler() {
                return ackHandler;
            }
        }

        /**
         * @param runnable callback stored as this batch's done callback
         */
        public OutstandingMessagesBatch(Runnable runnable) {
            doneCallback = runnable;
        }

        /** Appends a message and its handler to the end of this batch. */
        public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
            OutstandingMessage entry = new OutstandingMessage(receivedMessage, ackHandler);
            messages.add(entry);
        }

        /** @return the live (not copied) queue of messages still to dispatch */
        public Deque<OutstandingMessage> messages() {
            return messages;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/pubsub/spi/v1/MessageDispatcher$PendingModifyAckDeadline.class */
    /** A set of ack IDs whose ack deadline should be set to the same number of seconds. */
    public static class PendingModifyAckDeadline {
        // Ack IDs covered by this operation, in insertion order.
        final List<String> ackIds = new ArrayList<>();
        // Deadline, in seconds, to apply to every ack ID in this operation.
        final int deadlineExtensionSeconds;

        /**
         * @param i deadline in seconds
         * @param strArr initial ack IDs, added in the given order
         */
        PendingModifyAckDeadline(int i, String... strArr) {
            deadlineExtensionSeconds = i;
            for (int idx = 0; idx < strArr.length; idx++) {
                addAckId(strArr[idx]);
            }
        }

        /** Appends one ack ID to this operation. */
        public void addAckId(String str) {
            ackIds.add(str);
        }

        @Override
        public String toString() {
            return String.format("PendingModifyAckDeadline{extension: %d sec, ackIds: %s}", deadlineExtensionSeconds, ackIds);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a dispatcher.
     *
     * @param messageReceiver callback that receives each message
     * @param ackProcessor sink for outgoing acks and deadline modifications
     * @param duration padding subtracted from a deadline to decide when the extension alarm fires
     * @param duration2 maximum period over which a message's deadline keeps being extended
     * @param distribution recorder for ack latencies
     * @param flowController limiter applied before handing messages to the receiver
     * @param scheduledExecutorService executor on which the receiver is invoked
     * @param scheduledExecutorService2 executor for alarms; when null a shared executor is used
     * @param apiClock time source
     */
    public MessageDispatcher(MessageReceiver messageReceiver, AckProcessor ackProcessor, Duration duration, Duration duration2, Distribution distribution, FlowController flowController, ScheduledExecutorService scheduledExecutorService, @Nullable ScheduledExecutorService scheduledExecutorService2, ApiClock apiClock) {
        // Executors.
        executor = scheduledExecutorService;
        if (scheduledExecutorService2 != null) {
            alarmsExecutor = scheduledExecutorService2;
        } else {
            alarmsExecutor = SHARED_ALARMS_EXECUTOR;
        }

        // Configuration and collaborators supplied by the caller.
        ackExpirationPadding = duration;
        maxAckExtensionPeriod = duration2;
        receiver = messageReceiver;
        this.ackProcessor = ackProcessor;
        this.flowController = flowController;

        // Internal bookkeeping, all initially empty.
        outstandingMessageBatches = new LinkedList<>();
        outstandingAckHandlers = new PriorityQueue<>();
        pendingAcks = new HashSet<>();
        pendingNacks = new HashSet<>();
        ackLatencyDistribution = distribution;

        // Alarm state: Long.MAX_VALUE millis marks "no extension alarm scheduled".
        alarmsLock = new ReentrantLock();
        nextAckDeadlineExtensionAlarmTime = Instant.ofEpochMilli(Long.MAX_VALUE);
        messagesWaiter = new MessageWaiter();
        clock = apiClock;
    }

    /**
     * Stops the dispatcher: waits on the message waiter until no messages are pending, cancels a
     * scheduled deadline-extension alarm (if any), and sends whatever acks and nacks are still
     * queued.
     */
    public void stop() {
        messagesWaiter.waitNoMessages();
        alarmsLock.lock();
        try {
            ScheduledFuture<?> scheduledAlarm = ackDeadlineExtensionAlarm;
            if (scheduledAlarm != null) {
                scheduledAlarm.cancel(true);
                ackDeadlineExtensionAlarm = null;
            }
            // Flush the remaining pending acks/nacks while still holding the alarms lock.
            processOutstandingAckOperations();
        } finally {
            alarmsLock.unlock();
        }
    }

    /**
     * Sets the deadline used to compute the first expiration of newly received messages.
     *
     * @param i deadline in seconds
     */
    public void setMessageDeadlineSeconds(int i) {
        messageDeadlineSeconds = i;
    }

    /**
     * @return the deadline, in seconds, last set through {@code setMessageDeadlineSeconds}
     */
    public int getMessageDeadlineSeconds() {
        return messageDeadlineSeconds;
    }

    /**
     * Accepts a batch of newly received messages.
     *
     * <p>An empty batch just runs the callback. Otherwise every message gets an ack handler, the
     * handlers are registered as one extension job expiring {@code messageDeadlineSeconds} from
     * now, the extension alarm is (re)scheduled for that expiration, and the batch is queued and
     * dispatched as far as flow control allows.
     *
     * @param list the received messages
     * @param runnable callback run once every message of the batch has been dispatched
     */
    public void processReceivedMessages(List<ReceivedMessage> list, Runnable runnable) {
        final int messageCount = list.size();
        if (messageCount == 0) {
            runnable.run();
            return;
        }
        messagesWaiter.incrementPendingMessages(messageCount);

        // One handler per message, tracked both by the batch and by the extension job.
        OutstandingMessagesBatch batch = new OutstandingMessagesBatch(runnable);
        ArrayList<AckHandler> handlers = new ArrayList<>(messageCount);
        Iterator<ReceivedMessage> received = list.iterator();
        while (received.hasNext()) {
            ReceivedMessage next = received.next();
            int serializedSize = next.getMessage().getSerializedSize();
            AckHandler handler = new AckHandler(next.getAckId(), serializedSize);
            handlers.add(handler);
            batch.addMessage(next, handler);
        }

        Instant firstExpiration = Instant.ofEpochMilli(clock.millisTime()).plusSeconds(messageDeadlineSeconds);
        synchronized (outstandingAckHandlers) {
            Instant createdAt = Instant.ofEpochMilli(clock.millisTime());
            ExtensionJob job = new ExtensionJob(createdAt, firstExpiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, handlers);
            outstandingAckHandlers.add(job);
        }
        setupNextAckDeadlineExtensionAlarm(firstExpiration);

        synchronized (outstandingMessageBatches) {
            outstandingMessageBatches.add(batch);
        }
        processOutstandingBatches();
    }

    /**
     * Hands outstanding messages to the receiver, one at a time, for as long as the flow
     * controller grants a reservation.
     *
     * <p>Returns as soon as there is nothing left to dispatch or the flow controller reports that
     * its element or byte limit is reached; dispatching resumes the next time this method is
     * called. Each message is delivered on {@code executor}; the receiver's reply (or exception)
     * completes a future whose callback is the message's {@code AckHandler}. When the last message
     * of a batch has been submitted, the batch's done callback is run.
     *
     * @throws IllegalStateException if the flow controller fails for a reason other than a limit
     *     being reached
     */
    public void processOutstandingBatches() {
        while (true) {
            final OutstandingMessagesBatch.OutstandingMessage outstandingMessage;
            boolean batchDone = false;
            Runnable batchDoneCallback = null;
            synchronized (this.outstandingMessageBatches) {
                OutstandingMessagesBatch nextBatch = this.outstandingMessageBatches.peek();
                if (nextBatch == null) {
                    return;
                }
                outstandingMessage = nextBatch.messages.peek();
                if (outstandingMessage == null) {
                    return;
                }
                try {
                    this.flowController.reserve(1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
                } catch (FlowController.MaxOutstandingElementCountReachedException | FlowController.MaxOutstandingRequestBytesReachedException limitReached) {
                    // Flow control is full: leave the message queued and stop for now. These
                    // subclasses must be caught before the general FlowControlException below,
                    // otherwise a full controller would be reported as an unexpected failure.
                    return;
                } catch (FlowController.FlowControlException unexpected) {
                    throw new IllegalStateException("Flow control unexpected exception", unexpected);
                }
                // Reservation granted: the message can leave the queue.
                nextBatch.messages.poll();
                if (nextBatch.messages.isEmpty()) {
                    this.outstandingMessageBatches.poll();
                    batchDone = true;
                    batchDoneCallback = nextBatch.doneCallback;
                }
            }

            final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
            AckHandler ackHandler = outstandingMessage.ackHandler();
            final SettableFuture<AckReply> response = SettableFuture.create();
            final AckReplyConsumer ackReplyConsumer = new AckReplyConsumer() {
                @Override // com.google.cloud.pubsub.spi.v1.AckReplyConsumer
                public void ack() {
                    response.set(AckReply.ACK);
                }

                @Override // com.google.cloud.pubsub.spi.v1.AckReplyConsumer
                public void nack() {
                    response.set(AckReply.NACK);
                }
            };
            Futures.addCallback(response, ackHandler);
            this.executor.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        MessageDispatcher.this.receiver.receiveMessage(message, ackReplyConsumer);
                    } catch (Exception e) {
                        // Routed to AckHandler.onFailure, which nacks the message.
                        response.setException(e);
                    }
                }
            });
            // Run outside the synchronized block so the callback never executes under the queue lock.
            if (batchDone) {
                batchDoneCallback.run();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules a flush of the pending acks and nacks {@code PENDING_ACKS_SEND_DELAY} from now,
     * unless such a flush is already scheduled.
     */
    public void setupPendingAcksAlarm() {
        alarmsLock.lock();
        try {
            if (pendingAcksAlarm != null) {
                // A flush is already on its way; it will pick up the new entries too.
                return;
            }
            Runnable flushPendingAcks = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    MessageDispatcher.this.alarmsLock.lock();
                    try {
                        MessageDispatcher.this.pendingAcksAlarm = null;
                        MessageDispatcher.this.processOutstandingAckOperations();
                    } finally {
                        MessageDispatcher.this.alarmsLock.unlock();
                    }
                }
            };
            long delayMillis = PENDING_ACKS_SEND_DELAY.toMillis();
            pendingAcksAlarm = alarmsExecutor.schedule(flushPendingAcks, delayMillis, TimeUnit.MILLISECONDS);
        } finally {
            alarmsLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure the deadline-extension alarm fires no later than {@code ackExpirationPadding}
     * before the given expiration.
     *
     * <p>If the currently scheduled alarm time is already at or before that moment nothing
     * changes; otherwise the existing alarm (if any) is cancelled and a new one is scheduled.
     *
     * @param instant expiration that the alarm must precede
     */
    public void setupNextAckDeadlineExtensionAlarm(Instant instant) {
        Instant wantedAlarmTime = instant.minus(ackExpirationPadding);
        alarmsLock.lock();
        try {
            if (!nextAckDeadlineExtensionAlarmTime.isAfter(wantedAlarmTime)) {
                // The scheduled alarm already fires early enough.
                return;
            }
            logger.log(Level.FINER, "Scheduling next alarm time: {0}, previous alarm time: {1}", new Object[]{wantedAlarmTime, nextAckDeadlineExtensionAlarmTime});
            if (ackDeadlineExtensionAlarm != null) {
                logger.log(Level.FINER, "Canceling previous alarm");
                ackDeadlineExtensionAlarm.cancel(false);
            }
            nextAckDeadlineExtensionAlarmTime = wantedAlarmTime;
            AckDeadlineAlarm alarm = new AckDeadlineAlarm();
            long delayMillis = nextAckDeadlineExtensionAlarmTime.toEpochMilli() - clock.millisTime();
            ackDeadlineExtensionAlarm = alarmsExecutor.schedule(alarm, delayMillis, TimeUnit.MILLISECONDS);
        } finally {
            alarmsLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Sends the pending acks and nacks, with no additional deadline extensions. */
    public void processOutstandingAckOperations() {
        List<PendingModifyAckDeadline> noExtensions = Collections.<PendingModifyAckDeadline>emptyList();
        processOutstandingAckOperations(noExtensions);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains the pending acks and nacks and sends them, together with the given deadline
     * extensions, to the ack processor.
     *
     * <p>Nacks are sent as one extra modify-ack-deadline operation with a deadline of 0 seconds.
     * The ack processor is called even when there is nothing to send.
     *
     * @param list deadline extensions to send along; the list itself is not modified
     */
    public void processOutstandingAckOperations(List<PendingModifyAckDeadline> list) {
        List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = Lists.newArrayList(list);

        // pendingAcks is only touched while holding its monitor, including for sizing the copy.
        List<String> acksToSend = new ArrayList<>();
        synchronized (this.pendingAcks) {
            if (!this.pendingAcks.isEmpty()) {
                try {
                    acksToSend = new ArrayList<>(this.pendingAcks);
                    logger.log(Level.FINER, "Sending {0} acks", Integer.valueOf(acksToSend.size()));
                } finally {
                    // Always empty the set, even if copying or logging failed.
                    this.pendingAcks.clear();
                }
            }
        }

        PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
        synchronized (this.pendingNacks) {
            if (!this.pendingNacks.isEmpty()) {
                try {
                    for (String ackId : this.pendingNacks) {
                        nacksToSend.addAckId(ackId);
                    }
                    logger.log(Level.FINER, "Sending {0} nacks", Integer.valueOf(this.pendingNacks.size()));
                    modifyAckDeadlinesToSend.add(nacksToSend);
                } finally {
                    // Always empty the set, even if building the operation failed.
                    this.pendingNacks.clear();
                }
            }
        }

        this.ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
    }
}
