package org.apache.iotdb.db.pipe.extractor.realtime.assigner;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.class */
/**
 * A thin, type-safe wrapper around an LMAX {@link Disruptor} that lets callers publish plain
 * elements of type {@code E} without dealing with the ring buffer's pre-allocated event slots.
 *
 * <p>Each slot of the ring buffer is a {@link Container} holding one element. Instances are
 * created through {@link Builder}; the disruptor is already started when
 * {@link Builder#build()} returns.
 *
 * @param <E> type of the elements published to and consumed from the queue
 */
public class DisruptorQueue<E> {
    private Disruptor<Container<E>> disruptor;
    private RingBuffer<Container<E>> ringBuffer;

    /**
     * Fluent builder for {@link DisruptorQueue}.
     *
     * <p>The ring buffer size is read from {@link PipeConfig}; the thread factory defaults to
     * {@link DaemonThreadFactory#INSTANCE}, the producer type to {@link ProducerType#MULTI} and
     * the wait strategy to a {@link BlockingWaitStrategy}.
     *
     * @param <E> type of the elements handled by the queue being built
     */
    public static class Builder<E> {
        private final int ringBufferSize =
                PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize();
        private final ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
        private ProducerType producerType = ProducerType.MULTI;
        private final WaitStrategy waitStrategy = new BlockingWaitStrategy();
        private final List<EventHandler<E>> handlers = new ArrayList<>();

        /**
         * Overrides the producer type used when the disruptor is created.
         *
         * @param producerType producer type to pass to the {@link Disruptor} constructor
         * @return this builder
         */
        public Builder<E> setProducerType(ProducerType producerType) {
            this.producerType = producerType;
            return this;
        }

        /**
         * Registers a handler that will receive the elements published to the queue.
         *
         * @param eventHandler handler to invoke with each unwrapped element
         * @return this builder
         */
        public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
            this.handlers.add(eventHandler);
            return this;
        }

        /**
         * Creates the disruptor, wires every registered handler, installs the default exception
         * handler and starts the disruptor.
         *
         * @return a started queue ready to accept {@link DisruptorQueue#publish(Object)} calls
         */
        public DisruptorQueue<E> build() {
            final DisruptorQueue<E> queue = new DisruptorQueue<>();
            queue.disruptor =
                    new Disruptor<>(
                            Container::new,
                            this.ringBufferSize,
                            this.threadFactory,
                            this.producerType,
                            this.waitStrategy);

            // Each user handler is registered through its own handleEventsWith call; the adapter
            // lambda unwraps the Container so the handler only ever sees the element itself.
            for (final EventHandler<E> handler : this.handlers) {
                queue.disruptor.handleEventsWith(
                        (container, sequence, endOfBatch) ->
                                handler.onEvent(container.getObj(), sequence, endOfBatch));
            }

            queue.disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
            queue.disruptor.start();
            queue.ringBuffer = queue.disruptor.getRingBuffer();
            return queue;
        }
    }

    /**
     * Mutable slot stored in the ring buffer; carries one published element.
     *
     * <p>The constructor is private, so instances are only created inside
     * {@link DisruptorQueue}.
     *
     * @param <E> type of the carried element
     */
    public static class Container<E> {
        private E obj;

        private Container() {
        }

        /**
         * @return the element currently stored in this slot, or {@code null} if none was set
         */
        public E getObj() {
            return this.obj;
        }

        /**
         * Stores an element in this slot, replacing any previous one.
         *
         * @param e element to store
         */
        public void setObj(E e) {
            this.obj = e;
        }
    }

    /** Instances are created exclusively through {@link Builder#build()}. */
    private DisruptorQueue() {
    }

    /**
     * Publishes an element by claiming a ring buffer slot and storing the element in it.
     *
     * @param e element to publish
     */
    public void publish(E e) {
        this.ringBuffer.publishEvent((container, sequence, value) -> container.setObj(value), e);
    }

    /**
     * Halts the underlying disruptor.
     *
     * <p>NOTE(review): this calls {@code halt()}, not {@code shutdown()}; events still in the
     * ring buffer are presumably not guaranteed to be processed - confirm this is intended.
     */
    public void clear() {
        this.disruptor.halt();
    }
}
