package org.apache.rocketmq.broker.schedule;

import io.opentelemetry.api.common.Attributes;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsConstant;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;

/* loaded from: input_file:org/apache/rocketmq/broker/schedule/ScheduleMessageService.class */
public class ScheduleMessageService extends ConfigManager {
    private static final Logger log = LoggerFactory.getLogger("RocketmqBroker");
    private static final long FIRST_DELAY_TIME = 1000;
    private static final long DELAY_FOR_A_WHILE = 100;
    private static final long DELAY_FOR_A_PERIOD = 10000;
    private static final long WAIT_FOR_SHUTDOWN = 5000;
    private static final long DELAY_FOR_A_SLEEP = 10;
    private ScheduledExecutorService deliverExecutorService;
    private int maxDelayLevel;
    private boolean enableAsyncDeliver;
    private ScheduledExecutorService handleExecutorService;
    private final ScheduledExecutorService scheduledPersistService;
    private final BrokerController brokerController;
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap(32);
    private final ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(32);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private DataVersion dataVersion = new DataVersion();
    private final Map<Integer, LinkedBlockingQueue<PutResultProcess>> deliverPendingTable = new ConcurrentHashMap(32);
    private final transient AtomicLong versionChangeCounter = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/broker/schedule/ScheduleMessageService$DeliverDelayedMessageTimerTask.class */
    public class DeliverDelayedMessageTimerTask implements Runnable {
        private final int delayLevel;
        private final long offset;
        static final /* synthetic */ boolean $assertionsDisabled;

        public DeliverDelayedMessageTimerTask(int i, long j) {
            this.delayLevel = i;
            this.offset = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (ScheduleMessageService.this.isStarted()) {
                    executeOnTimeUp();
                }
            } catch (Exception e) {
                ScheduleMessageService.log.error("ScheduleMessageService, executeOnTimeUp exception", e);
                scheduleNextTimerTask(this.offset, ScheduleMessageService.DELAY_FOR_A_PERIOD);
            }
        }

        private long correctDeliverTimestamp(long j, long j2) {
            long j3 = j2;
            if (j2 > j + ((Long) ScheduleMessageService.this.delayLevelTable.get(Integer.valueOf(this.delayLevel))).longValue()) {
                j3 = j;
            }
            return j3;
        }

        public void executeOnTimeUp() {
            MessageStore messageStore = ScheduleMessageService.this.brokerController.getMessageStore();
            int delayLevel2QueueId = ScheduleMessageService.delayLevel2QueueId(this.delayLevel);
            ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue("SCHEDULE_TOPIC_XXXX", delayLevel2QueueId);
            if (consumeQueue == null) {
                scheduleNextTimerTask(this.offset, ScheduleMessageService.DELAY_FOR_A_WHILE);
                return;
            }
            ReferredIterator iterateFrom = consumeQueue.iterateFrom(this.offset);
            if (iterateFrom == null) {
                long minOffsetInQueue = consumeQueue.getMinOffsetInQueue();
                if (delayLevel2QueueId > this.offset) {
                    ScheduleMessageService.log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}", new Object[]{Long.valueOf(this.offset), Long.valueOf(minOffsetInQueue), Integer.valueOf(consumeQueue.getQueueId())});
                } else {
                    long maxOffsetInQueue = consumeQueue.getMaxOffsetInQueue();
                    minOffsetInQueue = maxOffsetInQueue;
                    if (maxOffsetInQueue < this.offset) {
                        ScheduleMessageService.log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}", new Object[]{Long.valueOf(this.offset), Long.valueOf(minOffsetInQueue), Integer.valueOf(consumeQueue.getQueueId())});
                    } else {
                        minOffsetInQueue = this.offset;
                    }
                }
                scheduleNextTimerTask(minOffsetInQueue, ScheduleMessageService.DELAY_FOR_A_WHILE);
                return;
            }
            long j = this.offset;
            while (iterateFrom.hasNext() && ScheduleMessageService.this.isStarted()) {
                try {
                    try {
                        CqUnit cqUnit = (CqUnit) iterateFrom.next();
                        long pos = cqUnit.getPos();
                        int size = cqUnit.getSize();
                        long tagsCode = cqUnit.getTagsCode();
                        if (!cqUnit.isTagsCodeValid()) {
                            ScheduleMessageService.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", new Object[]{Long.valueOf(tagsCode), Long.valueOf(pos), Integer.valueOf(size)});
                            tagsCode = ScheduleMessageService.this.computeDeliverTimestamp(this.delayLevel, ScheduleMessageService.this.brokerController.getMessageStore().getCommitLog().pickupStoreTimestamp(pos, size));
                        }
                        long currentTimeMillis = System.currentTimeMillis();
                        long correctDeliverTimestamp = correctDeliverTimestamp(currentTimeMillis, tagsCode);
                        long queueOffset = cqUnit.getQueueOffset();
                        if (!$assertionsDisabled && cqUnit.getBatchNum() != 1) {
                            throw new AssertionError();
                        }
                        j = queueOffset + cqUnit.getBatchNum();
                        if (correctDeliverTimestamp - currentTimeMillis > 0) {
                            scheduleNextTimerTask(queueOffset, ScheduleMessageService.DELAY_FOR_A_WHILE);
                            ScheduleMessageService.this.updateOffset(this.delayLevel, queueOffset);
                            iterateFrom.release();
                            return;
                        }
                        MessageExt lookMessageByOffset = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(pos, size);
                        if (lookMessageByOffset != null) {
                            MessageExtBrokerInner messageTimeUp = ScheduleMessageService.this.messageTimeUp(lookMessageByOffset);
                            if ("RMQ_SYS_TRANS_HALF_TOPIC".equals(messageTimeUp.getTopic())) {
                                ScheduleMessageService.log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", messageTimeUp.getTopic(), messageTimeUp);
                            } else {
                                if (!(ScheduleMessageService.this.enableAsyncDeliver ? asyncDeliver(messageTimeUp, lookMessageByOffset.getMsgId(), queueOffset, pos, size) : syncDeliver(messageTimeUp, lookMessageByOffset.getMsgId(), queueOffset, pos, size))) {
                                    scheduleNextTimerTask(j, ScheduleMessageService.DELAY_FOR_A_WHILE);
                                    iterateFrom.release();
                                    return;
                                }
                            }
                        }
                    } catch (Exception e) {
                        ScheduleMessageService.log.error("ScheduleMessageService, messageTimeUp execute error, offset = {}", Long.valueOf(j), e);
                        iterateFrom.release();
                    }
                } catch (Throwable th) {
                    iterateFrom.release();
                    throw th;
                }
            }
            iterateFrom.release();
            scheduleNextTimerTask(j, ScheduleMessageService.DELAY_FOR_A_WHILE);
        }

        public void scheduleNextTimerTask(long j, long j2) {
            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, j), j2, TimeUnit.MILLISECONDS);
        }

        private boolean syncDeliver(MessageExtBrokerInner messageExtBrokerInner, String str, long j, long j2, int i) {
            PutResultProcess deliverMessage = deliverMessage(messageExtBrokerInner, str, j, j2, i, false);
            PutMessageResult putMessageResult = deliverMessage.get();
            boolean z = putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK;
            if (z) {
                ScheduleMessageService.this.updateOffset(this.delayLevel, deliverMessage.getNextOffset());
            }
            return z;
        }

        private boolean asyncDeliver(MessageExtBrokerInner messageExtBrokerInner, String str, long j, long j2, int i) {
            Queue queue = (Queue) ScheduleMessageService.this.deliverPendingTable.get(Integer.valueOf(this.delayLevel));
            int size = queue.size();
            int scheduleAsyncDeliverMaxPendingLimit = ScheduleMessageService.this.brokerController.getMessageStoreConfig().getScheduleAsyncDeliverMaxPendingLimit();
            if (size > scheduleAsyncDeliverMaxPendingLimit) {
                ScheduleMessageService.log.warn("Asynchronous deliver triggers flow control, currentPendingNum={}, maxPendingLimit={}", Integer.valueOf(size), Integer.valueOf(scheduleAsyncDeliverMaxPendingLimit));
                return false;
            }
            PutResultProcess putResultProcess = (PutResultProcess) queue.peek();
            if (putResultProcess == null || !putResultProcess.need2Blocked()) {
                queue.add(deliverMessage(messageExtBrokerInner, str, j, j2, i, true));
                return true;
            }
            ScheduleMessageService.log.warn("Asynchronous deliver block. info={}", putResultProcess.toString());
            return false;
        }

        private PutResultProcess deliverMessage(MessageExtBrokerInner messageExtBrokerInner, String str, long j, long j2, int i, boolean z) {
            return new PutResultProcess().setTopic(messageExtBrokerInner.getTopic()).setDelayLevel(this.delayLevel).setOffset(j).setPhysicOffset(j2).setPhysicSize(i).setMsgId(str).setAutoResend(z).setFuture(ScheduleMessageService.this.brokerController.getEscapeBridge().asyncPutMessage(messageExtBrokerInner)).thenProcess();
        }

        static {
            $assertionsDisabled = !ScheduleMessageService.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/schedule/ScheduleMessageService$HandlePutResultTask.class */
    public class HandlePutResultTask implements Runnable {
        private final int delayLevel;

        public HandlePutResultTask(int i) {
            this.delayLevel = i;
        }

        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Failed to find 'out' block for switch in B:6:0x002e. Please report as an issue. */
        @Override // java.lang.Runnable
        public void run() {
            LinkedBlockingQueue linkedBlockingQueue = (LinkedBlockingQueue) ScheduleMessageService.this.deliverPendingTable.get(Integer.valueOf(this.delayLevel));
            while (true) {
                PutResultProcess putResultProcess = (PutResultProcess) linkedBlockingQueue.peek();
                if (putResultProcess == null) {
                    scheduleNextTask();
                    return;
                }
                try {
                } catch (Exception e) {
                    ScheduleMessageService.log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
                    putResultProcess.doResend();
                }
                switch (putResultProcess.getStatus()) {
                    case SUCCESS:
                        ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
                        linkedBlockingQueue.remove();
                    case RUNNING:
                        scheduleNextTask();
                        return;
                    case EXCEPTION:
                        if (!ScheduleMessageService.this.isStarted()) {
                            ScheduleMessageService.log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
                            return;
                        } else {
                            ScheduleMessageService.log.warn("putResultProcess error, info={}", putResultProcess.toString());
                            putResultProcess.doResend();
                        }
                    case SKIP:
                        ScheduleMessageService.log.warn("putResultProcess skip, info={}", putResultProcess.toString());
                        linkedBlockingQueue.remove();
                }
            }
        }

        private void scheduleNextTask() {
            if (ScheduleMessageService.this.isStarted()) {
                ScheduleMessageService.this.handleExecutorService.schedule(new HandlePutResultTask(this.delayLevel), ScheduleMessageService.DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
            }
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/schedule/ScheduleMessageService$ProcessStatus.class */
    public enum ProcessStatus {
        RUNNING,
        SUCCESS,
        EXCEPTION,
        SKIP
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/schedule/ScheduleMessageService$PutResultProcess.class */
    public class PutResultProcess {
        private String topic;
        private long offset;
        private long physicOffset;
        private int physicSize;
        private int delayLevel;
        private String msgId;
        private CompletableFuture<PutMessageResult> future;
        private boolean autoResend = false;
        private volatile AtomicInteger resendCount = new AtomicInteger(0);
        private volatile ProcessStatus status = ProcessStatus.RUNNING;

        public PutResultProcess() {
        }

        public PutResultProcess setTopic(String str) {
            this.topic = str;
            return this;
        }

        public PutResultProcess setOffset(long j) {
            this.offset = j;
            return this;
        }

        public PutResultProcess setPhysicOffset(long j) {
            this.physicOffset = j;
            return this;
        }

        public PutResultProcess setPhysicSize(int i) {
            this.physicSize = i;
            return this;
        }

        public PutResultProcess setDelayLevel(int i) {
            this.delayLevel = i;
            return this;
        }

        public PutResultProcess setMsgId(String str) {
            this.msgId = str;
            return this;
        }

        public PutResultProcess setAutoResend(boolean z) {
            this.autoResend = z;
            return this;
        }

        public PutResultProcess setFuture(CompletableFuture<PutMessageResult> completableFuture) {
            this.future = completableFuture;
            return this;
        }

        public String getTopic() {
            return this.topic;
        }

        public long getOffset() {
            return this.offset;
        }

        public long getNextOffset() {
            return this.offset + 1;
        }

        public long getPhysicOffset() {
            return this.physicOffset;
        }

        public int getPhysicSize() {
            return this.physicSize;
        }

        public Integer getDelayLevel() {
            return Integer.valueOf(this.delayLevel);
        }

        public String getMsgId() {
            return this.msgId;
        }

        public boolean isAutoResend() {
            return this.autoResend;
        }

        public CompletableFuture<PutMessageResult> getFuture() {
            return this.future;
        }

        public AtomicInteger getResendCount() {
            return this.resendCount;
        }

        public PutResultProcess thenProcess() {
            this.future.thenAccept(this::handleResult);
            this.future.exceptionally(th -> {
                ScheduleMessageService.log.error("ScheduleMessageService put message exceptionally, info: {}", toString(), th);
                onException();
                return null;
            });
            return this;
        }

        private void handleResult(PutMessageResult putMessageResult) {
            if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                onSuccess(putMessageResult);
            } else {
                ScheduleMessageService.log.warn("ScheduleMessageService put message failed. info: {}.", putMessageResult);
                onException();
            }
        }

        public void onSuccess(PutMessageResult putMessageResult) {
            this.status = ProcessStatus.SUCCESS;
            if (!ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig().isEnableScheduleMessageStats() || putMessageResult.isRemotePut()) {
                return;
            }
            ScheduleMessageService.this.brokerController.getBrokerStatsManager().incQueueGetNums("SCHEDULE_CONSUMER", "SCHEDULE_TOPIC_XXXX", Integer.valueOf(this.delayLevel - 1), putMessageResult.getAppendMessageResult().getMsgNum());
            ScheduleMessageService.this.brokerController.getBrokerStatsManager().incQueueGetSize("SCHEDULE_CONSUMER", "SCHEDULE_TOPIC_XXXX", Integer.valueOf(this.delayLevel - 1), putMessageResult.getAppendMessageResult().getWroteBytes());
            ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetNums("SCHEDULE_CONSUMER", "SCHEDULE_TOPIC_XXXX", putMessageResult.getAppendMessageResult().getMsgNum());
            ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetSize("SCHEDULE_CONSUMER", "SCHEDULE_TOPIC_XXXX", putMessageResult.getAppendMessageResult().getWroteBytes());
            Attributes build = BrokerMetricsManager.newAttributesBuilder().put(BrokerMetricsConstant.LABEL_TOPIC, "SCHEDULE_TOPIC_XXXX").put(BrokerMetricsConstant.LABEL_CONSUMER_GROUP, "SCHEDULE_CONSUMER").put(BrokerMetricsConstant.LABEL_IS_SYSTEM, true).build();
            BrokerMetricsManager.messagesOutTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), build);
            BrokerMetricsManager.throughputOutTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), build);
            ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic, putMessageResult.getAppendMessageResult().getMsgNum(), 1);
            ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic, putMessageResult.getAppendMessageResult().getWroteBytes());
            ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic, putMessageResult.getAppendMessageResult().getMsgNum());
            Attributes build2 = BrokerMetricsManager.newAttributesBuilder().put(BrokerMetricsConstant.LABEL_TOPIC, this.topic).put(BrokerMetricsConstant.LABEL_MESSAGE_TYPE, TopicMessageType.DELAY.getMetricsValue()).put(BrokerMetricsConstant.LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(this.topic)).build();
            BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), build2);
            BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), build2);
            BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), build2);
        }

        public void onException() {
            ScheduleMessageService.log.warn("ScheduleMessageService onException, info: {}", toString());
            if (this.autoResend) {
                this.status = ProcessStatus.EXCEPTION;
            } else {
                this.status = ProcessStatus.SKIP;
            }
        }

        public ProcessStatus getStatus() {
            return this.status;
        }

        public PutMessageResult get() {
            try {
                return this.future.get();
            } catch (InterruptedException | ExecutionException e) {
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, (AppendMessageResult) null);
            }
        }

        public void doResend() {
            ScheduleMessageService.log.info("Resend message, info: {}", toString());
            try {
                Thread.sleep(Math.min(this.resendCount.incrementAndGet() * 100, 60000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                MessageExt lookMessageByOffset = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(this.physicOffset, this.physicSize);
                if (lookMessageByOffset == null) {
                    ScheduleMessageService.log.warn("ScheduleMessageService resend not found message. info: {}", toString());
                    this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
                    return;
                }
                PutMessageResult putMessage = ScheduleMessageService.this.brokerController.getEscapeBridge().putMessage(ScheduleMessageService.this.messageTimeUp(lookMessageByOffset));
                handleResult(putMessage);
                if (putMessage != null && putMessage.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                    ScheduleMessageService.log.info("Resend message success, info: {}", toString());
                }
            } catch (Exception e2) {
                this.status = ProcessStatus.EXCEPTION;
                ScheduleMessageService.log.error("Resend message error, info: {}", toString(), e2);
            }
        }

        public boolean need2Blocked() {
            return this.resendCount.get() > ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig().getScheduleAsyncDeliverMaxResendNum2Blocked();
        }

        public boolean need2Skip() {
            return this.resendCount.get() > ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig().getScheduleAsyncDeliverMaxResendNum2Blocked() * 2;
        }

        public String toString() {
            return "PutResultProcess{topic='" + this.topic + "', offset=" + this.offset + ", physicOffset=" + this.physicOffset + ", physicSize=" + this.physicSize + ", delayLevel=" + this.delayLevel + ", msgId='" + this.msgId + "', autoResend=" + this.autoResend + ", resendCount=" + this.resendCount + ", status=" + this.status + '}';
        }
    }

    public ScheduleMessageService(BrokerController brokerController) {
        this.enableAsyncDeliver = false;
        this.brokerController = brokerController;
        this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
        this.scheduledPersistService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
    }

    public static int queueId2DelayLevel(int i) {
        return i + 1;
    }

    public static int delayLevel2QueueId(int i) {
        return i - 1;
    }

    public void buildRunningStats(HashMap<String, String> hashMap) {
        for (Map.Entry<Integer, Long> entry : this.offsetTable.entrySet()) {
            hashMap.put(String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), entry.getKey()), String.format("%d,%d", Long.valueOf(entry.getValue().longValue()), Long.valueOf(this.brokerController.getMessageStore().getMaxOffsetInQueue("SCHEDULE_TOPIC_XXXX", delayLevel2QueueId(entry.getKey().intValue())))));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateOffset(int i, long j) {
        this.offsetTable.put(Integer.valueOf(i), Long.valueOf(j));
        if (this.versionChangeCounter.incrementAndGet() % this.brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) {
            this.dataVersion.nextVersion(this.brokerController.getMessageStore() != null ? this.brokerController.getMessageStore().getStateMachineVersion() : 0L);
        }
    }

    public long computeDeliverTimestamp(int i, long j) {
        Long l = this.delayLevelTable.get(Integer.valueOf(i));
        return l != null ? l.longValue() + j : j + FIRST_DELAY_TIME;
    }

    public void start() {
        if (this.started.compareAndSet(false, true)) {
            load();
            this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
            if (this.enableAsyncDeliver) {
                this.handleExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
            }
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer key = entry.getKey();
                Long value = entry.getValue();
                Long l = this.offsetTable.get(key);
                if (null == l) {
                    l = 0L;
                }
                if (value != null) {
                    if (this.enableAsyncDeliver) {
                        this.handleExecutorService.schedule(new HandlePutResultTask(key.intValue()), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                    }
                    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(key.intValue(), l.longValue()), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
            }
            this.scheduledPersistService.scheduleAtFixedRate(() -> {
                try {
                    persist();
                } catch (Throwable th) {
                    log.error("scheduleAtFixedRate flush exception", th);
                }
            }, DELAY_FOR_A_PERIOD, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
        }
    }

    public void shutdown() {
        stop();
        ThreadUtils.shutdown(this.scheduledPersistService);
    }

    public boolean stop() {
        if (!this.started.compareAndSet(true, false) || null == this.deliverExecutorService) {
            return true;
        }
        this.deliverExecutorService.shutdown();
        try {
            this.deliverExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("deliverExecutorService awaitTermination error", e);
        }
        if (this.handleExecutorService != null) {
            this.handleExecutorService.shutdown();
            try {
                this.handleExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e2) {
                log.error("handleExecutorService awaitTermination error", e2);
            }
        }
        for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
            log.warn("deliverPendingTable level: {}, size: {}", Integer.valueOf(i), Integer.valueOf(this.deliverPendingTable.get(Integer.valueOf(i)).size()));
        }
        persist();
        return true;
    }

    public boolean isStarted() {
        return this.started.get();
    }

    public int getMaxDelayLevel() {
        return this.maxDelayLevel;
    }

    public DataVersion getDataVersion() {
        return this.dataVersion;
    }

    public void setDataVersion(DataVersion dataVersion) {
        this.dataVersion = dataVersion;
    }

    public String encode() {
        return encode(false);
    }

    public boolean load() {
        return (super.load() && parseDelayLevel()) && correctDelayOffset();
    }

    public boolean loadWhenSyncDelayOffset() {
        return super.load() && parseDelayLevel();
    }

    public boolean correctDelayOffset() {
        try {
            Iterator<Integer> it = this.delayLevelTable.keySet().iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                ConsumeQueueInterface findOrCreateConsumeQueue = this.brokerController.getMessageStore().getQueueStore().findOrCreateConsumeQueue("SCHEDULE_TOPIC_XXXX", delayLevel2QueueId(intValue));
                Long l = this.offsetTable.get(Integer.valueOf(intValue));
                if (l != null && findOrCreateConsumeQueue != null) {
                    long longValue = l.longValue();
                    long minOffsetInQueue = findOrCreateConsumeQueue.getMinOffsetInQueue();
                    long maxOffsetInQueue = findOrCreateConsumeQueue.getMaxOffsetInQueue();
                    if (l.longValue() < minOffsetInQueue) {
                        longValue = minOffsetInQueue;
                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", new Object[]{l, Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Integer.valueOf(findOrCreateConsumeQueue.getQueueId())});
                    }
                    if (l.longValue() > maxOffsetInQueue) {
                        longValue = maxOffsetInQueue;
                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", new Object[]{l, Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue), Integer.valueOf(findOrCreateConsumeQueue.getQueueId())});
                    }
                    if (longValue != l.longValue()) {
                        log.error("correct delay offset [ delayLevel {} ] from {} to {}", new Object[]{Integer.valueOf(intValue), l, Long.valueOf(longValue)});
                        this.offsetTable.put(Integer.valueOf(intValue), Long.valueOf(longValue));
                    }
                }
            }
            return true;
        } catch (Exception e) {
            log.error("correctDelayOffset exception", e);
            return false;
        }
    }

    public String configFilePath() {
        return StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStore().getMessageStoreConfig().getStorePathRootDir());
    }

    public void decode(String str) {
        DelayOffsetSerializeWrapper delayOffsetSerializeWrapper;
        if (str == null || (delayOffsetSerializeWrapper = (DelayOffsetSerializeWrapper) DelayOffsetSerializeWrapper.fromJson(str, DelayOffsetSerializeWrapper.class)) == null) {
            return;
        }
        this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
        if (delayOffsetSerializeWrapper.getDataVersion() != null) {
            this.dataVersion.assignNewOne(delayOffsetSerializeWrapper.getDataVersion());
        }
    }

    public String encode(boolean z) {
        DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
        delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
        delayOffsetSerializeWrapper.setDataVersion(this.dataVersion);
        return delayOffsetSerializeWrapper.toJson(z);
    }

    public boolean parseDelayLevel() {
        HashMap hashMap = new HashMap();
        hashMap.put("s", Long.valueOf(FIRST_DELAY_TIME));
        hashMap.put("m", 60000L);
        hashMap.put("h", 3600000L);
        hashMap.put(TransactionalMessageUtil.REMOVE_TAG, 86400000L);
        String messageDelayLevel = this.brokerController.getMessageStoreConfig().getMessageDelayLevel();
        try {
            String[] split = messageDelayLevel.split(" ");
            for (int i = 0; i < split.length; i++) {
                String str = split[i];
                Long l = (Long) hashMap.get(str.substring(str.length() - 1));
                int i2 = i + 1;
                if (i2 > this.maxDelayLevel) {
                    this.maxDelayLevel = i2;
                }
                this.delayLevelTable.put(Integer.valueOf(i2), Long.valueOf(l.longValue() * Long.parseLong(str.substring(0, str.length() - 1))));
                if (this.enableAsyncDeliver) {
                    this.deliverPendingTable.put(Integer.valueOf(i2), new LinkedBlockingQueue<>());
                }
            }
            return true;
        } catch (Exception e) {
            log.error("parse message delay level failed. messageDelayLevel = {}", messageDelayLevel, e);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageExtBrokerInner messageTimeUp(MessageExt messageExt) {
        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
        messageExtBrokerInner.setBody(messageExt.getBody());
        messageExtBrokerInner.setFlag(messageExt.getFlag());
        MessageAccessor.setProperties(messageExtBrokerInner, messageExt.getProperties());
        messageExtBrokerInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(messageExtBrokerInner.getSysFlag()), messageExtBrokerInner.getTags()));
        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExt.getProperties()));
        messageExtBrokerInner.setSysFlag(messageExt.getSysFlag());
        messageExtBrokerInner.setBornTimestamp(messageExt.getBornTimestamp());
        messageExtBrokerInner.setBornHost(messageExt.getBornHost());
        messageExtBrokerInner.setStoreHost(messageExt.getStoreHost());
        messageExtBrokerInner.setReconsumeTimes(messageExt.getReconsumeTimes());
        messageExtBrokerInner.setWaitStoreMsgOK(false);
        MessageAccessor.clearProperty(messageExtBrokerInner, "DELAY");
        MessageAccessor.clearProperty(messageExtBrokerInner, "TIMER_DELIVER_MS");
        MessageAccessor.clearProperty(messageExtBrokerInner, "TIMER_DELAY_SEC");
        messageExtBrokerInner.setTopic(messageExtBrokerInner.getProperty("REAL_TOPIC"));
        messageExtBrokerInner.setQueueId(Integer.parseInt(messageExtBrokerInner.getProperty("REAL_QID")));
        return messageExtBrokerInner;
    }

    public ConcurrentMap<Integer, Long> getOffsetTable() {
        return this.offsetTable;
    }
}
