package org.openmetadata.service.events;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.openmetadata.schema.type.ChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/events/EventPubSub.class */
public class EventPubSub {
    private static Disruptor<ChangeEventHolder> disruptor;
    private static ExecutorService executor;
    private static RingBuffer<ChangeEventHolder> ringBuffer;
    private static final Logger LOG = LoggerFactory.getLogger(EventPubSub.class);
    private static boolean started = false;

    /* loaded from: input_file:org/openmetadata/service/events/EventPubSub$ChangeEventFactory.class */
    public static class ChangeEventFactory implements EventFactory<ChangeEventHolder> {
        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public ChangeEventHolder m35newInstance() {
            return new ChangeEventHolder();
        }
    }

    /* loaded from: input_file:org/openmetadata/service/events/EventPubSub$ChangeEventHolder.class */
    public static class ChangeEventHolder {
        private ChangeEvent value;

        public void set(ChangeEvent changeEvent) {
            this.value = changeEvent;
        }

        public ChangeEvent get() {
            return this.value;
        }
    }

    /* loaded from: input_file:org/openmetadata/service/events/EventPubSub$DefaultExceptionHandler.class */
    public static class DefaultExceptionHandler implements ExceptionHandler<ChangeEventHolder> {
        public void handleEventException(Throwable th, long j, ChangeEventHolder changeEventHolder) {
            EventPubSub.LOG.warn("Disruptor error in onEvent {}", th.getMessage());
            throw new RuntimeException(th.getMessage());
        }

        public void handleOnStartException(Throwable th) {
            EventPubSub.LOG.warn("Disruptor error in onStart {}", th.getMessage());
        }

        public void handleOnShutdownException(Throwable th) {
            EventPubSub.LOG.warn("Disruptor error on onShutdown {}", th.getMessage());
        }
    }

    public static void start() {
        if (started) {
            return;
        }
        disruptor = new Disruptor<>(ChangeEventHolder::new, 1024, DaemonThreadFactory.INSTANCE);
        disruptor.setDefaultExceptionHandler(new DefaultExceptionHandler());
        executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
        ringBuffer = disruptor.start();
        LOG.info("Disruptor started");
        started = true;
    }

    public static void shutdown() throws InterruptedException {
        if (started) {
            disruptor.shutdown();
            disruptor.halt();
            executor.shutdownNow();
            executor.awaitTermination(10L, TimeUnit.SECONDS);
            disruptor = null;
            ringBuffer = null;
            started = false;
            LOG.info("Disruptor stopped");
        }
    }

    public static void publish(ChangeEvent changeEvent) {
        if (changeEvent != null) {
            RingBuffer ringBuffer2 = disruptor.getRingBuffer();
            long next = ringBuffer2.next();
            ((ChangeEventHolder) ringBuffer2.get(next)).set(changeEvent);
            ringBuffer2.publish(next);
        }
    }

    public static BatchEventProcessor<ChangeEventHolder> addEventHandler(com.lmax.disruptor.EventHandler<ChangeEventHolder> eventHandler) {
        Runnable batchEventProcessor = new BatchEventProcessor(ringBuffer, ringBuffer.newBarrier(new Sequence[0]), eventHandler);
        batchEventProcessor.setExceptionHandler(new DefaultExceptionHandler());
        ringBuffer.addGatingSequences(new Sequence[]{batchEventProcessor.getSequence()});
        executor.execute(batchEventProcessor);
        LOG.info("Processor added for {}", batchEventProcessor);
        return batchEventProcessor;
    }

    public static void removeProcessor(BatchEventProcessor<ChangeEventHolder> batchEventProcessor) {
        ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());
        LOG.info("Processor removed for {}", batchEventProcessor);
    }

    public void close() {
    }
}
