package org.apache.rocketmq.broker.processor;

import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/PopBufferMergeService.class */
public class PopBufferMergeService extends ServiceThread {
    /** Logger obtained under the name "RocketmqPop"; every log statement of this service goes through it. */
    private static final Logger POP_LOGGER = LoggerFactory.getLogger("RocketmqPop");
    private final BrokerController brokerController;
    private final PopMessageProcessor popMessageProcessor;
    /** Lock manager obtained from the processor; commitOffset() try-locks a queue through it. */
    private final PopMessageProcessor.QueueLockManager queueLockManager;
    /** Buffered checkpoints keyed by {@code PopCheckPointWrapper#getMergeKey()}; filled by addCk()/addCkJustOffset(). */
    ConcurrentHashMap<String, PopCheckPointWrapper> buffer = new ConcurrentHashMap<>(16384);
    /** Per-queue FIFO of checkpoints waiting for their offset commit, keyed by {@code PopCheckPointWrapper#getLockKey()}. */
    ConcurrentHashMap<String, QueueWithTime<PopCheckPointWrapper>> commitOffsets = new ConcurrentHashMap<>();
    /** While false, addCk() and addAk() reject new work; scan() clears it when a pass takes too long. */
    private volatile boolean serving = true;
    /** Approximate number of entries in {@link #buffer}; re-synchronised with buffer.size() by scan(). */
    private AtomicInteger counter = new AtomicInteger(0);
    /** Number of scan() passes since the last reset; scan() resets it to 0 once it reaches 12000. */
    private int scanTimes = 0;
    // NOTE(review): the methods reviewed here use the equivalent literals (5, 300000, 12000, 200, 6000)
    // instead of the five constants below — confirm whether the rest of the file reads them.
    private final long interval = 5;
    private final long minute5 = 300000;
    private final int countOfMinute1 = 12000;
    private final int countOfSecond1 = 200;
    private final int countOfSecond30 = 6000;
    /** Scratch list of message indexes reused by scan() when batch ack is enabled. */
    private final List<Byte> batchAckIndexList = new ArrayList<>(32);
    /** Result of the last broker-role check in isShouldRunning(): true when the role is not SLAVE. */
    private volatile boolean master = false;

    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopBufferMergeService$PopCheckPointWrapper.class */
    /**
     * Buffer entry that pairs one {@link PopCheckPoint} with the bookkeeping this service keeps
     * for it: the revive queue id/offset, two bit sets ({@code bits} and {@code toStoreBits}),
     * the next begin offset to commit, and two derived keys.
     *
     * <p>{@code lockKey} is {@code topic@cid@queueId} built with
     * {@link ConsumerOffsetManager#TOPIC_GROUP_SEPARATOR}; {@code mergeKey} is the plain
     * concatenation of topic, cid, queueId, startOffset, popTime and brokerName.
     */
    public class PopCheckPointWrapper {
        private final int reviveQueueId;
        private volatile long reviveQueueOffset;
        private final PopCheckPoint ck;
        private final AtomicInteger bits;
        private final AtomicInteger toStoreBits;
        private final long nextBeginOffset;
        private final String lockKey;
        private final String mergeKey;
        private final boolean justOffset;
        private volatile boolean ckStored;

        /**
         * Creates a wrapper whose {@code justOffset} flag is {@code false}.
         *
         * @param i             revive queue id
         * @param j             initial revive queue offset
         * @param popCheckPoint the wrapped checkpoint
         * @param j2            next begin offset
         */
        public PopCheckPointWrapper(int i, long j, PopCheckPoint popCheckPoint, long j2) {
            this(i, j, popCheckPoint, j2, false);
        }

        /**
         * Creates a wrapper with an explicit {@code justOffset} flag.
         *
         * @param i             revive queue id
         * @param j             initial revive queue offset
         * @param popCheckPoint the wrapped checkpoint
         * @param j2            next begin offset
         * @param z             value of the {@code justOffset} flag
         */
        public PopCheckPointWrapper(int i, long j, PopCheckPoint popCheckPoint, long j2, boolean z) {
            this.ckStored = false;
            this.reviveQueueId = i;
            this.reviveQueueOffset = j;
            this.ck = popCheckPoint;
            this.bits = new AtomicInteger(0);
            this.toStoreBits = new AtomicInteger(0);
            this.nextBeginOffset = j2;

            final String separator = ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
            this.lockKey = popCheckPoint.getTopic() + separator + popCheckPoint.getCId() + separator + popCheckPoint.getQueueId();
            this.mergeKey = popCheckPoint.getTopic()
                + popCheckPoint.getCId()
                + popCheckPoint.getQueueId()
                + popCheckPoint.getStartOffset()
                + popCheckPoint.getPopTime()
                + popCheckPoint.getBrokerName();
            this.justOffset = z;
        }

        public int getReviveQueueId() {
            return reviveQueueId;
        }

        public long getReviveQueueOffset() {
            return reviveQueueOffset;
        }

        public boolean isCkStored() {
            return ckStored;
        }

        public void setReviveQueueOffset(long j) {
            reviveQueueOffset = j;
        }

        public PopCheckPoint getCk() {
            return ck;
        }

        public AtomicInteger getBits() {
            return bits;
        }

        public AtomicInteger getToStoreBits() {
            return toStoreBits;
        }

        public long getNextBeginOffset() {
            return nextBeginOffset;
        }

        public String getLockKey() {
            return lockKey;
        }

        public String getMergeKey() {
            return mergeKey;
        }

        public boolean isJustOffset() {
            return justOffset;
        }

        public void setCkStored(boolean z) {
            ckStored = z;
        }

        /** Compact rendering used in log lines; the keys are abbreviated field names. */
        @Override
        public String toString() {
            return "CkWrap{"
                + "rq=" + reviveQueueId
                + ", rqo=" + reviveQueueOffset
                + ", ck=" + ck
                + ", bits=" + bits
                + ", sBits=" + toStoreBits
                + ", nbo=" + nextBeginOffset
                + ", cks=" + ckStored
                + ", jo=" + justOffset
                + '}';
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopBufferMergeService$QueueWithTime.class */
    /**
     * A {@link LinkedBlockingDeque} bundled with a timestamp in milliseconds.
     * The timestamp starts at the creation time and can be overwritten via {@link #setTime(long)}.
     *
     * @param <T> element type held by the deque
     */
    public class QueueWithTime<T> {
        private final LinkedBlockingDeque<T> queue;
        private long time;

        /** Creates an empty deque and stamps it with the current wall-clock time. */
        public QueueWithTime() {
            queue = new LinkedBlockingDeque<>();
            time = System.currentTimeMillis();
        }

        /** @param j new timestamp, in milliseconds */
        public void setTime(long j) {
            time = j;
        }

        /** @return the timestamp last stored, in milliseconds */
        public long getTime() {
            return time;
        }

        /** @return the backing deque itself (not a copy) */
        public LinkedBlockingDeque<T> get() {
            return queue;
        }
    }

    /**
     * Wires the service to its broker and pop processor.
     *
     * @param brokerController    owning broker controller
     * @param popMessageProcessor pop processor; its queue lock manager is captured here
     */
    public PopBufferMergeService(BrokerController brokerController, PopMessageProcessor popMessageProcessor) {
        this.queueLockManager = popMessageProcessor.getQueueLockManager();
        this.popMessageProcessor = popMessageProcessor;
        this.brokerController = brokerController;
    }

    /**
     * Decides whether the scan loop should do work on this broker.
     *
     * @return {@code true} when slave-acting-master is enabled, otherwise whether the broker
     *         role is anything other than SLAVE (that result is also cached in {@code master})
     */
    private boolean isShouldRunning() {
        if (!this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()) {
            BrokerRole role = this.brokerController.getMessageStoreConfig().getBrokerRole();
            this.master = role != BrokerRole.SLAVE;
            return this.master;
        }
        // Slave-acting-master: always run, and leave the cached master flag untouched.
        return true;
    }

    /**
     * @return the simple class name, prefixed with the broker identity's identifier when the
     *         broker runs inside a broker container
     */
    public String getServiceName() {
        String simpleName = PopBufferMergeService.class.getSimpleName();
        if (this.brokerController != null && this.brokerController.getBrokerConfig().isInBrokerContainer()) {
            return this.brokerController.getBrokerIdentity().getIdentifier() + simpleName;
        }
        return simpleName;
    }

    /**
     * Service loop. While not stopped it either scans the buffer (when this broker should run)
     * or clears all buffered state (when it should not). After the stop signal it stops
     * accepting work, pauses two seconds, and keeps scanning until the buffer and all offset
     * queues are empty.
     */
    public void run() {
        while (!isStopped()) {
            try {
                if (!isShouldRunning()) {
                    // Not the broker that should merge: idle, then drop everything buffered.
                    waitForRunning(5000L);
                    POP_LOGGER.info("Broker is {}, {}, clear all data", this.brokerController.getMessageStoreConfig().getBrokerRole(), Boolean.valueOf(this.master));
                    this.buffer.clear();
                    this.commitOffsets.clear();
                    continue;
                }

                scan();
                // scan() increments scanTimes, so this fires once every 6000 passes.
                if (this.scanTimes % 6000 == 0) {
                    scanGarbage();
                }
                waitForRunning(5L);

                // Resume accepting checkpoints/acks only after everything has drained.
                boolean drained = this.buffer.size() == 0 && getOffsetTotalSize() == 0;
                if (!this.serving && drained) {
                    this.serving = true;
                }
            } catch (Throwable th) {
                POP_LOGGER.error("PopBufferMergeService error", th);
                waitForRunning(3000L);
            }
        }

        // Shutdown path: refuse new work, then flush what is left.
        this.serving = false;
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException ignored) {
            // Deliberately ignored: the drain below proceeds regardless.
        }
        if (!isShouldRunning()) {
            return;
        }
        while (this.buffer.size() > 0 || getOffsetTotalSize() > 0) {
            scan();
        }
    }

    /**
     * Walks every per-queue offset deque and commits the offsets of head entries that are
     * ready, in FIFO order, stopping at the first head that is not ready or whose commit
     * could not be performed.
     *
     * <p>A head is ready when it is a just-offset entry whose ck is stored, or
     * {@code isCkDone} holds, or {@code isCkDoneForFinish} holds and its ck is stored.
     *
     * @return the total number of entries still queued across all deques after this pass
     */
    private int scanCommitOffset() {
        int remaining = 0;
        for (Map.Entry<String, QueueWithTime<PopCheckPointWrapper>> entry : this.commitOffsets.entrySet()) {
            LinkedBlockingDeque<PopCheckPointWrapper> queue = entry.getValue().get();
            PopCheckPointWrapper head;
            while ((head = queue.peek()) != null) {
                boolean ready = (head.isJustOffset() && head.isCkStored())
                    || isCkDone(head)
                    || (isCkDoneForFinish(head) && head.isCkStored());
                if (!ready || !commitOffset(head)) {
                    break;
                }
                queue.poll();
            }
            // head is null when the deque was empty or fully drained; only a head that is
            // still blocking the deque can be reported as stuck. (2L keeps the doubling in long.)
            if (head != null
                && System.currentTimeMillis() - head.getCk().getPopTime() > this.brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
                POP_LOGGER.warn("[PopBuffer] ck offset long time not commit, {}", head);
            }
            int size = queue.size();
            remaining += size;
            if (size > 5000 && this.scanTimes % 200 == 0) {
                POP_LOGGER.info("[PopBuffer] offset queue size too long, {}, {}", entry.getKey(), Integer.valueOf(size));
            }
        }
        return remaining;
    }

    /**
     * Looks up the most recently queued checkpoint for a key of {@code commitOffsets}.
     *
     * @param str key into the offset-queue map
     * @return the next begin offset of the last queued entry, or {@code -1} when there is no
     *         queue for the key or the queue is empty
     */
    public long getLatestOffset(String str) {
        QueueWithTime<PopCheckPointWrapper> holder = this.commitOffsets.get(str);
        if (holder == null) {
            return -1L;
        }
        PopCheckPointWrapper newest = holder.get().peekLast();
        return newest == null ? -1L : newest.getNextBeginOffset();
    }

    /**
     * Convenience overload: builds the key with {@link KeyBuilder#buildPollingKey} from the
     * three arguments (passed through in the same order) and delegates to
     * {@link #getLatestOffset(String)}.
     *
     * @return the latest next begin offset, or {@code -1} when nothing is queued
     */
    public long getLatestOffset(String str, String str2, int i) {
        final String pollingKey = KeyBuilder.buildPollingKey(str, str2, i);
        return getLatestOffset(pollingKey);
    }

    /**
     * Removes offset queues that are no longer useful. A queue is dropped when its key splits
     * into exactly three parts (topic, group, third part) and either the topic has no topic
     * config, the group is not in the subscription group table, or the queue's timestamp is
     * more than 300000 ms old. Keys that are null or do not split into three parts are skipped.
     */
    private void scanGarbage() {
        Iterator<Map.Entry<String, QueueWithTime<PopCheckPointWrapper>>> entries = this.commitOffsets.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, QueueWithTime<PopCheckPointWrapper>> entry = entries.next();
            String key = entry.getKey();
            if (key == null) {
                continue;
            }
            String[] parts = key.split(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR);
            if (parts == null || parts.length != 3) {
                continue;
            }
            String topic = parts[0];
            String group = parts[1];

            if (this.brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
                POP_LOGGER.info("[PopBuffer]remove not exit topic {} in buffer!", topic);
                entries.remove();
                continue;
            }
            if (!this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(group)) {
                POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", group, topic);
                entries.remove();
                continue;
            }
            long idleMillis = System.currentTimeMillis() - entry.getValue().getTime();
            if (idleMillis > 300000) {
                POP_LOGGER.info("[PopBuffer]remove long time not used sub {} of topic {} in buffer!", group, topic);
                entries.remove();
            }
        }
    }

    /**
     * One pass over the checkpoint buffer.
     *
     * <p>For each buffered wrapper:
     * <ul>
     *   <li>finished entries are removed from the buffer;</li>
     *   <li>just-offset entries only have their ck written to the store if not yet written;</li>
     *   <li>entries that must leave the buffer (service not serving, close to revive time, or
     *       buffered too long) get their ck written, then their buffered-but-unstored acks
     *       written, and are removed once {@code isCkDoneForFinish} holds.</li>
     * </ul>
     * Afterwards the offset queues are scanned, the pass duration is recorded, and when the
     * pass took too long the service stops accepting new work ({@code serving = false}).
     */
    private void scan() {
        long startTime = System.currentTimeMillis();
        int storedAcks = 0;
        int storedCks = 0;
        Iterator<Map.Entry<String, PopCheckPointWrapper>> it = this.buffer.entrySet().iterator();
        while (it.hasNext()) {
            PopCheckPointWrapper wrapper = it.next().getValue();
            if ((wrapper.isJustOffset() && wrapper.isCkStored()) || isCkDone(wrapper) || (isCkDoneForFinish(wrapper) && wrapper.isCkStored())) {
                if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("[PopBuffer]ck done, {}", wrapper);
                }
                it.remove();
                this.counter.decrementAndGet();
                continue;
            }

            PopCheckPoint ck = wrapper.getCk();
            long now = System.currentTimeMillis();
            boolean mustLeaveBuffer = !this.serving;
            // The ck is about to reach its revive time.
            if (ck.getReviveTime() - now < this.brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
                mustLeaveBuffer = true;
            }
            // The ck has stayed in the buffer for too long.
            if (now - ck.getPopTime() > this.brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
                mustLeaveBuffer = true;
            }
            if (now - ck.getPopTime() > this.brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
                POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", wrapper);
            }

            // Re-check: acks may have arrived since the test at the top of the loop.
            if (isCkDone(wrapper)) {
                continue;
            }
            if (wrapper.isJustOffset()) {
                if (wrapper.getReviveQueueOffset() < 0) {
                    putCkToStore(wrapper, false);
                    storedCks++;
                }
                continue;
            }
            if (!mustLeaveBuffer) {
                continue;
            }

            if (wrapper.getReviveQueueOffset() < 0) {
                putCkToStore(wrapper, false);
                storedCks++;
            }
            if (!wrapper.isCkStored()) {
                // Acks are only written once their ck is in the store.
                continue;
            }

            if (this.brokerController.getBrokerConfig().isEnablePopBatchAck()) {
                storedAcks += storePendingAcksAsBatch(wrapper);
            } else {
                storedAcks += storePendingAcksOneByOne(wrapper);
            }

            if (isCkDoneForFinish(wrapper) && wrapper.isCkStored()) {
                if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("[PopBuffer]ck finish, {}", wrapper);
                }
                it.remove();
                this.counter.decrementAndGet();
            }
        }

        int offsetQueueSize = scanCommitOffset();
        long elapsed = System.currentTimeMillis() - startTime;
        if (elapsed > this.brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {
            POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}", new Object[]{Long.valueOf(elapsed), Integer.valueOf(storedAcks), Integer.valueOf(storedCks), Integer.valueOf(this.counter.get()), Integer.valueOf(offsetQueueSize)});
            this.serving = false;
        } else if (this.scanTimes % 200 == 0) {
            POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}", new Object[]{Long.valueOf(elapsed), Integer.valueOf(storedAcks), Integer.valueOf(storedCks), Integer.valueOf(this.counter.get()), Integer.valueOf(offsetQueueSize)});
        }
        PopMetricsManager.recordPopBufferScanTimeConsume(elapsed);

        this.scanTimes++;
        if (this.scanTimes >= 12000) {
            // Periodically re-synchronise the counter with the real buffer size.
            this.counter.set(this.buffer.size());
            this.scanTimes = 0;
        }
    }

    /**
     * Writes each acked-but-unstored message index of the wrapper as its own ack message.
     *
     * @return the number of acks successfully written and marked in {@code toStoreBits}
     */
    private int storePendingAcksOneByOne(PopCheckPointWrapper wrapper) {
        PopCheckPoint ck = wrapper.getCk();
        int stored = 0;
        for (byte idx = 0; idx < ck.getNum(); idx++) {
            if (DataConverter.getBit(wrapper.getBits().get(), idx)
                && !DataConverter.getBit(wrapper.getToStoreBits().get(), idx)
                && putAckToStore(wrapper, idx)) {
                stored++;
                markBitCAS(wrapper.getToStoreBits(), idx);
            }
        }
        return stored;
    }

    /**
     * Collects every acked-but-unstored message index of the wrapper and writes them as one
     * batch ack message.
     *
     * @return the number of indexes written and marked in {@code toStoreBits}; 0 when there
     *         was nothing to write or the write failed
     */
    private int storePendingAcksAsBatch(PopCheckPointWrapper wrapper) {
        PopCheckPoint ck = wrapper.getCk();
        List<Byte> pending = this.batchAckIndexList;
        try {
            for (byte idx = 0; idx < ck.getNum(); idx++) {
                if (DataConverter.getBit(wrapper.getBits().get(), idx)
                    && !DataConverter.getBit(wrapper.getToStoreBits().get(), idx)) {
                    pending.add(Byte.valueOf(idx));
                }
            }
            if (pending.isEmpty() || !putBatchAckToStore(wrapper, pending)) {
                return 0;
            }
            for (Byte idx : pending) {
                markBitCAS(wrapper.getToStoreBits(), idx.byteValue());
            }
            return pending.size();
        } finally {
            // The scratch list is shared across wrappers: reset it only after the collected
            // indexes have been written and marked, never while they are still being gathered.
            pending.clear();
        }
    }

    /**
     * @return the sum of the sizes of all per-queue offset deques at the time of the call
     */
    public int getOffsetTotalSize() {
        int total = 0;
        for (QueueWithTime<PopCheckPointWrapper> holder : this.commitOffsets.values()) {
            total += holder.get().size();
        }
        return total;
    }

    /**
     * @return the current value of the buffered-checkpoint counter
     */
    public int getBufferedCKSize() {
        final int buffered = this.counter.get();
        return buffered;
    }

    /**
     * Sets bit {@code i} of the given atomic integer with a compare-and-set retry loop.
     * Returns immediately when the bit is already set.
     *
     * @param atomicInteger bit set to update
     * @param i             index of the bit to set
     */
    private void markBitCAS(AtomicInteger atomicInteger, int i) {
        while (true) {
            int current = atomicInteger.get();
            if (DataConverter.getBit(current, i)) {
                return;
            }
            int updated = DataConverter.setBit(current, i, true);
            if (atomicInteger.compareAndSet(current, updated)) {
                return;
            }
            // Lost the race with a concurrent update: re-read and try again.
        }
    }

    /**
     * Commits the wrapper's next begin offset as the consumer offset of its queue, under the
     * queue lock.
     *
     * @param popCheckPointWrapper entry whose offset should be committed
     * @return {@code true} when the offset was committed or the next begin offset is negative
     *         (nothing to commit); {@code false} when the queue lock could not be acquired
     */
    private boolean commitOffset(PopCheckPointWrapper popCheckPointWrapper) {
        long nextBeginOffset = popCheckPointWrapper.getNextBeginOffset();
        if (nextBeginOffset < 0) {
            return true;
        }
        PopCheckPoint ck = popCheckPointWrapper.getCk();
        String lockKey = popCheckPointWrapper.getLockKey();
        if (!this.queueLockManager.tryLock(lockKey)) {
            return false;
        }
        try {
            long storedOffset = this.brokerController.getConsumerOffsetManager().queryOffset(ck.getCId(), ck.getTopic(), ck.getQueueId());
            if (nextBeginOffset > storedOffset) {
                if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("Commit offset, {}, {}", popCheckPointWrapper, Long.valueOf(storedOffset));
                }
            } else {
                // The commit is still issued below; this only reports the unexpected ordering.
                POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", popCheckPointWrapper, Long.valueOf(storedOffset));
            }
            this.brokerController.getConsumerOffsetManager().commitOffset(getServiceName(), ck.getCId(), ck.getTopic(), ck.getQueueId(), nextBeginOffset);
        } finally {
            this.queueLockManager.unLock(lockKey);
        }
        return true;
    }

    /**
     * Appends the wrapper to the offset queue registered under its lock key, creating the
     * queue on first use, and stamps the queue with the checkpoint's pop time.
     *
     * @param popCheckPointWrapper entry to enqueue
     * @return the result of {@code offer} on the underlying deque
     */
    private boolean putOffsetQueue(PopCheckPointWrapper popCheckPointWrapper) {
        String key = popCheckPointWrapper.getLockKey();
        QueueWithTime<PopCheckPointWrapper> target = this.commitOffsets.get(key);
        if (target == null) {
            QueueWithTime<PopCheckPointWrapper> fresh = new QueueWithTime<>();
            QueueWithTime<PopCheckPointWrapper> existing = this.commitOffsets.putIfAbsent(key, fresh);
            // Another thread may have registered a queue first; use whichever is in the map.
            target = existing == null ? fresh : existing;
        }
        target.setTime(popCheckPointWrapper.getCk().getPopTime());
        return target.get().offer(popCheckPointWrapper);
    }

    /**
     * @param popCheckPointWrapper entry whose lock key selects the offset queue to inspect
     * @return {@code true} when no queue exists yet for the key, or the queue holds fewer
     *         entries than the configured {@code popCkOffsetMaxQueueSize}
     */
    private boolean checkQueueOk(PopCheckPointWrapper popCheckPointWrapper) {
        QueueWithTime<PopCheckPointWrapper> holder = this.commitOffsets.get(popCheckPointWrapper.getLockKey());
        if (holder == null) {
            return true;
        }
        return holder.get().size() < this.brokerController.getBrokerConfig().getPopCkOffsetMaxQueueSize();
    }

    /**
     * Registers a checkpoint as a just-offset entry: its ck is written to the store right
     * away, and it is added to both the offset queue and the buffer.
     *
     * @param popCheckPoint checkpoint to register
     * @param i             revive queue id
     * @param j             revive queue offset
     * @param j2            next begin offset
     * @return {@code false} when an entry with the same merge key is already buffered,
     *         {@code true} otherwise
     */
    public boolean addCkJustOffset(PopCheckPoint popCheckPoint, int i, long j, long j2) {
        PopCheckPointWrapper wrapper = new PopCheckPointWrapper(i, j, popCheckPoint, j2, true);
        String mergeKey = wrapper.getMergeKey();
        if (this.buffer.containsKey(mergeKey)) {
            POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ckJustOffset. ck:{}, mergeKey:{}", wrapper, wrapper.getMergeKey());
            return false;
        }

        boolean queueFull = !checkQueueOk(wrapper);
        putCkToStore(wrapper, queueFull);
        putOffsetQueue(wrapper);
        this.buffer.put(mergeKey, wrapper);
        this.counter.incrementAndGet();

        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("[PopBuffer]add ck just offset, {}", wrapper);
        }
        return true;
    }

    /**
     * Builds an empty checkpoint (bit map 0, num 0) from the given values, wraps it as an
     * already-stored just-offset entry with revive queue offset {@code Long.MAX_VALUE}, and
     * puts it on the offset queue only (it is not added to the buffer).
     *
     * @param str  consumer group id (set as the checkpoint's CId)
     * @param str2 topic
     * @param i    queue id
     * @param j    start offset
     * @param j2   invisible time
     * @param j3   pop time
     * @param i2   revive queue id
     * @param j4   next begin offset
     * @param str3 broker name
     */
    public void addCkMock(String str, String str2, int i, long j, long j2, long j3, int i2, long j4, String str3) {
        PopCheckPoint mockCk = new PopCheckPoint();
        mockCk.setBitMap(0);
        mockCk.setNum((byte) 0);
        mockCk.setPopTime(j3);
        mockCk.setInvisibleTime(j2);
        mockCk.setStartOffset(j);
        mockCk.setCId(str);
        mockCk.setTopic(str2);
        mockCk.setQueueId(i);
        mockCk.setBrokerName(str3);

        PopCheckPointWrapper wrapper = new PopCheckPointWrapper(i2, Long.MAX_VALUE, mockCk, j4, true);
        wrapper.setCkStored(true);
        putOffsetQueue(wrapper);

        if (!this.brokerController.getBrokerConfig().isEnablePopLog()) {
            return;
        }
        POP_LOGGER.info("[PopBuffer]add ck just offset, mocked, {}", wrapper);
    }

    /**
     * Tries to buffer a checkpoint instead of writing it to the store immediately.
     *
     * <p>The checkpoint is rejected when buffer merging is disabled or the service is not
     * serving, when its revive time is closer than {@code popCkStayBufferTimeOut + 1500} ms,
     * when the buffer counter exceeds {@code popCkMaxBufferSize}, when the target offset
     * queue is full, or when an entry with the same merge key is already buffered.
     *
     * @param popCheckPoint checkpoint to buffer
     * @param i             revive queue id
     * @param j             revive queue offset
     * @param j2            next begin offset
     * @return {@code true} when the checkpoint was buffered, {@code false} when rejected
     */
    public boolean addCk(PopCheckPoint popCheckPoint, int i, long j, long j2) {
        if (!(this.brokerController.getBrokerConfig().isEnablePopBufferMerge() && this.serving)) {
            return false;
        }

        long now = System.currentTimeMillis();
        if (popCheckPoint.getReviveTime() - now < this.brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", popCheckPoint, Long.valueOf(now));
            }
            return false;
        }

        if (this.counter.get() > this.brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {
            POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", popCheckPoint, Integer.valueOf(this.counter.get()));
            return false;
        }

        PopCheckPointWrapper wrapper = new PopCheckPointWrapper(i, j, popCheckPoint, j2);
        if (!checkQueueOk(wrapper)) {
            return false;
        }
        if (this.buffer.containsKey(wrapper.getMergeKey())) {
            POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{}, mergeKey:{}", wrapper, wrapper.getMergeKey());
            return false;
        }

        putOffsetQueue(wrapper);
        this.buffer.put(wrapper.getMergeKey(), wrapper);
        this.counter.incrementAndGet();
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("[PopBuffer]add ck, {}", wrapper);
        }
        return true;
    }

    /**
     * Tries to merge an ack into its buffered checkpoint by setting the matching bit(s) in the
     * wrapper's {@code bits}.
     *
     * <p>Returns {@code false} when buffer merging is disabled, the service is not serving,
     * no checkpoint is buffered for the ack, the buffered entry is a just-offset entry, the
     * checkpoint is within {@code popCkStayBufferTimeOut + 1500} ms of its revive time, the
     * checkpoint has been buffered longer than {@code popCkStayBufferTime - 1500} ms, or any
     * exception is thrown while processing.
     *
     * @param i      revive queue id, used for logging only
     * @param ackMsg single or batch ack
     * @return {@code true} when the ack was handled by the buffer (including the case of a
     *         single ack whose offset is not part of the checkpoint, which is only logged)
     */
    public boolean addAk(int i, AckMsg ackMsg) {
        if (!(this.brokerController.getBrokerConfig().isEnablePopBufferMerge() && this.serving)) {
            return false;
        }
        try {
            // Same composition as PopCheckPointWrapper's merge key.
            String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName();
            PopCheckPointWrapper wrapper = this.buffer.get(mergeKey);
            if (wrapper == null) {
                if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", Integer.valueOf(i), ackMsg);
                }
                return false;
            }
            if (wrapper.isJustOffset()) {
                return false;
            }

            PopCheckPoint ck = wrapper.getCk();
            long now = System.currentTimeMillis();
            if (ck.getReviveTime() - now < this.brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
                if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", new Object[]{Integer.valueOf(i), wrapper, ackMsg, Long.valueOf(now)});
                }
                return false;
            }
            if (now - ck.getPopTime() > this.brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {
                if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", new Object[]{Integer.valueOf(i), wrapper, ackMsg, Long.valueOf(now)});
                }
                return false;
            }

            if (ackMsg instanceof BatchAckMsg) {
                // Batch: invalid offsets are logged and skipped, the rest are still marked.
                for (Long ackOffset : ((BatchAckMsg) ackMsg).getAckOffsetList()) {
                    int index = ck.indexOfAck(ackOffset.longValue());
                    if (index > -1) {
                        markBitCAS(wrapper.getBits(), index);
                    } else {
                        POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", new Object[]{Integer.valueOf(i), ackMsg, ck});
                    }
                }
            } else {
                int index = ck.indexOfAck(ackMsg.getAckOffset());
                if (index > -1) {
                    markBitCAS(wrapper.getBits(), index);
                } else {
                    POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", new Object[]{Integer.valueOf(i), ackMsg, ck});
                    return true;
                }
            }

            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", new Object[]{Integer.valueOf(i), wrapper, ackMsg});
            }
            return true;
        } catch (Throwable th) {
            POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + i + ", " + ackMsg, th);
            return false;
        }
    }

    /**
     * Drops the whole offset queue registered under the given key, if any.
     *
     * @param str key into the offset-queue map
     */
    public void clearOffsetQueue(String str) {
        commitOffsets.remove(str);
    }

    /**
     * Writes the wrapper's checkpoint to its revive queue unless it already has a
     * non-negative revive queue offset. On an accepted put status the wrapper is marked as
     * stored and its revive queue offset is updated; otherwise the failure is logged and the
     * wrapper is left unchanged.
     *
     * @param popCheckPointWrapper entry whose checkpoint should be stored
     * @param z                    not read by this method
     */
    private void putCkToStore(PopCheckPointWrapper popCheckPointWrapper, boolean z) {
        if (popCheckPointWrapper.getReviveQueueOffset() >= 0) {
            return;
        }
        PutMessageResult result = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(
            this.popMessageProcessor.buildCkMsg(popCheckPointWrapper.getCk(), popCheckPointWrapper.getReviveQueueId()));
        PutMessageStatus status = result.getPutMessageStatus();
        PopMetricsManager.incPopReviveCkPutCount(popCheckPointWrapper.getCk(), status);

        boolean accepted = status == PutMessageStatus.PUT_OK
            || status == PutMessageStatus.FLUSH_DISK_TIMEOUT
            || status == PutMessageStatus.FLUSH_SLAVE_TIMEOUT
            || status == PutMessageStatus.SLAVE_NOT_AVAILABLE;
        if (!accepted) {
            POP_LOGGER.error("[PopBuffer]put ck to store fail: {}, {}", popCheckPointWrapper, result);
            return;
        }

        popCheckPointWrapper.setCkStored(true);
        // A remote put records offset 0; a local put records the append result's logics offset.
        long reviveOffset = result.isRemotePut() ? 0L : result.getAppendMessageResult().getLogicsOffset();
        popCheckPointWrapper.setReviveQueueOffset(reviveOffset);

        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("[PopBuffer]put ck to store ok: {}, {}", popCheckPointWrapper, result);
        }
    }

    /**
     * Builds an ack message for message index {@code b} of the wrapper's checkpoint and
     * writes it to the revive topic, on the wrapper's revive queue, tagged "ack" and scheduled
     * for the checkpoint's revive time.
     *
     * @param popCheckPointWrapper entry the ack belongs to
     * @param b                    index of the acked message within the checkpoint
     * @return {@code true} when the put status is PUT_OK, FLUSH_DISK_TIMEOUT,
     *         FLUSH_SLAVE_TIMEOUT or SLAVE_NOT_AVAILABLE; {@code false} otherwise
     */
    private boolean putAckToStore(PopCheckPointWrapper popCheckPointWrapper, byte b) {
        PopCheckPoint ck = popCheckPointWrapper.getCk();

        // Ack payload.
        AckMsg ack = new AckMsg();
        ack.setAckOffset(ck.ackOffsetByIndex(b));
        ack.setStartOffset(ck.getStartOffset());
        ack.setConsumerGroup(ck.getCId());
        ack.setTopic(ck.getTopic());
        ack.setQueueId(ck.getQueueId());
        ack.setPopTime(ck.getPopTime());

        // Carrier message.
        MessageExtBrokerInner msg = new MessageExtBrokerInner();
        msg.setTopic(this.popMessageProcessor.reviveTopic);
        msg.setBody(JSON.toJSONString(ack).getBytes(DataConverter.charset));
        msg.setQueueId(popCheckPointWrapper.getReviveQueueId());
        msg.setTags("ack");
        msg.setBornTimestamp(System.currentTimeMillis());
        msg.setBornHost(this.brokerController.getStoreHost());
        msg.setStoreHost(this.brokerController.getStoreHost());
        msg.setDeliverTimeMs(ck.getReviveTime());
        msg.getProperties().put("UNIQ_KEY", PopMessageProcessor.genAckUniqueId(ack));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        PutMessageResult result = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msg);
        PutMessageStatus status = result.getPutMessageStatus();
        PopMetricsManager.incPopReviveAckPutCount(ack, status);

        boolean accepted = status == PutMessageStatus.PUT_OK
            || status == PutMessageStatus.FLUSH_DISK_TIMEOUT
            || status == PutMessageStatus.FLUSH_SLAVE_TIMEOUT
            || status == PutMessageStatus.SLAVE_NOT_AVAILABLE;
        if (!accepted) {
            POP_LOGGER.error("[PopBuffer]put ack to store fail: {}, {}, {}", new Object[]{popCheckPointWrapper, ack, result});
            return false;
        }
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("[PopBuffer]put ack to store ok: {}, {}, {}", new Object[]{popCheckPointWrapper, ack, result});
        }
        return true;
    }

    /**
     * Builds a single {@link BatchAckMsg} for the given check-point indexes and writes it to the
     * revive topic as a message tagged {@code "bAck"}.
     *
     * @param popCheckPointWrapper wrapper supplying the check point and the target revive queue id
     * @param list indexes inside the check point; each one is translated to an ack offset through
     *             {@code ackOffsetByIndex} on the check point
     * @return {@code true} when the put status is {@code PUT_OK}, {@code FLUSH_DISK_TIMEOUT},
     *         {@code FLUSH_SLAVE_TIMEOUT} or {@code SLAVE_NOT_AVAILABLE}; {@code false} for any
     *         other status, in which case an error is logged
     */
    private boolean putBatchAckToStore(PopCheckPointWrapper popCheckPointWrapper, List<Byte> list) {
        final PopCheckPoint point = popCheckPointWrapper.getCk();

        // Describe the acknowledged offsets and the check point they belong to.
        final BatchAckMsg batchAck = new BatchAckMsg();
        for (Byte msgIndex : list) {
            batchAck.getAckOffsetList().add(Long.valueOf(point.ackOffsetByIndex(msgIndex)));
        }
        batchAck.setStartOffset(point.getStartOffset());
        batchAck.setConsumerGroup(point.getCId());
        batchAck.setTopic(point.getTopic());
        batchAck.setQueueId(point.getQueueId());
        batchAck.setPopTime(point.getPopTime());

        // Wrap the batch ack in a message addressed to the wrapper's revive queue.
        final MessageExtBrokerInner reviveMsg = new MessageExtBrokerInner();
        reviveMsg.setTopic(this.popMessageProcessor.reviveTopic);
        reviveMsg.setBody(JSON.toJSONString(batchAck).getBytes(DataConverter.charset));
        reviveMsg.setQueueId(popCheckPointWrapper.getReviveQueueId());
        reviveMsg.setTags("bAck");
        reviveMsg.setBornTimestamp(System.currentTimeMillis());
        reviveMsg.setBornHost(this.brokerController.getStoreHost());
        reviveMsg.setStoreHost(this.brokerController.getStoreHost());
        reviveMsg.setDeliverTimeMs(point.getReviveTime());
        reviveMsg.getProperties().put("UNIQ_KEY", PopMessageProcessor.genBatchAckUniqueId(batchAck));
        // The properties string is rebuilt last so that it includes the UNIQ_KEY entry set above.
        reviveMsg.setPropertiesString(MessageDecoder.messageProperties2String(reviveMsg.getProperties()));

        final PutMessageResult result = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(reviveMsg);
        final PutMessageStatus status = result.getPutMessageStatus();
        // These four statuses are the ones this method treats as a successful put.
        final boolean accepted = status == PutMessageStatus.PUT_OK
            || status == PutMessageStatus.FLUSH_DISK_TIMEOUT
            || status == PutMessageStatus.FLUSH_SLAVE_TIMEOUT
            || status == PutMessageStatus.SLAVE_NOT_AVAILABLE;
        if (!accepted) {
            POP_LOGGER.error("[PopBuffer]put batch ack to store fail: {}, {}, {}", popCheckPointWrapper, batchAck, result);
            return false;
        }

        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("[PopBuffer]put batch ack to store ok: {}, {}, {}", popCheckPointWrapper, batchAck, result);
        }
        return true;
    }

    /**
     * Writes a message tagged {@code "ck"} to the revive topic whose body is
     * {@code "<reviveQueueId>-<reviveQueueOffset>"} and whose deliver time is the check point's
     * revive time minus {@code PopAckConstants.ackTimeInterval}.
     *
     * <p>Nothing is written when the wrapper's revive queue offset is negative; the method then
     * reports success immediately.
     *
     * @param popCheckPointWrapper wrapper supplying the check point, revive queue id and offset
     * @return {@code true} when no message was needed or the put status is {@code PUT_OK},
     *         {@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT} or
     *         {@code SLAVE_NOT_AVAILABLE}; {@code false} for any other status, in which case an
     *         error is logged
     */
    private boolean cancelCkTimer(PopCheckPointWrapper popCheckPointWrapper) {
        // A negative revive queue offset means there is nothing to reference in the body.
        if (popCheckPointWrapper.getReviveQueueOffset() < 0) {
            return true;
        }

        final PopCheckPoint point = popCheckPointWrapper.getCk();
        final String bodyText = popCheckPointWrapper.getReviveQueueId() + "-" + popCheckPointWrapper.getReviveQueueOffset();

        final MessageExtBrokerInner cancelMsg = new MessageExtBrokerInner();
        cancelMsg.setTopic(this.popMessageProcessor.reviveTopic);
        cancelMsg.setBody(bodyText.getBytes(StandardCharsets.UTF_8));
        cancelMsg.setQueueId(popCheckPointWrapper.getReviveQueueId());
        cancelMsg.setTags("ck");
        cancelMsg.setBornTimestamp(System.currentTimeMillis());
        cancelMsg.setBornHost(this.brokerController.getStoreHost());
        cancelMsg.setStoreHost(this.brokerController.getStoreHost());
        // Deliver one ack interval earlier than the check point's revive time.
        cancelMsg.setDeliverTimeMs(point.getReviveTime() - PopAckConstants.ackTimeInterval);
        cancelMsg.setPropertiesString(MessageDecoder.messageProperties2String(cancelMsg.getProperties()));

        final PutMessageResult result = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(cancelMsg);
        final PutMessageStatus status = result.getPutMessageStatus();
        // These four statuses are the ones this method treats as a successful put.
        final boolean accepted = status == PutMessageStatus.PUT_OK
            || status == PutMessageStatus.FLUSH_DISK_TIMEOUT
            || status == PutMessageStatus.FLUSH_SLAVE_TIMEOUT
            || status == PutMessageStatus.SLAVE_NOT_AVAILABLE;
        if (!accepted) {
            POP_LOGGER.error("[PopBuffer]PutMessageCallback cancelCheckPoint fail, {}, {}", popCheckPointWrapper, result);
            return false;
        }

        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("[PopBuffer]cancelCheckPoint, {}", popCheckPointWrapper);
        }
        return true;
    }

    /**
     * Reports whether every one of the check point's {@code num} entries has its bit set in the
     * wrapper's {@code bits} value.
     *
     * @param popCheckPointWrapper wrapper whose check point and bit mask are inspected
     * @return {@code true} if bits {@code 0..num-1} are all set (trivially so when {@code num} is
     *         not positive); {@code false} as soon as one unset bit is found
     */
    private boolean isCkDone(PopCheckPointWrapper popCheckPointWrapper) {
        final byte total = popCheckPointWrapper.getCk().getNum();
        for (byte index = 0; index < total; index++) {
            // The mask is re-read on every iteration rather than snapshotted once.
            if (!DataConverter.getBit(popCheckPointWrapper.getBits().get(), index)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reports whether the wrapper's {@code bits} and {@code toStoreBits} values agree on each of
     * the check point's first {@code num} bit positions.
     *
     * @param popCheckPointWrapper wrapper whose check point and two bit masks are compared
     * @return {@code true} if no position in {@code 0..num-1} differs between the two masks;
     *         {@code false} as soon as one differing position is found
     */
    private boolean isCkDoneForFinish(PopCheckPointWrapper popCheckPointWrapper) {
        final byte total = popCheckPointWrapper.getCk().getNum();
        // XOR leaves a 1 exactly where the two masks disagree; both masks are read once, up front.
        final int mismatch = popCheckPointWrapper.getBits().get() ^ popCheckPointWrapper.getToStoreBits().get();
        for (byte index = 0; index < total; index++) {
            if (DataConverter.getBit(mismatch, index)) {
                return false;
            }
        }
        return true;
    }
}
