package org.apache.samza.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;

/**
 * Base {@link SystemConsumer} that buffers incoming envelopes in one {@link BlockingQueue} per
 * registered {@link SystemStreamPartition} and hands them out through
 * {@link #poll(Set, long)}.
 *
 * <p>Subclasses feed the buffers through {@link #put} / {@link #putAll} and signal that a
 * partition currently has nothing more to deliver through {@link #setIsAtHead}. Alongside each
 * queue the class keeps a running total of the buffered envelopes' sizes, exposed through
 * {@link #getMessagesSizeInQueue} and a gauge.
 */
public abstract class BlockingEnvelopeMap implements SystemConsumer {
    /** Timeout value for which {@link #poll(Set, long)} waits until a message arrives or the partition is at head. */
    private static final long BLOCK_UNTIL_MESSAGE_OR_HEAD = -1L;

    /** Length of each individual wait while blocking without a timeout; the at-head flag is re-checked in between. */
    private static final long HEAD_RECHECK_INTERVAL_MS = 1000L;

    private final BlockingEnvelopeMapMetrics metrics;
    private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
    private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize;
    private final Map<SystemStreamPartition, Boolean> noMoreMessage;
    private final Clock clock;

    /**
     * Per-partition metrics for the enclosing map: a "no more messages" gauge, blocking-poll
     * counters, buffer gauges, and one overall poll counter.
     */
    public class BlockingEnvelopeMapMetrics {
        private final String group;
        private final MetricsRegistry metricsRegistry;
        private final ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>> noMoreMessageGaugeMap = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap = new ConcurrentHashMap<>();
        private final Counter pollCount;

        /**
         * Creates the metrics holder and registers the overall {@code poll-count} counter.
         *
         * @param group metrics group under which every metric is registered
         * @param metricsRegistry registry that creates the counters and gauges
         */
        public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) {
            this.group = group;
            this.metricsRegistry = metricsRegistry;
            this.pollCount = metricsRegistry.newCounter(group, "poll-count");
        }

        /**
         * Registers the metrics of one partition. Metric names are the fixed prefixes below
         * followed by the partition's string form.
         *
         * @param systemStreamPartition partition whose metrics are created
         */
        public void initMetrics(SystemStreamPartition systemStreamPartition) {
            this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newGauge(this.group, "no-more-messages-" + systemStreamPartition, false));
            this.blockingPollCountMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newCounter(this.group, "blocking-poll-count-" + systemStreamPartition));
            this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newCounter(this.group, "blocking-poll-timeout-count-" + systemStreamPartition));
            // These two gauges compute their value on demand from the enclosing map's buffers.
            this.metricsRegistry.newGauge(this.group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
            this.metricsRegistry.newGauge(this.group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
        }

        /**
         * Updates the "no more messages" gauge of a partition.
         *
         * @param systemStreamPartition partition previously passed to {@link #initMetrics}
         * @param noMoreMessages new gauge value
         */
        public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
            this.noMoreMessageGaugeMap.get(systemStreamPartition).set(Boolean.valueOf(noMoreMessages));
        }

        /**
         * Counts one wait performed by a poll that blocks without a timeout.
         *
         * @param systemStreamPartition partition previously passed to {@link #initMetrics}
         */
        public void incBlockingPoll(SystemStreamPartition systemStreamPartition) {
            this.blockingPollCountMap.get(systemStreamPartition).inc();
        }

        /**
         * Counts one wait performed by a poll that blocks with a positive timeout.
         *
         * @param systemStreamPartition partition previously passed to {@link #initMetrics}
         */
        public void incBlockingTimeoutPoll(SystemStreamPartition systemStreamPartition) {
            this.blockingPollTimeoutCountMap.get(systemStreamPartition).inc();
        }

        /** Counts one invocation of the enclosing map's {@code poll}. */
        public void incPoll() {
            this.pollCount.inc();
        }
    }

    /** Gauge reporting how many envelopes are currently buffered for one partition. */
    public class BufferGauge extends Gauge<Integer> {
        private final SystemStreamPartition systemStreamPartition;

        /**
         * @param systemStreamPartition partition whose queue length is reported
         * @param name gauge name
         */
        public BufferGauge(SystemStreamPartition systemStreamPartition, String name) {
            super(name, 0);
            this.systemStreamPartition = systemStreamPartition;
        }

        /**
         * @return the current queue length, or 0 when the partition has no queue
         */
        @Override
        public Integer getValue() {
            BlockingQueue<IncomingMessageEnvelope> queue = BlockingEnvelopeMap.this.bufferedMessages.get(this.systemStreamPartition);
            if (queue == null) {
                return 0;
            }
            return Integer.valueOf(queue.size());
        }
    }

    /** Gauge reporting the tracked total size of the envelopes buffered for one partition. */
    public class BufferSizeGauge extends Gauge<Long> {
        private final SystemStreamPartition systemStreamPartition;

        /**
         * @param systemStreamPartition partition whose buffered size is reported
         * @param name gauge name
         */
        public BufferSizeGauge(SystemStreamPartition systemStreamPartition, String name) {
            super(name, 0L);
            this.systemStreamPartition = systemStreamPartition;
        }

        /**
         * @return the tracked buffered size, or 0 when the partition has no size counter
         */
        @Override
        public Long getValue() {
            AtomicLong size = BlockingEnvelopeMap.this.bufferedMessagesSize.get(this.systemStreamPartition);
            if (size == null) {
                return 0L;
            }
            return Long.valueOf(size.get());
        }
    }

    /** Creates a map that reports metrics to a {@link NoOpMetricsRegistry}. */
    public BlockingEnvelopeMap() {
        this(new NoOpMetricsRegistry());
    }

    /**
     * Creates a map whose clock is backed by {@link System#currentTimeMillis()}.
     *
     * @param metricsRegistry registry used for this map's metrics
     */
    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
        this(metricsRegistry, new Clock() {
            @Override
            public long currentTimeMillis() {
                return System.currentTimeMillis();
            }
        });
    }

    /**
     * Creates a map whose metrics group is the runtime class name.
     *
     * @param metricsRegistry registry used for this map's metrics
     * @param clock time source used to compute poll deadlines
     */
    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
        this(metricsRegistry, clock, null);
    }

    /**
     * @param metricsRegistry registry used for this map's metrics
     * @param clock time source used to compute poll deadlines
     * @param metricsGroupName metrics group name; when {@code null} the runtime class name is used
     */
    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
        this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName == null ? getClass().getName() : metricsGroupName, metricsRegistry);
        this.bufferedMessages = new ConcurrentHashMap<>();
        this.noMoreMessage = new ConcurrentHashMap<>();
        this.clock = clock;
        this.bufferedMessagesSize = new ConcurrentHashMap<>();
    }

    /**
     * Prepares metrics, a queue and a size counter for the partition. An existing queue or
     * counter for the same partition is left in place.
     *
     * @param systemStreamPartition partition to register
     * @param offset not used by this implementation
     */
    @Override
    public void register(SystemStreamPartition systemStreamPartition, String offset) {
        this.metrics.initMetrics(systemStreamPartition);
        this.bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
        this.bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0L));
    }

    /**
     * Creates the queue used to buffer one partition. Subclasses may override this to supply a
     * different queue implementation.
     *
     * @return a new, empty {@link LinkedBlockingQueue}
     */
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<>();
    }

    /**
     * Drains the buffered envelopes of the requested partitions.
     *
     * <p>For a partition whose queue is empty the behaviour depends on {@code timeout}:
     * {@code 0} skips the partition immediately; {@code -1} waits in one-second steps until an
     * envelope arrives or {@link #isAtHead} reports true; a positive value waits for at most the
     * time left until the deadline computed when this call started. That deadline is shared by
     * all partitions of the call.
     *
     * @param systemStreamPartitions partitions to fetch envelopes for
     * @param timeout wait budget in milliseconds, {@code 0} for no waiting, {@code -1} to block
     * @return the drained envelopes keyed by partition; partitions that yielded nothing are absent
     * @throws InterruptedException if interrupted while waiting on a queue
     * @throws NullPointerException if a requested partition was never registered
     */
    @Override
    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
        long deadline = this.clock.currentTimeMillis() + timeout;
        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = new HashMap<>();
        this.metrics.incPoll();
        for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
            BlockingQueue<IncomingMessageEnvelope> queue = this.bufferedMessages.get(systemStreamPartition);
            if (queue == null) {
                throw new NullPointerException("Attempting to poll " + systemStreamPartition + ", but the system/stream/partition was never registered.");
            }
            List<IncomingMessageEnvelope> drained = new ArrayList<>(queue.size());
            if (queue.size() > 0) {
                queue.drainTo(drained);
            } else if (timeout != 0) {
                IncomingMessageEnvelope first = null;
                // Budget left for this partition; only meaningful for positive timeouts.
                long timeRemaining = deadline - this.clock.currentTimeMillis();
                if (timeout == BLOCK_UNTIL_MESSAGE_OR_HEAD) {
                    // Wait in short steps so the at-head flag is re-evaluated between waits.
                    while (first == null && !isAtHead(systemStreamPartition)) {
                        this.metrics.incBlockingPoll(systemStreamPartition);
                        first = queue.poll(HEAD_RECHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
                    }
                } else if (timeout > 0 && timeRemaining > 0) {
                    this.metrics.incBlockingTimeoutPoll(systemStreamPartition);
                    first = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
                }
                if (first != null) {
                    // Take whatever else arrived together with the envelope we waited for.
                    drained.add(first);
                    queue.drainTo(drained);
                }
            }
            if (!drained.isEmpty()) {
                result.put(systemStreamPartition, drained);
                subtractSizeOnQDrain(systemStreamPartition, drained);
            }
        }
        return result;
    }

    /**
     * Deducts the combined size of envelopes that just left a partition's queue from that
     * partition's size counter.
     *
     * @param systemStreamPartition partition the envelopes were drained from
     * @param drained envelopes removed from the queue
     */
    private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> drained) {
        long totalSize = 0;
        for (IncomingMessageEnvelope envelope : drained) {
            totalSize += envelope.getSize();
        }
        this.bufferedMessagesSize.get(systemStreamPartition).addAndGet(-totalSize);
    }

    /**
     * Buffers one envelope for the partition and adds its size to the partition's size counter.
     *
     * @param systemStreamPartition registered partition the envelope belongs to
     * @param incomingMessageEnvelope envelope to buffer
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope incomingMessageEnvelope) throws InterruptedException {
        this.bufferedMessages.get(systemStreamPartition).put(incomingMessageEnvelope);
        this.bufferedMessagesSize.get(systemStreamPartition).addAndGet(incomingMessageEnvelope.getSize());
    }

    /**
     * Buffers several envelopes for the partition, in list order, adding each envelope's size to
     * the partition's size counter as it is enqueued. Without that accounting the counter would
     * go negative once {@code poll} drains these envelopes, because draining always subtracts
     * their sizes.
     *
     * @param systemStreamPartition registered partition the envelopes belong to
     * @param envelopes envelopes to buffer
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    protected void putAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
        BlockingQueue<IncomingMessageEnvelope> queue = this.bufferedMessages.get(systemStreamPartition);
        AtomicLong bufferedSize = this.bufferedMessagesSize.get(systemStreamPartition);
        for (IncomingMessageEnvelope envelope : envelopes) {
            queue.put(envelope);
            bufferedSize.addAndGet(envelope.getSize());
        }
    }

    /**
     * @param systemStreamPartition partition to inspect
     * @return number of envelopes currently buffered for the partition
     * @throws NullPointerException if the partition was never registered
     */
    public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
        BlockingQueue<IncomingMessageEnvelope> queue = this.bufferedMessages.get(systemStreamPartition);
        if (queue == null) {
            throw new NullPointerException("Attempting to get queue for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
        }
        return queue.size();
    }

    /**
     * @param systemStreamPartition partition to inspect
     * @return tracked total size of the envelopes currently buffered for the partition
     * @throws NullPointerException if the partition was never registered
     */
    public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) {
        AtomicLong bufferedSize = this.bufferedMessagesSize.get(systemStreamPartition);
        if (bufferedSize == null) {
            throw new NullPointerException("Attempting to get size for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
        }
        return bufferedSize.get();
    }

    /**
     * Records whether the partition currently has no more messages to deliver and mirrors the
     * value into the partition's gauge.
     *
     * @param systemStreamPartition partition whose flag is set; its metrics must be initialised
     * @param isAtHead new flag value
     * @return the previously stored flag, or {@code null} if none was stored
     */
    protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) {
        this.metrics.setNoMoreMessages(systemStreamPartition, isAtHead);
        return this.noMoreMessage.put(systemStreamPartition, Boolean.valueOf(isAtHead));
    }

    /**
     * Reports whether the partition is drained: its queue is empty and its "no more messages"
     * flag has been set to true.
     *
     * @param systemStreamPartition partition to inspect
     * @return {@code true} only when nothing is buffered and the flag is set
     * @throws NullPointerException if the partition was never registered
     */
    protected boolean isAtHead(SystemStreamPartition systemStreamPartition) {
        // Read the flag before the queue length, preserving the order of the two reads.
        Boolean noMoreMessages = this.noMoreMessage.get(systemStreamPartition);
        return getNumMessagesInQueue(systemStreamPartition) == 0 && Boolean.TRUE.equals(noMoreMessages);
    }
}
