package org.apache.iotdb.db.pipe.extractor.realtime.assigner;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.metric.PipeEventCounter;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.class */
public class DisruptorQueue {
    private static final IoTDBDaemonThreadFactory THREAD_FACTORY = new IoTDBDaemonThreadFactory(ThreadName.PIPE_EXTRACTOR_DISRUPTOR.getName());
    private final PipeMemoryBlock allocatedMemoryBlock;
    private final Disruptor<EventContainer> disruptor;
    private final RingBuffer<EventContainer> ringBuffer;
    private final PipeEventCounter eventCounter = new PipeEventCounter();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue$EventContainer.class */
    public static class EventContainer {
        private PipeRealtimeEvent event;

        private EventContainer() {
        }

        public PipeRealtimeEvent getEvent() {
            return this.event;
        }

        public void setEvent(PipeRealtimeEvent pipeRealtimeEvent) {
            this.event = pipeRealtimeEvent;
        }
    }

    public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
        PipeConfig pipeConfig = PipeConfig.getInstance();
        int pipeExtractorAssignerDisruptorRingBufferSize = pipeConfig.getPipeExtractorAssignerDisruptorRingBufferSize();
        long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = pipeConfig.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
        this.allocatedMemoryBlock = PipeResourceManager.memory().tryAllocate(pipeExtractorAssignerDisruptorRingBufferSize * pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes, j -> {
            return j / 2;
        });
        this.disruptor = new Disruptor<>(() -> {
            return new EventContainer();
        }, Math.max(32, Math.toIntExact(this.allocatedMemoryBlock.getMemoryUsageInBytes() / pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes)), THREAD_FACTORY, ProducerType.MULTI, new BlockingWaitStrategy());
        this.disruptor.handleEventsWith(new EventHandler[]{(eventContainer, j2, z) -> {
            eventHandler.onEvent(eventContainer.getEvent(), j2, z);
            this.eventCounter.decreaseEventCount(eventContainer.getEvent().getEvent());
        }});
        this.disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
        this.ringBuffer = this.disruptor.start();
    }

    public void publish(PipeRealtimeEvent pipeRealtimeEvent) {
        EnrichedEvent event = pipeRealtimeEvent.getEvent();
        if (event instanceof PipeHeartbeatEvent) {
            ((PipeHeartbeatEvent) event).recordDisruptorSize(this.ringBuffer);
        }
        this.ringBuffer.publishEvent((eventContainer, j, pipeRealtimeEvent2) -> {
            eventContainer.setEvent(pipeRealtimeEvent);
        }, pipeRealtimeEvent);
        this.eventCounter.increaseEventCount(event);
    }

    public void clear() {
        this.disruptor.halt();
        this.allocatedMemoryBlock.close();
    }

    public int getTabletInsertionEventCount() {
        return this.eventCounter.getTabletInsertionEventCount().intValue();
    }

    public int getTsFileInsertionEventCount() {
        return this.eventCounter.getTsFileInsertionEventCount().intValue();
    }

    public int getPipeHeartbeatEventCount() {
        return this.eventCounter.getPipeHeartbeatEventCount().intValue();
    }
}
