package org.wso2.carbon.event.receiver.core.internal.management;

import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.event.receiver.core.internal.util.EventReceiverUtil;

/* loaded from: input_file:org/wso2/carbon/event/receiver/core/internal/management/BlockingEventQueue.class */
/**
 * A queue of {@link Event}s that is bounded in two ways: by element count (the capacity of the
 * backing {@link LinkedBlockingQueue}) and by a running total of per-event sizes in bytes, kept in
 * {@code currentSize}.
 *
 * <p>Producers serialize on {@code putLock} and consumers on {@code takeLock}. Waiting is done with
 * one-second timed waits that re-check the size counter, so a missed signal only delays a waiter
 * instead of blocking it indefinitely.
 *
 * <p>The class is {@link Serializable} without an explicit {@code serialVersionUID}; the instance
 * field declarations are therefore deliberately left as they were so the default serialized form is
 * not affected.
 */
public class BlockingEventQueue implements Serializable {
    private static final Log log = LogFactory.getLog(BlockingEventQueue.class);

    /** Number of bytes represented by one unit of the constructor's size argument. */
    private static final long BYTES_PER_SIZE_UNIT = 1000000L;

    /**
     * Fixed overhead added to the measured size of every event.
     * NOTE(review): presumably accounts for a 4-byte size field stored with the event - confirm.
     */
    private static final int PER_EVENT_OVERHEAD_BYTES = 4;

    /** Length, in seconds, of each timed wait on {@code notFull} / {@code notEmpty}. */
    private static final long WAIT_INTERVAL_SECONDS = 1L;

    /** Pause, in milliseconds, between attempts to offer into a count-full backing queue. */
    private static final long OFFER_RETRY_MILLIS = 1000L;

    private int maxSizeInBytes;
    private BlockingQueue<WrappedEvent> queue;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = this.putLock.newCondition();
    private final Condition notEmpty = this.takeLock.newCondition();
    private AtomicInteger currentSize = new AtomicInteger(0);
    private int currentEventSize = 0;

    /** Pairs an event with the size, in bytes, that was charged to the queue for it. */
    private class WrappedEvent implements Serializable {
        private int size;
        private Event event;

        public WrappedEvent(int i, Event event) {
            this.size = i;
            this.event = event;
        }

        public int getSize() {
            return this.size;
        }

        public Event getEvent() {
            return this.event;
        }
    }

    /**
     * Creates an empty queue.
     *
     * @param i  byte budget expressed in units of 1,000,000 bytes; the product is clamped to the
     *           {@code int} range instead of silently wrapping around
     * @param i2 maximum number of events the backing queue may hold
     */
    public BlockingEventQueue(int i, int i2) {
        // Multiply in long: i * 1000000 overflows int for i >= 2148 and used to yield a negative
        // (or otherwise meaningless) limit.
        long requestedBytes = (long) i * BYTES_PER_SIZE_UNIT;
        this.maxSizeInBytes = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, requestedBytes));
        this.queue = new LinkedBlockingQueue<>(i2);
    }

    /**
     * Appends an event, waiting while the byte budget is exhausted or the backing queue is full.
     *
     * @param event         the event to enqueue
     * @param reentrantLock caller-supplied lock that is held while the event is offered to the
     *                      backing queue and released while this method sleeps between attempts
     * @throws InterruptedException if the thread is interrupted while acquiring the put lock,
     *                              waiting for space, or sleeping between offer attempts
     */
    public void put(Event event, ReentrantLock reentrantLock) throws InterruptedException {
        final AtomicInteger size = this.currentSize;
        int previousSize;
        this.putLock.lockInterruptibly();
        try {
            int eventSize = EventReceiverUtil.getSize(event) + PER_EVENT_OVERHEAD_BYTES;
            this.currentEventSize = eventSize;
            while (size.get() >= this.maxSizeInBytes) {
                this.notFull.await(WAIT_INTERVAL_SECONDS, TimeUnit.SECONDS);
            }
            offerBehindBarrier(new WrappedEvent(eventSize, event), reentrantLock);
            previousSize = size.getAndAdd(eventSize);
        } finally {
            // Single release point: the lock is never unlocked twice, whatever fails above.
            this.putLock.unlock();
        }
        // Signalling and logging happen after the put lock has been released.
        if (previousSize == 0) {
            signalNotEmpty();
        }
        if (log.isDebugEnabled()) {
            log.debug("Current queue size in bytes : " + size + ", remaining capacity : " + this.queue.remainingCapacity());
        }
    }

    /**
     * Offers {@code wrapped} to the backing queue while holding {@code barrier}, releasing the
     * barrier during each retry pause and guaranteeing it is not left locked on any exit path.
     */
    private void offerBehindBarrier(WrappedEvent wrapped, ReentrantLock barrier) throws InterruptedException {
        barrier.lock();
        boolean barrierHeld = true;
        try {
            while (!this.queue.offer(wrapped)) {
                barrier.unlock();
                barrierHeld = false;
                Thread.sleep(OFFER_RETRY_MILLIS);
                barrier.lock();
                barrierHeld = true;
            }
        } finally {
            if (barrierHeld) {
                barrier.unlock();
            }
        }
    }

    /**
     * Removes and returns the head event, waiting until one is available.
     *
     * @return the event at the head of the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public Event take() throws InterruptedException {
        final AtomicInteger size = this.currentSize;
        WrappedEvent taken;
        int previousSize;
        this.takeLock.lockInterruptibly();
        try {
            while (size.get() == 0) {
                this.notEmpty.await(WAIT_INTERVAL_SECONDS, TimeUnit.SECONDS);
            }
            // Kept inside the try so an interrupt in queue.take() cannot leak the take lock.
            taken = this.queue.take();
            previousSize = size.getAndAdd(-taken.getSize());
        } finally {
            this.takeLock.unlock();
        }
        // The queue was at or over its byte budget before this removal; wake a waiting producer.
        if (previousSize >= this.maxSizeInBytes) {
            signalNotFull();
        }
        return taken.getEvent();
    }

    /**
     * Removes and returns the head event if one is available, without waiting for an event.
     *
     * @return the head event, or {@code null} if the queue holds no event
     */
    public Event poll() {
        final AtomicInteger size = this.currentSize;
        if (size.get() == 0) {
            return null;
        }
        WrappedEvent polled = null;
        int previousSize = -1;
        this.takeLock.lock();
        try {
            // Re-check under the lock: another consumer may have drained the queue meanwhile.
            if (size.get() > 0) {
                polled = this.queue.poll();
                if (polled != null) {
                    previousSize = size.getAndAdd(-polled.getSize());
                    if (size.get() > 0) {
                        this.notEmpty.signal();
                    }
                }
            }
        } finally {
            this.takeLock.unlock();
        }
        if (polled == null) {
            return null;
        }
        // Signal producers only after the take lock is released, as take() does, so the put lock
        // is never acquired while the take lock is held.
        if (previousSize >= this.maxSizeInBytes) {
            signalNotFull();
        }
        return polled.getEvent();
    }

    /**
     * Returns the head event without removing it.
     *
     * @return the head event, or {@code null} if the backing queue is empty
     */
    public Event peek() {
        WrappedEvent head = this.queue.peek();
        return head != null ? head.getEvent() : null;
    }

    /** Wakes one consumer waiting on {@code notEmpty}; acquires the take lock to do so. */
    private void signalNotEmpty() {
        this.takeLock.lock();
        try {
            this.notEmpty.signal();
        } finally {
            this.takeLock.unlock();
        }
    }

    /** Wakes one producer waiting on {@code notFull}; acquires the put lock to do so. */
    private void signalNotFull() {
        this.putLock.lock();
        try {
            this.notFull.signal();
        } finally {
            this.putLock.unlock();
        }
    }
}
