package backtype.storm.utils;

import backtype.storm.utils.disruptor.AbstractSequencerExt;
import backtype.storm.utils.disruptor.RingBuffer;
import com.alibaba.jstorm.callback.Callback;
import com.alibaba.jstorm.daemon.worker.Flusher;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.lmax.disruptor.AlertException;
import shade.storm.com.lmax.disruptor.EventFactory;
import shade.storm.com.lmax.disruptor.EventHandler;
import shade.storm.com.lmax.disruptor.InsufficientCapacityException;
import shade.storm.com.lmax.disruptor.Sequence;
import shade.storm.com.lmax.disruptor.SequenceBarrier;
import shade.storm.com.lmax.disruptor.TimeoutException;
import shade.storm.com.lmax.disruptor.WaitStrategy;
import shade.storm.com.lmax.disruptor.dsl.ProducerType;

/* loaded from: input_file:backtype/storm/utils/DisruptorQueueImpl.class */
public class DisruptorQueueImpl extends DisruptorQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueImpl.class);
    // Not referenced by any code visible in this class; setUseSleep() forwards to AbstractSequencerExt instead.
    static boolean useSleep = true;
    // Sentinel object that haltWithInterrupt() publishes into the queue.
    private static final Object INTERRUPT = new Object();
    // Prepended to the caller-supplied name to form the queue name.
    private static final String PREFIX = "disruptor-";
    private final String _queueName;
    // Ring buffer holding the published events, each wrapped in a reusable MutableObject slot.
    private final RingBuffer<MutableObject> _buffer;
    // Barrier the consuming side waits on for newly published sequences.
    private final SequenceBarrier _barrier;
    // When true, publish(Object) routes events through _batcher instead of writing them one at a time.
    private boolean _isBatch;
    // Accumulates events in batch mode; null when _isBatch is false.
    private ThreadLocalBatch _batcher;
    // Number of accumulated events that triggers a batch publish; only assigned in batch mode.
    private int _inputBatchSize;
    // Flushes the partially filled batch; only created in batch mode.
    private Flusher _flusher;
    // Side list of events added via publishCache(); consumers drain it before reading the ring buffer.
    private List<Object> _cache;
    // Guards _callbacks.
    private Object _callbackLock = new Object();
    // Pending callbacks run by handlerCallback(); null when there are none.
    private List<Callback> _callbacks = null;
    // Guards _cache (both its contents and the reference itself, which is swapped on batch consumption).
    private Object _cacheLock = new Object();
    // Reused map instance that getState() refills and returns.
    private final HashMap<String, Object> state = new HashMap<>(4);
    // Consumer position; registered as a gating sequence of _buffer in the constructor.
    private final Sequence _consumer = new Sequence();

    /* loaded from: input_file:backtype/storm/utils/DisruptorQueueImpl$DisruptorFlusher.class */
    /**
     * Flusher task that publishes the enclosing queue's partially filled batch
     * into the ring buffer each time it is run.
     */
    private class DisruptorFlusher extends Flusher {
        // True while a flush is in progress, so overlapping run() invocations are skipped.
        private final AtomicBoolean _isFlushing = new AtomicBoolean(false);

        /**
         * @param j value stored as the flush interval ({@code _flushIntervalMs})
         */
        public DisruptorFlusher(long j) {
            this._flushIntervalMs = j;
        }

        /**
         * Flushes the pending batch unless another flush is already running.
         */
        @Override // com.alibaba.jstorm.daemon.worker.Flusher, java.lang.Runnable
        public void run() {
            if (this._isFlushing.compareAndSet(false, true)) {
                try {
                    DisruptorQueueImpl.this._batcher.flush();
                } finally {
                    // Always release the flag: if flush() threw and the flag stayed set,
                    // every later run() would be skipped and batches would never be flushed.
                    this._isFlushing.set(false);
                }
            }
        }
    }

    /* loaded from: input_file:backtype/storm/utils/DisruptorQueueImpl$ObjectEventFactory.class */
    /**
     * Event factory handed to the ring buffer in the constructor; each call
     * produces a fresh, empty {@link MutableObject} slot.
     */
    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        /* JADX WARN: Can't rename method to resolve collision */
        /**
         * @return a newly allocated {@link MutableObject}
         */
        @Override // shade.storm.com.lmax.disruptor.EventFactory
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:backtype/storm/utils/DisruptorQueueImpl$ThreadLocalBatch.class */
    /**
     * Accumulates published events and hands them to the ring buffer in batches
     * of {@code _inputBatchSize}. Despite the name, a single list is held per
     * instance; {@link #addAndFlush(Object)} and {@link #flush()} are synchronized
     * on this object.
     */
    public class ThreadLocalBatch {
        // Events collected since the last batch was handed off.
        private ArrayList<Object> _batcher;

        public ThreadLocalBatch() {
            this._batcher = new ArrayList<>(DisruptorQueueImpl.this._inputBatchSize);
        }

        /**
         * Adds an event and, if that completes a batch, publishes the batch
         * (blocking until the ring buffer has room).
         *
         * @param obj the event to enqueue
         */
        public synchronized void addAndFlush(Object obj) {
            ArrayList<Object> add = add(obj);
            if (add != null) {
                try {
                    DisruptorQueueImpl.this.publishDirect((List<Object>) add, true);
                } catch (InsufficientCapacityException e) {
                    // Keep the cause in the log so a dropped batch can be diagnosed.
                    DisruptorQueueImpl.LOG.warn("Failed to publish batch", e);
                }
            }
        }

        /**
         * Appends an event to the current batch.
         *
         * @param obj the event to append
         * @return the completed batch once it holds at least {@code _inputBatchSize}
         *         events (a fresh list then replaces it), otherwise {@code null}
         */
        public ArrayList<Object> add(Object obj) {
            ArrayList<Object> arrayList = null;
            this._batcher.add(obj);
            if (this._batcher.size() >= DisruptorQueueImpl.this._inputBatchSize) {
                arrayList = this._batcher;
                this._batcher = new ArrayList<>(DisruptorQueueImpl.this._inputBatchSize);
            }
            return arrayList;
        }

        /**
         * Publishes whatever has been accumulated so far, if anything, and
         * starts a new batch.
         */
        public synchronized void flush() {
            try {
                if (this._batcher != null && this._batcher.size() > 0) {
                    DisruptorQueueImpl.this.publishDirect((List<Object>) this._batcher, true);
                    this._batcher = new ArrayList<>(DisruptorQueueImpl.this._inputBatchSize);
                }
            } catch (InsufficientCapacityException e) {
                // The batch is left in place (it is only replaced after a successful
                // publish), but the failure is no longer swallowed silently.
                DisruptorQueueImpl.LOG.warn("Failed to flush batch", e);
            }
        }
    }

    /**
     * Forwards the flag to {@link AbstractSequencerExt#setWaitSleep}. The static
     * {@code useSleep} field of this class is not modified by this call.
     *
     * @param z value passed through to the sequencer
     */
    public static void setUseSleep(boolean z) {
        AbstractSequencerExt.setWaitSleep(z);
    }

    public DisruptorQueueImpl(String str, ProducerType producerType, int i, WaitStrategy waitStrategy, boolean z, int i2, long j) {
        this._queueName = PREFIX + str;
        this._buffer = RingBuffer.create(producerType, new ObjectEventFactory(), i, waitStrategy);
        this._barrier = this._buffer.newBarrier(new Sequence[0]);
        this._buffer.addGatingSequences(this._consumer);
        this._isBatch = z;
        this._cache = new ArrayList();
        if (!this._isBatch) {
            this._batcher = null;
            return;
        }
        this._inputBatchSize = i2;
        this._batcher = new ThreadLocalBatch();
        this._flusher = new DisruptorFlusher(Math.max(j, 1L));
        this._flusher.start();
    }

    /**
     * @return the queue name, i.e. the constructor-supplied name prefixed with {@code "disruptor-"}
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public String getName() {
        return this._queueName;
    }

    /**
     * Consumes a batch only if the ring buffer cursor is ahead of the consumer
     * position; otherwise returns immediately.
     *
     * @param eventHandler receives the consumed events
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatch(EventHandler<Object> eventHandler) {
        long published = this._buffer.getCursor();
        long consumed = this._consumer.get();
        if (published <= consumed) {
            return;
        }
        consumeBatchWhenAvailable(eventHandler);
    }

    /**
     * Publishes the {@code INTERRUPT} sentinel object through the regular
     * {@link #publish(Object)} path.
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void haltWithInterrupt() {
        publish(INTERRUPT);
    }

    /**
     * Non-blocking removal of one event. The cache is served first; otherwise
     * the next ring buffer slot is read if it is already published. Pending
     * callbacks are run before returning.
     *
     * @return the removed event, or {@code null} if nothing was available
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public Object poll() {
        Object obj = null;
        boolean fromCache = false;
        // Check and remove inside ONE critical section: the cache can be swapped for an
        // empty list by consumeBatchWhenAvailable between a separate size check and
        // remove(0), which would throw IndexOutOfBoundsException.
        synchronized (this._cacheLock) {
            if (!this._cache.isEmpty()) {
                obj = this._cache.remove(0);
                fromCache = true;
            }
        }
        // A flag is needed because a cached element may itself be null.
        if (!fromCache) {
            long j = this._consumer.get() + 1;
            if (j <= this._barrier.getCursor()) {
                MutableObject mutableObject = this._buffer.get(j);
                this._consumer.set(j);
                obj = mutableObject.o;
                // Clear the slot so the ring buffer does not keep the event reachable.
                mutableObject.setObject(null);
            }
        }
        handlerCallback();
        return obj;
    }

    /**
     * Removes one event, waiting on the barrier when the cache is empty. Pending
     * callbacks are run after an event was obtained.
     *
     * @return the removed event, or {@code null} if the wait was interrupted,
     *         timed out, or ended before the requested sequence was published
     * @throws RuntimeException wrapping an {@code AlertException} raised by the barrier
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public Object take() {
        Object obj = null;
        boolean fromCache = false;
        // Check and remove inside ONE critical section: the cache can be swapped for an
        // empty list by consumeBatchWhenAvailable between a separate size check and
        // remove(0), which would throw IndexOutOfBoundsException.
        synchronized (this._cacheLock) {
            if (!this._cache.isEmpty()) {
                obj = this._cache.remove(0);
                fromCache = true;
            }
        }
        if (!fromCache) {
            long j = this._consumer.get() + 1;
            try {
                long available = this._barrier.waitFor(j);
                if (available < j) {
                    // The barrier may return before the requested sequence is published
                    // (consumeBatchWhenAvailable guards against the same case). Reading
                    // the slot anyway would hand out stale data and move the consumer
                    // past the producer cursor.
                    return null;
                }
                MutableObject mutableObject = this._buffer.get(j);
                this._consumer.set(j);
                obj = mutableObject.o;
                // Clear the slot so the ring buffer does not keep the event reachable.
                mutableObject.setObject(null);
            } catch (InterruptedException e) {
                LOG.error("InterruptedException " + e.getCause());
                return null;
            } catch (AlertException e2) {
                LOG.error(e2.getMessage(), e2);
                throw new RuntimeException(e2);
            } catch (TimeoutException e3) {
                return null;
            }
        }
        handlerCallback();
        return obj;
    }

    /**
     * Runs every pending callback once. A callback whose {@code execute} result
     * is {@code true} is removed from the pending list; the list itself is
     * dropped (set to {@code null}) once it becomes empty.
     */
    private void handlerCallback() {
        // Cheap check without the lock; re-checked below once the lock is held.
        if (this._callbacks == null) {
            return;
        }
        synchronized (this._callbackLock) {
            if (this._callbacks == null) {
                return;
            }
            for (Iterator<Callback> pending = this._callbacks.iterator(); pending.hasNext(); ) {
                Callback callback = pending.next();
                Boolean finished = (Boolean) callback.execute(new Object[0]);
                if (finished.booleanValue()) {
                    pending.remove();
                }
            }
            if (this._callbacks.isEmpty()) {
                this._callbacks = null;
            }
        }
    }

    /**
     * Shorthand for {@code consumeBatchWhenAvailable(eventHandler, true)}.
     *
     * @param eventHandler receives the consumed events
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatchWhenAvailable(EventHandler<Object> eventHandler) {
        consumeBatchWhenAvailable(eventHandler, true);
    }

    /**
     * Consumes a batch via {@link #consumeBatchWhenAvailable(EventHandler)} and
     * then runs the pending callbacks.
     *
     * @param eventHandler receives the consumed events
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatchWhenAvailableWithCallback(EventHandler<Object> eventHandler) {
        consumeBatchWhenAvailable(eventHandler);
        handlerCallback();
    }

    /**
     * Shorthand for {@code consumeBatchWhenAvailable(eventHandler, false)}, i.e.
     * the path that fetches the batch through the synchronized
     * {@code retreiveAvailableBatch()}.
     *
     * @param eventHandler receives the consumed events
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void multiConsumeBatchWhenAvailable(EventHandler<Object> eventHandler) {
        consumeBatchWhenAvailable(eventHandler, false);
    }

    /**
     * Same as {@code consumeBatchWhenAvailable(eventHandler, false)}, followed
     * by running the pending callbacks.
     *
     * @param eventHandler receives the consumed events
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void multiConsumeBatchWhenAvailableWithCallback(EventHandler<Object> eventHandler) {
        consumeBatchWhenAvailable(eventHandler, false);
        handlerCallback();
    }

    /**
     * Delivers one batch of events to the handler.
     * <p>
     * If the cache holds events, the whole cache is detached and delivered and
     * the ring buffer is not touched in this call. Otherwise, with {@code z}
     * true the method waits on the barrier and consumes up to the returned
     * sequence; with {@code z} false it fetches the batch through
     * {@code retreiveAvailableBatch()} and sleeps 1 ms when that batch is empty.
     *
     * @param eventHandler receives the events; cached/retrieved events are
     *                     delivered with sequence {@code 0}
     * @param z            selects the direct (true) or the synchronized-retrieval (false) path
     * @throws RuntimeException wrapping any handler exception or a barrier {@code AlertException}
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatchWhenAvailable(EventHandler<Object> eventHandler, boolean z) {
        List<Object> cached = null;
        synchronized (this._cacheLock) {
            if (this._cache.size() > 0) {
                // Detach the current cache so it can be delivered outside the lock.
                cached = this._cache;
                this._cache = new ArrayList<>();
            }
        }
        if (cached != null) {
            deliverAll(cached, eventHandler);
            return;
        }
        try {
            if (!z) {
                List<Object> batch = retreiveAvailableBatch();
                if (batch.size() > 0) {
                    deliverAll(batch, eventHandler);
                } else {
                    JStormUtils.sleepMs(1L);
                }
                return;
            }
            long next = this._consumer.get() + 1;
            long available = this._barrier.waitFor(next);
            if (available >= next) {
                consumeBatchToCursor(available, eventHandler);
            }
        } catch (InterruptedException interrupted) {
            LOG.error("InterruptedException " + interrupted.getCause());
        } catch (AlertException alert) {
            LOG.error(alert.getMessage(), alert);
            throw new RuntimeException(alert);
        } catch (TimeoutException ignored) {
            // Nothing became available in time; the caller simply gets no events.
        }
    }

    /**
     * Hands every element of the list to the handler with sequence 0, flagging
     * the final element as end-of-batch. Any handler exception is logged and
     * rethrown wrapped in a RuntimeException.
     */
    private void deliverAll(List<Object> events, EventHandler<Object> eventHandler) {
        int index = 0;
        while (index < events.size()) {
            boolean endOfBatch = index == events.size() - 1;
            try {
                eventHandler.onEvent(events.get(index), 0L, endOfBatch);
            } catch (Exception failure) {
                LOG.error(failure.getMessage(), failure);
                throw new RuntimeException(failure);
            }
            index++;
        }
    }

    /**
     * Delivers every slot from the current consumer position (exclusive) up to
     * {@code j} (inclusive) to the handler, clearing each slot as it is read.
     * <p>
     * The consumer sequence is always advanced to the last slot that was
     * cleared, even when the handler fails. Leaving it behind would make the
     * next call re-read slots that have already been nulled and deliver
     * {@code null} events. A cursor below the current consumer position no
     * longer moves the consumer backwards.
     *
     * @param j            highest sequence to consume
     * @param eventHandler receives each event with its sequence and an end-of-batch flag
     * @throws RuntimeException wrapping any handler exception other than
     *                          {@code InterruptedException}, which is logged and ends the batch early
     */
    public void consumeBatchToCursor(long j, EventHandler<Object> eventHandler) {
        long lastConsumed = this._consumer.get();
        try {
            for (long seq = lastConsumed + 1; seq <= j; seq++) {
                MutableObject mutableObject = this._buffer.get(seq);
                Object obj = mutableObject.o;
                mutableObject.setObject(null);
                // The slot is now empty, so it counts as consumed whatever the handler does.
                lastConsumed = seq;
                eventHandler.onEvent(obj, seq, seq == j);
            }
        } catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
        } catch (Exception e2) {
            LOG.error(e2.getMessage(), e2);
            throw new RuntimeException(e2);
        } finally {
            this._consumer.set(lastConsumed);
        }
    }

    /**
     * Waits on the barrier and removes every event that is available from the
     * ring buffer, advancing the consumer sequence past them.
     *
     * @return the removed events in sequence order; an empty list when the
     *         barrier returned before the next sequence was published
     * @throws AlertException       propagated from the barrier
     * @throws InterruptedException propagated from the barrier
     * @throws TimeoutException     propagated from the barrier
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public synchronized List<Object> retreiveAvailableBatch() throws AlertException, InterruptedException, TimeoutException {
        long first = this._consumer.get() + 1;
        long available = this._barrier.waitFor(first);
        if (available < first) {
            // Size the list only after this check: (available - first) + 1 is negative when
            // the barrier returns two or more below the request, and ArrayList rejects a
            // negative capacity with IllegalArgumentException.
            return new ArrayList<>();
        }
        List<Object> batch = new ArrayList<>((int) ((available - first) + 1));
        for (long seq = first; seq <= available; seq++) {
            try {
                MutableObject mutableObject = this._buffer.get(seq);
                Object obj = mutableObject.o;
                // Clear the slot so the ring buffer does not keep the event reachable.
                mutableObject.setObject(null);
                batch.add(obj);
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }
        this._consumer.set(available);
        return batch;
    }

    /**
     * Publishes one event: through the batcher in batch mode, otherwise directly
     * into the ring buffer in blocking mode.
     *
     * @param obj the event to publish
     * @throws RuntimeException if the blocking publish unexpectedly reports
     *                          insufficient capacity; the original exception is the cause
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void publish(Object obj) {
        try {
            if (this._isBatch) {
                publishBatch(obj);
            } else {
                publish(obj, true);
            }
        } catch (InsufficientCapacityException e) {
            // Keep the cause so the "impossible" failure can be diagnosed if it ever happens.
            throw new RuntimeException("This code should be unreachable!", e);
        }
    }

    /**
     * Adds the event to the batcher, which publishes the batch once it is full.
     * NOTE(review): {@code _batcher} is null when the queue was constructed with
     * batching disabled, so this call would fail with a NullPointerException in
     * that mode — confirm callers only use it on batch queues.
     *
     * @param obj the event to enqueue
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void publishBatch(Object obj) {
        this._batcher.addAndFlush(obj);
    }

    /**
     * Publishes a list of events into the ring buffer.
     * <p>
     * In non-blocking mode ({@code z} false) all slots are claimed at once via
     * {@code tryNext}. In blocking mode the list is written in chunks no larger
     * than the currently remaining capacity, sleeping 1 ms whenever the buffer
     * is full. A {@code null} or empty list is a no-op in both modes.
     *
     * @param list events to publish, in order
     * @param z    true to block until everything is published, false to fail fast
     * @throws InsufficientCapacityException in non-blocking mode when the slots cannot be claimed
     */
    public void publishDirect(List<Object> list, boolean z) throws InsufficientCapacityException {
        // Nothing to publish; this also keeps a zero-sized claim away from tryNext.
        if (list == null || list.isEmpty()) {
            return;
        }
        int size = list.size();
        if (!z) {
            long tryNext = this._buffer.tryNext(size);
            publishBuffer(list, 0, (tryNext - size) + 1, tryNext);
            return;
        }
        int remaining = size;
        int offset = 0;
        while (remaining > 0) {
            int remainingCapacity = (int) this._buffer.remainingCapacity();
            if (remainingCapacity > 0) {
                int chunk = Math.min(remaining, remainingCapacity);
                long next = this._buffer.next(chunk);
                publishBuffer(list, offset, (next - chunk) + 1, next);
                remaining -= chunk;
                offset += chunk;
            } else {
                // Buffer is full: back off briefly before checking capacity again.
                JStormUtils.sleepMs(1L);
            }
        }
    }

    /**
     * Copies consecutive list elements, starting at index {@code i}, into the
     * already claimed slots {@code j..j2} and then publishes that range.
     *
     * @param list source of the events
     * @param i    index of the first list element to copy
     * @param j    first claimed sequence
     * @param j2   last claimed sequence (inclusive)
     */
    private void publishBuffer(List<Object> list, int i, long j, long j2) {
        int sourceIndex = i;
        for (long seq = j; seq <= j2; seq++) {
            MutableObject slot = this._buffer.get(seq);
            slot.setObject(list.get(sourceIndex));
            sourceIndex++;
        }
        this._buffer.publish(j, j2);
    }

    /**
     * Non-blocking publish; equivalent to {@code publish(obj, false)}.
     *
     * @param obj the event to publish
     * @throws InsufficientCapacityException propagated from {@code publish(obj, false)}
     */
    public void tryPublish(Object obj) throws InsufficientCapacityException {
        publish(obj, false);
    }

    /**
     * Publishes a single event straight into the ring buffer, bypassing the batcher.
     *
     * @param obj the event to publish
     * @param z   passed through to {@code publishDirect(Object, boolean)}
     * @throws InsufficientCapacityException propagated from {@code publishDirect}
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void publish(Object obj, boolean z) throws InsufficientCapacityException {
        publishDirect(obj, z);
    }

    /**
     * Claims one slot, stores the event in it and publishes the slot.
     *
     * @param obj the event to store
     * @param z   true to claim with {@code next()}, false to claim with {@code tryNext(1)}
     * @throws InsufficientCapacityException declared for the {@code tryNext} claim
     */
    protected void publishDirect(Object obj, boolean z) throws InsufficientCapacityException {
        final long sequence;
        if (z) {
            sequence = this._buffer.next();
        } else {
            sequence = this._buffer.tryNext(1);
        }
        MutableObject slot = this._buffer.get(sequence);
        slot.setObject(obj);
        this._buffer.publish(sequence);
    }

    /**
     * Appends the event to the side cache (not the ring buffer) while holding
     * the cache lock.
     *
     * @param obj the event to cache
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void publishCache(Object obj) {
        synchronized (this._cacheLock) {
            this._cache.add(obj);
        }
    }

    /**
     * @return the number of events currently held in the side cache, read under the cache lock
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public int cacheSize() {
        synchronized (this._cacheLock) {
            return this._cache.size();
        }
    }

    /**
     * Polls repeatedly until the ring buffer population reaches zero. The loop
     * condition looks only at the ring buffer; cached events are removed only
     * as a side effect of {@code poll()} serving the cache first.
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void clear() {
        long remaining = population();
        while (remaining != 0) {
            poll();
            remaining = population();
        }
    }

    /**
     * @return the write position minus the read position, i.e. the number of
     *         ring buffer entries published but not yet consumed
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public long population() {
        long produced = writePos();
        long consumed = readPos();
        return produced - consumed;
    }

    /**
     * @return the size of the underlying ring buffer
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public long capacity() {
        return this._buffer.getBufferSize();
    }

    /**
     * @return the ring buffer's current cursor
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public long writePos() {
        return this._buffer.getCursor();
    }

    /**
     * @return the current value of the consumer sequence
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public long readPos() {
        return this._consumer.get();
    }

    /**
     * @return {@code population() / capacity()} computed in float arithmetic
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public float pctFull() {
        float used = (float) population();
        float total = (float) capacity();
        return used / total;
    }

    /**
     * Refreshes and returns the shared state map with the keys
     * {@code capacity}, {@code population}, {@code write_pos} and
     * {@code read_pos}. The same map instance is returned on every call.
     *
     * @return the internal state map
     */
    @Override // backtype.storm.metric.api.IStatefulObject
    public Object getState() {
        // Sample both positions once so population is consistent with the reported positions.
        final long consumed = readPos();
        final long produced = writePos();
        final long pending = produced - consumed;
        this.state.put("capacity", Long.valueOf(capacity()));
        this.state.put("population", Long.valueOf(pending));
        this.state.put("write_pos", Long.valueOf(produced));
        this.state.put("read_pos", Long.valueOf(consumed));
        return this.state;
    }

    /**
     * @return the underlying ring buffer instance (not a copy)
     */
    public RingBuffer<MutableObject> get_buffer() {
        return this._buffer;
    }

    /**
     * @return the sequence barrier created from the ring buffer in the constructor
     */
    public SequenceBarrier get_barrier() {
        return this._barrier;
    }

    /**
     * Registers a callback to be run by the consuming side; {@code null} is ignored.
     * The pending list is created on demand under the callback lock.
     *
     * @param callback the callback to register, may be {@code null}
     */
    @Override // backtype.storm.utils.DisruptorQueue
    public void publishCallback(Callback callback) {
        if (callback == null) {
            return;
        }
        synchronized (this._callbackLock) {
            List<Callback> pending = this._callbacks;
            if (pending == null) {
                pending = new ArrayList();
                this._callbacks = pending;
            }
            pending.add(callback);
        }
    }
}
