/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.utils;

import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.MutableObject;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.HashMap;
import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DisruptorQueue} variant that stores its elements in a
 * {@link LinkedBlockingDeque} instead of a ring buffer.
 *
 * <p>The backing deque is created without a capacity bound, so the
 * {@link #QUEUE_CAPACITY} value is only a nominal figure used when reporting
 * {@link #capacity()} and {@link #pctFull()}; it does not limit how many
 * elements can be queued.
 *
 * <p>Methods that block ({@link #take()}, {@link #consumeBatchWhenAvailable(EventHandler)},
 * {@link #publish(Object)}) do not propagate {@link InterruptedException}; they restore the
 * calling thread's interrupt status so callers can observe the interruption.
 */
public class DisruptorWrapBlockingQueue
extends DisruptorQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorWrapBlockingQueue.class);

    /** Nominal capacity reported by {@link #capacity()} and used as the base of {@link #pctFull()}. */
    private static final long QUEUE_CAPACITY = 512L;

    /** Backing store; unbounded because it is constructed without a capacity argument. */
    private final LinkedBlockingDeque<Object> queue;
    private final String queueName;

    /**
     * Creates an empty queue.
     *
     * @param queueName    name returned by {@link #getName()}
     * @param producerType not used by this implementation
     * @param bufferSize   not used by this implementation (the backing deque is unbounded)
     * @param wait         not used by this implementation
     */
    public DisruptorWrapBlockingQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
        this.queueName = queueName;
        this.queue = new LinkedBlockingDeque<Object>();
    }

    /** Returns the name given at construction time. */
    @Override
    public String getName() {
        return this.queueName;
    }

    /**
     * Hands every element that is currently queued to {@code handler} without blocking.
     *
     * @param handler callback invoked once per element
     */
    @Override
    public void consumeBatch(EventHandler<Object> handler) {
        this.consumeBatchToCursor(0L, handler);
    }

    /** No-op in this implementation. */
    @Override
    public void haltWithInterrupt() {
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the head element, or {@code null} if the queue is empty
     */
    @Override
    public Object poll() {
        return this.queue.poll();
    }

    /**
     * Removes and returns the head of the queue, blocking until one is available.
     *
     * @return the head element, or {@code null} if the calling thread was interrupted
     *         while waiting (the interrupt status is restored in that case)
     */
    @Override
    public Object take() {
        try {
            return this.queue.take();
        }
        catch (InterruptedException e) {
            // Do not lose the interruption: the caller only sees null, so keep the flag set.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Delivers {@code object} and then every further element polled from the queue to
     * {@code handler}, stopping when the queue is empty.
     *
     * <p>Each element is delivered with sequence {@code 0} and {@code endOfBatch == false}.
     * If the handler throws {@link InterruptedException} the loop stops, a warning is logged
     * and the thread's interrupt status is restored; the element being handled at that point
     * has already been removed from the queue.
     *
     * @param object  first element to deliver; nothing happens when {@code null}
     * @param handler callback invoked once per element
     * @throws RuntimeException wrapping any other exception thrown by the handler
     */
    public void drainQueue(Object object, EventHandler<Object> handler) {
        while (object != null) {
            try {
                handler.onEvent(object, 0L, false);
                object = this.queue.poll();
            }
            catch (InterruptedException e) {
                LOG.warn("Occur interrupt error, " + object);
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Waits until at least one element is available, then hands it and everything else
     * currently queued to {@code handler}.
     *
     * <p>If the calling thread is interrupted while waiting, a warning is logged, the
     * interrupt status is restored and the method returns without invoking the handler.
     *
     * @param handler callback invoked once per element
     */
    @Override
    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        Object object = this.queue.poll();
        if (object == null) {
            try {
                object = this.queue.take();
            }
            catch (InterruptedException e) {
                // object is still null here, so the drain below is a no-op.
                LOG.warn("Occur interrupt error, " + object);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.drainQueue(object, handler);
    }

    /**
     * Hands every element that is currently queued to {@code handler} without blocking.
     *
     * @param cursor  not used by this implementation
     * @param handler callback invoked once per element
     */
    public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        Object object = this.queue.poll();
        this.drainQueue(object, handler);
    }

    /**
     * Appends {@code obj} to the queue, retrying every millisecond until the backing deque
     * accepts it.
     *
     * <p>An interrupt received while waiting between retries does not abort the publish;
     * it is remembered and the thread's interrupt status is restored once the element has
     * been enqueued.
     *
     * @param obj element to enqueue
     */
    @Override
    public void publish(Object obj) {
        boolean interrupted = false;
        try {
            while (!this.queue.offer(obj)) {
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException e) {
                    // Restoring the flag immediately would make every following sleep throw
                    // at once and turn this loop into a busy spin, so defer it to finally.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends {@code obj} to the queue without waiting.
     *
     * @param obj element to enqueue
     * @throws InsufficientCapacityException if the backing deque rejects the element
     */
    public void tryPublish(Object obj) throws InsufficientCapacityException {
        if (!this.queue.offer(obj)) {
            throw InsufficientCapacityException.INSTANCE;
        }
    }

    /**
     * Appends {@code obj} to the queue, either retrying until accepted or failing fast.
     *
     * @param obj   element to enqueue
     * @param block {@code true} to behave like {@link #publish(Object)}, {@code false} to
     *              behave like {@link #tryPublish(Object)}
     * @throws InsufficientCapacityException if {@code block} is {@code false} and the
     *                                       backing deque rejects the element
     */
    @Override
    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if (block) {
            this.publish(obj);
        } else {
            this.tryPublish(obj);
        }
    }

    /** No-op in this implementation. */
    public void consumerStarted() {
    }

    /** No-op in this implementation. */
    private void flushCache() {
    }

    /** Removes every element from the queue. */
    @Override
    public void clear() {
        this.queue.clear();
    }

    /** Returns the number of elements currently queued. */
    @Override
    public long population() {
        return this.queue.size();
    }

    /**
     * Returns the nominal capacity, or the current size when the queue has grown to or
     * beyond the nominal capacity.
     */
    @Override
    public long capacity() {
        long used = this.queue.size();
        return used < QUEUE_CAPACITY ? QUEUE_CAPACITY : used;
    }

    /** Always returns {@code 0} in this implementation. */
    @Override
    public long writePos() {
        return 0L;
    }

    /** Returns the current number of queued elements. */
    @Override
    public long readPos() {
        return this.queue.size();
    }

    /**
     * Returns the fill level relative to the nominal capacity, capped at {@code 1.0}.
     */
    @Override
    public float pctFull() {
        long used = this.queue.size();
        if (used < QUEUE_CAPACITY) {
            return (float)used / (float)QUEUE_CAPACITY;
        }
        return 1.0f;
    }

    /**
     * Returns a snapshot of queue metrics as a map with the keys {@code capacity},
     * {@code population}, {@code write_pos} and {@code read_pos}.
     *
     * @return a {@code HashMap<String, Long>} holding the metrics
     */
    @Override
    public Object getState() {
        HashMap<String, Long> state = new HashMap<String, Long>();
        long rp = this.readPos();
        long wp = this.writePos();
        state.put("capacity", this.capacity());
        // writePos() is always 0 and readPos() is the queue size, so "wp - rp" would report
        // a zero-or-negative population; use the real element count instead.
        state.put("population", this.population());
        state.put("write_pos", wp);
        state.put("read_pos", rp);
        return state;
    }

    /** Factory producing empty {@link MutableObject} instances. */
    public static class ObjectEventFactory
    implements EventFactory<MutableObject> {
        /** Returns a new, empty {@link MutableObject}. */
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }
}

