/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.utils;

import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.MutableObject;
import backtype.storm.utils.disruptor.AbstractSequencerExt;
import backtype.storm.utils.disruptor.RingBuffer;
import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorQueueImpl
extends DisruptorQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueImpl.class);
    static boolean useSleep = true;
    private static final Object FLUSH_CACHE = new Object();
    private static final Object INTERRUPT = new Object();
    private static final String PREFIX = "disruptor-";
    private final String _queueName;
    private final RingBuffer<MutableObject> _buffer;
    private final Sequence _consumer;
    private final SequenceBarrier _barrier;
    private final HashMap<String, Object> state = new HashMap(4);
    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.cacheLock.readLock();
    private final Lock writeLock = this.cacheLock.writeLock();

    public static void setUseSleep(boolean useSleep) {
        AbstractSequencerExt.setWaitSleep(useSleep);
    }

    public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
        this._queueName = PREFIX + queueName;
        this._buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
        this._consumer = new Sequence();
        this._barrier = this._buffer.newBarrier(new Sequence[0]);
        this._buffer.addGatingSequences(this._consumer);
    }

    @Override
    public String getName() {
        return this._queueName;
    }

    @Override
    public void consumeBatch(EventHandler<Object> handler) {
        this.consumeBatchToCursor(this._barrier.getCursor(), handler);
    }

    @Override
    public void haltWithInterrupt() {
        this.publish(INTERRUPT);
    }

    @Override
    public Object poll() {
        long nextSequence = this._consumer.get() + 1L;
        if (nextSequence <= this._barrier.getCursor()) {
            MutableObject mo = this._buffer.get(nextSequence);
            this._consumer.set(nextSequence);
            Object ret = mo.o;
            mo.setObject(null);
            return ret;
        }
        return null;
    }

    @Override
    public Object take() {
        long nextSequence = this._consumer.get() + 1L;
        try {
            this._barrier.waitFor(nextSequence);
        }
        catch (AlertException e) {
            LOG.error(e.getMessage(), (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (InterruptedException e) {
            LOG.error("InterruptedException " + e.getCause());
            return null;
        }
        catch (TimeoutException e) {
            return null;
        }
        MutableObject mo = this._buffer.get(nextSequence);
        this._consumer.set(nextSequence);
        Object ret = mo.o;
        mo.setObject(null);
        return ret;
    }

    @Override
    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        try {
            long nextSequence = this._consumer.get() + 1L;
            long availableSequence = this._barrier.waitFor(nextSequence);
            if (availableSequence >= nextSequence) {
                this.consumeBatchToCursor(availableSequence, handler);
            }
        }
        catch (AlertException e) {
            LOG.error(e.getMessage(), (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (InterruptedException e) {
            LOG.error("InterruptedException " + e.getCause());
            return;
        }
        catch (TimeoutException e) {
            return;
        }
    }

    public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        for (long curr = this._consumer.get() + 1L; curr <= cursor; ++curr) {
            try {
                MutableObject mo = this._buffer.get(curr);
                Object o = mo.o;
                mo.setObject(null);
                handler.onEvent(o, curr, curr == cursor);
                continue;
            }
            catch (InterruptedException e) {
                LOG.error(e.getMessage(), (Throwable)e);
                return;
            }
            catch (Exception e) {
                LOG.error(e.getMessage(), (Throwable)e);
                throw new RuntimeException(e);
            }
        }
        this._consumer.set(cursor);
    }

    @Override
    public void publish(Object obj) {
        try {
            this.publish(obj, true);
        }
        catch (InsufficientCapacityException ex) {
            throw new RuntimeException("This code should be unreachable!");
        }
    }

    public void tryPublish(Object obj) throws InsufficientCapacityException {
        this.publish(obj, false);
    }

    @Override
    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        this.publishDirect(obj, block);
    }

    protected void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
        long id = block ? this._buffer.next() : this._buffer.tryNext(1);
        MutableObject m = this._buffer.get(id);
        m.setObject(obj);
        this._buffer.publish(id);
    }

    @Override
    public void clear() {
        while (this.population() != 0L) {
            this.poll();
        }
    }

    @Override
    public long population() {
        return this.writePos() - this.readPos();
    }

    @Override
    public long capacity() {
        return this._buffer.getBufferSize();
    }

    @Override
    public long writePos() {
        return this._buffer.getCursor();
    }

    @Override
    public long readPos() {
        return this._consumer.get();
    }

    @Override
    public float pctFull() {
        return 1.0f * (float)this.population() / (float)this.capacity();
    }

    @Override
    public Object getState() {
        long rp = this.readPos();
        long wp = this.writePos();
        this.state.put("capacity", this.capacity());
        this.state.put("population", wp - rp);
        this.state.put("write_pos", wp);
        this.state.put("read_pos", rp);
        return this.state;
    }

    public static class ObjectEventFactory
    implements EventFactory<MutableObject> {
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }
}

