package org.jruby.ext.thread;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyMarshal;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.marshal.DataType;

@JRubyClass(name = {"Queue"})
/* loaded from: input_file:org/jruby/ext/thread/Queue.class */
public class Queue extends RubyObject implements DataType {
    protected volatile boolean closed;
    protected volatile int capacity;
    protected final AtomicInteger count;
    transient Node head;
    protected transient Node last;
    protected final ReentrantLock takeLock;
    protected final Condition notEmpty;
    protected final ReentrantLock putLock;
    protected final Condition notFull;
    private static final RubyThread.Task<Queue, IRubyObject> BLOCKING_POP_TASK = new RubyThread.Task<Queue, IRubyObject>() { // from class: org.jruby.ext.thread.Queue.2
        @Override // org.jruby.RubyThread.Task
        public IRubyObject run(ThreadContext threadContext, Queue queue) throws InterruptedException {
            return queue.takeInternal(threadContext);
        }

        @Override // org.jruby.RubyThread.Task, org.jruby.RubyThread.Unblocker
        public void wakeup(RubyThread rubyThread, Queue queue) {
            rubyThread.getNativeThread().interrupt();
        }
    };
    private static final RubyThread.Task<Queue, IRubyObject> NONBLOCKING_POP_TASK = new RubyThread.Task<Queue, IRubyObject>() { // from class: org.jruby.ext.thread.Queue.3
        @Override // org.jruby.RubyThread.Task
        public IRubyObject run(ThreadContext threadContext, Queue queue) throws InterruptedException {
            IRubyObject pollInternal = queue.pollInternal();
            if (pollInternal == null) {
                throw threadContext.runtime.newThreadError("queue empty");
            }
            return pollInternal;
        }

        @Override // org.jruby.RubyThread.Task, org.jruby.RubyThread.Unblocker
        public void wakeup(RubyThread rubyThread, Queue queue) {
            rubyThread.getNativeThread().interrupt();
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jruby/ext/thread/Queue$Node.class */
    public static class Node {
        IRubyObject item;
        Node next;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Node(IRubyObject iRubyObject) {
            this.item = iRubyObject;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void signalNotEmpty() {
        ReentrantLock reentrantLock = this.takeLock;
        reentrantLock.lock();
        try {
            this.notEmpty.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    protected void signalNotFull() {
        ReentrantLock reentrantLock = this.putLock;
        reentrantLock.lock();
        try {
            this.notFull.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void enqueue(Node node) {
        this.last.next = node;
        this.last = node;
    }

    protected IRubyObject dequeue() {
        Node node = this.head;
        Node node2 = node.next;
        node.next = node;
        this.head = node2;
        IRubyObject iRubyObject = node2.item;
        node2.item = null;
        return iRubyObject;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void fullyLock() {
        this.putLock.lock();
        this.takeLock.lock();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void fullyUnlock() {
        this.takeLock.unlock();
        this.putLock.unlock();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initializedCheck() {
        if (this.capacity == 0) {
            throw getRuntime().newTypeError(this + " not initialized");
        }
    }

    public Queue(Ruby ruby, RubyClass rubyClass) {
        super(ruby, rubyClass);
        this.closed = false;
        this.count = new AtomicInteger();
        this.takeLock = new ReentrantLock();
        this.notEmpty = this.takeLock.newCondition();
        this.putLock = new ReentrantLock();
        this.notFull = this.putLock.newCondition();
        Node node = new Node(null);
        this.head = node;
        this.last = node;
    }

    public static void setup(Ruby ruby) {
        RubyClass defineClassUnder = ruby.getThread().defineClassUnder("Queue", ruby.getObject(), new ObjectAllocator() { // from class: org.jruby.ext.thread.Queue.1
            @Override // org.jruby.runtime.ObjectAllocator
            public IRubyObject allocate(Ruby ruby2, RubyClass rubyClass) {
                return new Queue(ruby2, rubyClass);
            }
        });
        defineClassUnder.undefineMethod("initialize_copy");
        defineClassUnder.setReifiedClass(Queue.class);
        defineClassUnder.defineAnnotatedMethods(Queue.class);
        ruby.getObject().setConstant("Queue", defineClassUnder);
        ruby.getObject().setConstant("ClosedQueueError", defineClassUnder.defineClassUnder("ClosedQueueError", ruby.getStopIteration(), ruby.getStopIteration().getAllocator()));
    }

    @Override // org.jruby.RubyBasicObject
    @JRubyMethod(visibility = Visibility.PRIVATE)
    public IRubyObject initialize(ThreadContext threadContext) {
        this.capacity = Integer.MAX_VALUE;
        return this;
    }

    @JRubyMethod
    public IRubyObject clear(ThreadContext threadContext) {
        initializedCheck();
        try {
            clearInternal();
            return this;
        } catch (InterruptedException e) {
            throw createInterruptedError(threadContext, "clear");
        }
    }

    /* JADX WARN: Finally extract failed */
    protected void clearInternal() throws InterruptedException {
        ReentrantLock reentrantLock = this.putLock;
        ReentrantLock reentrantLock2 = this.takeLock;
        reentrantLock.lockInterruptibly();
        try {
            reentrantLock2.lockInterruptibly();
            try {
                Node node = this.head;
                while (true) {
                    Node node2 = node.next;
                    if (node2 == null) {
                        break;
                    }
                    node.next = node;
                    node2.item = null;
                    node = node2;
                }
                this.head = this.last;
                if (this.count.getAndSet(0) == this.capacity) {
                    this.notFull.signal();
                }
                reentrantLock2.unlock();
            } catch (Throwable th) {
                reentrantLock2.unlock();
                throw th;
            }
        } finally {
            reentrantLock.unlock();
        }
    }

    @JRubyMethod(name = {"empty?"})
    public RubyBoolean empty_p(ThreadContext threadContext) {
        initializedCheck();
        return threadContext.runtime.newBoolean(this.count.get() == 0);
    }

    @JRubyMethod(name = {"length", "size"})
    public RubyNumeric length(ThreadContext threadContext) {
        initializedCheck();
        return RubyNumeric.int2fix(threadContext.runtime, this.count.get());
    }

    @JRubyMethod
    public RubyNumeric num_waiting(ThreadContext threadContext) {
        initializedCheck();
        ReentrantLock reentrantLock = this.takeLock;
        try {
            reentrantLock.lockInterruptibly();
            try {
                RubyFixnum newFixnum = threadContext.runtime.newFixnum(reentrantLock.getWaitQueueLength(this.notEmpty));
                reentrantLock.unlock();
                return newFixnum;
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        } catch (InterruptedException e) {
            throw createInterruptedError(threadContext, "num_waiting");
        }
    }

    @JRubyMethod(name = {"pop", "deq", "shift"})
    public IRubyObject pop(ThreadContext threadContext) {
        initializedCheck();
        try {
            return (IRubyObject) threadContext.getThread().executeTask(threadContext, this, BLOCKING_POP_TASK);
        } catch (InterruptedException e) {
            throw createInterruptedError(threadContext, "pop");
        }
    }

    @JRubyMethod(name = {"pop", "deq", "shift"})
    public IRubyObject pop(ThreadContext threadContext, IRubyObject iRubyObject) {
        initializedCheck();
        try {
            return (IRubyObject) threadContext.getThread().executeTask(threadContext, this, !iRubyObject.isTrue() ? BLOCKING_POP_TASK : NONBLOCKING_POP_TASK);
        } catch (InterruptedException e) {
            throw createInterruptedError(threadContext, "pop");
        }
    }

    @JRubyMethod(name = {"push", "<<", "enq"})
    public IRubyObject push(ThreadContext threadContext, IRubyObject iRubyObject) {
        initializedCheck();
        try {
            putInternal(threadContext, iRubyObject);
            return this;
        } catch (InterruptedException e) {
            throw createInterruptedError(threadContext, "push");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void putInternal(ThreadContext threadContext, IRubyObject iRubyObject) throws InterruptedException {
        boolean z;
        if (iRubyObject == null) {
            throw new NullPointerException();
        }
        Node node = new Node(iRubyObject);
        ReentrantLock reentrantLock = this.putLock;
        AtomicInteger atomicInteger = this.count;
        reentrantLock.lockInterruptibly();
        while (true) {
            try {
                z = this.closed;
                if (z || atomicInteger.get() < this.capacity) {
                    break;
                } else {
                    this.notFull.await();
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        if (z) {
            this.notFull.signal();
            raiseClosedError(threadContext);
        }
        enqueue(node);
        int andIncrement = atomicInteger.getAndIncrement();
        if (andIncrement + 1 < this.capacity) {
            this.notFull.signal();
        }
        if (andIncrement == 0) {
            signalNotEmpty();
        }
    }

    @JRubyMethod
    public IRubyObject marshal_dump(ThreadContext threadContext) {
        return RubyMarshal.undumpable(threadContext, this);
    }

    @JRubyMethod
    public IRubyObject close(ThreadContext threadContext) {
        initializedCheck();
        try {
            closeInternal();
            return this;
        } catch (InterruptedException e) {
            throw createInterruptedError(threadContext, "close");
        }
    }

    protected void closeInternal() throws InterruptedException {
        ReentrantLock reentrantLock = this.putLock;
        ReentrantLock reentrantLock2 = this.takeLock;
        AtomicInteger atomicInteger = this.count;
        reentrantLock.lockInterruptibly();
        try {
            reentrantLock2.lockInterruptibly();
            try {
                if (this.closed) {
                    reentrantLock.unlock();
                    return;
                }
                this.closed = true;
                int i = atomicInteger.get();
                if (i >= this.capacity) {
                    this.notFull.signal();
                } else if (i == 0) {
                    this.notEmpty.signal();
                }
                reentrantLock2.unlock();
            } finally {
                reentrantLock2.unlock();
            }
        } finally {
            reentrantLock.unlock();
        }
    }

    @JRubyMethod(name = {"closed?"})
    public IRubyObject closed_p(ThreadContext threadContext) {
        initializedCheck();
        return threadContext.runtime.newBoolean(this.closed);
    }

    public synchronized void shutdown() throws InterruptedException {
        closeInternal();
    }

    public boolean isShutdown() {
        return this.closed;
    }

    public synchronized void checkShutdown() {
        if (isShutdown()) {
            Ruby runtime = getRuntime();
            throw RaiseException.from(runtime, runtime.getThreadError(), "queue shut down");
        }
    }

    protected long java_length() {
        return this.count.get();
    }

    protected IRubyObject takeInternal(ThreadContext threadContext) throws InterruptedException {
        boolean z;
        IRubyObject iRubyObject;
        int i = -1;
        AtomicInteger atomicInteger = this.count;
        ReentrantLock reentrantLock = this.takeLock;
        boolean z2 = false;
        reentrantLock.lockInterruptibly();
        while (true) {
            try {
                z = this.closed;
                if (z || atomicInteger.get() != 0) {
                    break;
                }
                this.notEmpty.await();
            } finally {
                reentrantLock.unlock();
            }
        }
        boolean z3 = (z && atomicInteger.get() == 0) ? false : true;
        if (z3) {
            iRubyObject = dequeue();
            i = atomicInteger.getAndDecrement();
        } else {
            iRubyObject = threadContext.nil;
        }
        if (i > 1 || z) {
            this.notEmpty.signal();
        }
        if (z3) {
            z2 = i == this.capacity;
        }
        if (z2) {
            signalNotFull();
        }
        return iRubyObject;
    }

    public IRubyObject pollInternal() throws InterruptedException {
        AtomicInteger atomicInteger = this.count;
        if (atomicInteger.get() == 0) {
            return null;
        }
        IRubyObject iRubyObject = null;
        boolean z = false;
        ReentrantLock reentrantLock = this.takeLock;
        reentrantLock.lockInterruptibly();
        try {
            if (atomicInteger.get() > 0) {
                iRubyObject = dequeue();
                int andDecrement = atomicInteger.getAndDecrement();
                if (andDecrement > 1) {
                    this.notEmpty.signal();
                }
                z = andDecrement == this.capacity;
            }
            if (z) {
                signalNotFull();
            }
            return iRubyObject;
        } finally {
            reentrantLock.unlock();
        }
    }

    public IRubyObject raiseClosedError(ThreadContext threadContext) {
        throw threadContext.runtime.newRaiseException(threadContext.runtime.getClass("ClosedQueueError"), "queue closed");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RaiseException createInterruptedError(ThreadContext threadContext, String str) {
        return threadContext.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#" + str);
    }
}
