/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.Closeable;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks messages that were handed to a consumer but not yet acknowledged, and asks the
 * consumer to redeliver those that stay unacknowledged for too long.
 *
 * <p>Two sets are rotated by a periodic timer task. New message ids go into {@code currentSet}.
 * On every tick, whatever is still present in {@code oldOpenSet} is redelivered and removed,
 * and then the two sets are swapped. A message therefore has to survive one full swap before
 * it is considered timed-out.
 *
 * <p>Per-message operations take the read lock (the underlying sets handle their own
 * concurrent mutation); swapping the sets and stopping the tracker take the write lock.
 */
public class UnAckedMessageTracker implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);

    private ConcurrentOpenHashSet<MessageIdImpl> currentSet;
    private ConcurrentOpenHashSet<MessageIdImpl> oldOpenSet;
    private final ReentrantReadWriteLock readWriteLock;
    private final Lock readLock;
    private final Lock writeLock;
    // Written by the timer thread (reschedule) and read by whichever thread calls stop()/close(),
    // so it must be volatile for the latest handle to be visible.
    private volatile Timeout timeout;

    /** Shared no-op tracker used when ack-timeout tracking is turned off. */
    public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();

    /**
     * Creates a tracker without locks or sets. Only meaningful for subclasses that override
     * every operation touching that state (see {@link UnAckedMessageTrackerDisabled}).
     */
    public UnAckedMessageTracker() {
        this.readWriteLock = null;
        this.readLock = null;
        this.writeLock = null;
    }

    /**
     * Creates a tracker and immediately starts its periodic timeout check.
     *
     * @param client           client whose timer schedules the periodic check
     * @param consumerBase     consumer asked to redeliver timed-out messages
     * @param ackTimeoutMillis period of the check, in milliseconds
     */
    public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) {
        this.currentSet = new ConcurrentOpenHashSet<>();
        this.oldOpenSet = new ConcurrentOpenHashSet<>();
        this.readWriteLock = new ReentrantReadWriteLock();
        this.readLock = this.readWriteLock.readLock();
        this.writeLock = this.readWriteLock.writeLock();
        this.start(client, consumerBase, ackTimeoutMillis);
    }

    /**
     * Stops any previously scheduled check, clears tracked messages and schedules a new
     * periodic check.
     *
     * <p>NOTE(review): a task that is already executing when {@link #close()} is called will
     * still reschedule itself; presumably cancelling a running timeout has no effect - confirm
     * against the timer implementation.
     *
     * @param client           client whose timer schedules the periodic check
     * @param consumerBase     consumer asked to redeliver timed-out messages
     * @param ackTimeoutMillis period of the check, in milliseconds
     */
    public void start(final PulsarClientImpl client, final ConsumerBase consumerBase, final long ackTimeoutMillis) {
        this.stop();
        this.timeout = client.timer().newTimeout(new TimerTask() {

            @Override
            public void run(Timeout t) throws Exception {
                try {
                    if (UnAckedMessageTracker.this.isAckTimeout()) {
                        log.warn("[{}] {} messages have timed-out", consumerBase, UnAckedMessageTracker.this.oldOpenSet.size());
                        HashSet<MessageIdImpl> messageIds = new HashSet<>();
                        UnAckedMessageTracker.this.oldOpenSet.forEach(messageIds::add);
                        UnAckedMessageTracker.this.oldOpenSet.clear();
                        consumerBase.redeliverUnacknowledgedMessages(messageIds);
                    }
                    UnAckedMessageTracker.this.toggle();
                } finally {
                    // Always reschedule: a failure while redelivering must not silently
                    // disable ack-timeout tracking for the rest of the consumer's life.
                    UnAckedMessageTracker.this.timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
                }
            }
        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Swaps {@code currentSet} and {@code oldOpenSet} under the write lock. */
    void toggle() {
        this.writeLock.lock();
        try {
            ConcurrentOpenHashSet<MessageIdImpl> temp = this.currentSet;
            this.currentSet = this.oldOpenSet;
            this.oldOpenSet = temp;
        } finally {
            this.writeLock.unlock();
        }
    }

    /** Removes every tracked message id from both sets. */
    public void clear() {
        this.readLock.lock();
        try {
            this.currentSet.clear();
            this.oldOpenSet.clear();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Starts (or restarts) tracking of a message id. The id is dropped from the old set first,
     * so a re-added message gets a fresh timeout window.
     *
     * @param m message id to track
     * @return the result of adding {@code m} to the current set
     */
    public boolean add(MessageIdImpl m) {
        this.readLock.lock();
        try {
            this.oldOpenSet.remove(m);
            return this.currentSet.add(m);
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * @return {@code true} when neither set holds any message id
     */
    boolean isEmpty() {
        this.readLock.lock();
        try {
            return this.currentSet.isEmpty() && this.oldOpenSet.isEmpty();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Stops tracking a message id.
     *
     * @param m message id to stop tracking
     * @return {@code true} if the id was removed from either set
     */
    public boolean remove(MessageIdImpl m) {
        this.readLock.lock();
        try {
            return this.currentSet.remove(m) || this.oldOpenSet.remove(m);
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * @return the combined size of both sets
     */
    long size() {
        this.readLock.lock();
        try {
            return this.currentSet.size() + this.oldOpenSet.size();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * @return {@code true} when the old set still holds message ids, i.e. they were not
     *         removed since the last swap
     */
    private boolean isAckTimeout() {
        this.readLock.lock();
        try {
            return !this.oldOpenSet.isEmpty();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Stops tracking every message of the same partition that is positioned at or before
     * {@code msgId}.
     *
     * @param msgId upper bound (inclusive) of the ids to remove
     * @return number of ids removed from both sets
     */
    public int removeMessagesTill(MessageIdImpl msgId) {
        this.readLock.lock();
        try {
            int currentSetRemovedMsgCount = this.currentSet.removeIf(m -> isAtOrBefore(m, msgId));
            int oldSetRemovedMsgCount = this.oldOpenSet.removeIf(m -> isAtOrBefore(m, msgId));
            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Tells whether {@code m} belongs to the same partition as {@code bound} and has an
     * earlier ledger id, or the same ledger id and an entry id not greater than the bound's.
     */
    private static boolean isAtOrBefore(MessageIdImpl m, MessageIdImpl bound) {
        boolean earlierLedger = m.getLedgerId() < bound.getLedgerId();
        boolean sameLedgerNotAfter = m.getLedgerId() == bound.getLedgerId() && m.getEntryId() <= bound.getEntryId();
        return (earlierLedger || sameLedgerNotAfter) && m.getPartitionIndex() == bound.getPartitionIndex();
    }

    /** Cancels the pending timer task, if any, and clears all tracked message ids. */
    private void stop() {
        this.writeLock.lock();
        try {
            Timeout pending = this.timeout;
            if (pending != null && !pending.isCancelled()) {
                pending.cancel();
            }
            // clear() takes the read lock; acquiring it while holding the write lock is
            // permitted by ReentrantReadWriteLock.
            this.clear();
        } finally {
            this.writeLock.unlock();
        }
    }

    /** Stops the periodic check and clears all tracked message ids. */
    @Override
    public void close() {
        this.stop();
    }

    /**
     * No-op tracker. It is built through the no-arg constructor, so the locks and sets of the
     * parent are {@code null}; every parent method that touches them is overridden here.
     */
    private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker {
        private UnAckedMessageTrackerDisabled() {
        }

        @Override
        public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) {
            // Nothing to schedule: tracking is disabled.
        }

        @Override
        void toggle() {
        }

        @Override
        public void clear() {
        }

        @Override
        public boolean add(MessageIdImpl m) {
            return true;
        }

        @Override
        boolean isEmpty() {
            return true;
        }

        @Override
        public boolean remove(MessageIdImpl m) {
            return true;
        }

        @Override
        long size() {
            return 0;
        }

        @Override
        public int removeMessagesTill(MessageIdImpl msgId) {
            return 0;
        }

        @Override
        public void close() {
        }
    }
}

