package backtype.storm.utils;

import backtype.storm.utils.disruptor.AbstractSequencerExt;
import backtype.storm.utils.disruptor.RingBuffer;
import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:backtype/storm/utils/DisruptorQueueImpl.class */
public class DisruptorQueueImpl extends DisruptorQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueImpl.class);
    static boolean useSleep = true;
    private static final Object FLUSH_CACHE = new Object();
    private static final Object INTERRUPT = new Object();
    private static final String PREFIX = "disruptor-";
    private final String _queueName;
    private final RingBuffer<MutableObject> _buffer;
    private final SequenceBarrier _barrier;
    private final HashMap<String, Object> state = new HashMap<>(4);
    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<>();
    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.cacheLock.readLock();
    private final Lock writeLock = this.cacheLock.writeLock();
    private final Sequence _consumer = new Sequence();

    /* loaded from: input_file:backtype/storm/utils/DisruptorQueueImpl$ObjectEventFactory.class */
    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public MutableObject m1028newInstance() {
            return new MutableObject();
        }
    }

    public static void setUseSleep(boolean z) {
        AbstractSequencerExt.setWaitSleep(z);
    }

    public DisruptorQueueImpl(String str, ProducerType producerType, int i, WaitStrategy waitStrategy) {
        this._queueName = PREFIX + str;
        this._buffer = RingBuffer.create(producerType, new ObjectEventFactory(), i, waitStrategy);
        this._barrier = this._buffer.newBarrier(new Sequence[0]);
        this._buffer.addGatingSequences(this._consumer);
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public String getName() {
        return this._queueName;
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatch(EventHandler<Object> eventHandler) {
        consumeBatchToCursor(this._barrier.getCursor(), eventHandler);
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void haltWithInterrupt() {
        publish(INTERRUPT);
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public Object poll() {
        long j = this._consumer.get() + 1;
        if (j > this._barrier.getCursor()) {
            return null;
        }
        MutableObject mutableObject = this._buffer.get(j);
        this._consumer.set(j);
        Object obj = mutableObject.o;
        mutableObject.setObject(null);
        return obj;
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public Object take() {
        long j = this._consumer.get() + 1;
        try {
            this._barrier.waitFor(j);
            MutableObject mutableObject = this._buffer.get(j);
            this._consumer.set(j);
            Object obj = mutableObject.o;
            mutableObject.setObject(null);
            return obj;
        } catch (TimeoutException e) {
            return null;
        } catch (AlertException e2) {
            LOG.error(e2.getMessage(), e2);
            throw new RuntimeException((Throwable) e2);
        } catch (InterruptedException e3) {
            LOG.error("InterruptedException " + e3.getCause());
            return null;
        }
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatchWhenAvailable(EventHandler<Object> eventHandler) {
        try {
            long j = this._consumer.get() + 1;
            long waitFor = this._barrier.waitFor(j);
            if (waitFor >= j) {
                consumeBatchToCursor(waitFor, eventHandler);
            }
        } catch (AlertException e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException((Throwable) e);
        } catch (InterruptedException e2) {
            LOG.error("InterruptedException " + e2.getCause());
        } catch (TimeoutException e3) {
        }
    }

    public void consumeBatchToCursor(long j, EventHandler<Object> eventHandler) {
        long j2 = this._consumer.get();
        while (true) {
            long j3 = j2 + 1;
            if (j3 > j) {
                this._consumer.set(j);
                return;
            }
            try {
                MutableObject mutableObject = this._buffer.get(j3);
                Object obj = mutableObject.o;
                mutableObject.setObject(null);
                eventHandler.onEvent(obj, j3, j3 == j);
                j2 = j3;
            } catch (InterruptedException e) {
                LOG.error(e.getMessage(), e);
                return;
            } catch (Exception e2) {
                LOG.error(e2.getMessage(), e2);
                throw new RuntimeException(e2);
            }
        }
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void publish(Object obj) {
        try {
            publish(obj, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!");
        }
    }

    public void tryPublish(Object obj) throws InsufficientCapacityException {
        publish(obj, false);
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void publish(Object obj, boolean z) throws InsufficientCapacityException {
        publishDirect(obj, z);
    }

    protected void publishDirect(Object obj, boolean z) throws InsufficientCapacityException {
        long next = z ? this._buffer.next() : this._buffer.tryNext(1);
        this._buffer.get(next).setObject(obj);
        this._buffer.publish(next);
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void clear() {
        while (population() != 0) {
            poll();
        }
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long population() {
        return writePos() - readPos();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long capacity() {
        return this._buffer.getBufferSize();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long writePos() {
        return this._buffer.getCursor();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long readPos() {
        return this._consumer.get();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public float pctFull() {
        return (1.0f * ((float) population())) / ((float) capacity());
    }

    @Override // backtype.storm.metric.api.IStatefulObject
    public Object getState() {
        long readPos = readPos();
        long writePos = writePos();
        this.state.put("capacity", Long.valueOf(capacity()));
        this.state.put("population", Long.valueOf(writePos - readPos));
        this.state.put("write_pos", Long.valueOf(writePos));
        this.state.put("read_pos", Long.valueOf(readPos));
        return this.state;
    }
}
