/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.events;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.openmetadata.schema.type.ChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventPubSub {
    private static final Logger LOG = LoggerFactory.getLogger(EventPubSub.class);
    private static Disruptor<ChangeEventHolder> disruptor;
    private static ExecutorService executor;
    private static RingBuffer<ChangeEventHolder> ringBuffer;
    private static boolean started;

    public static void start() {
        if (!started) {
            disruptor = new Disruptor(ChangeEventHolder::new, 1024, (ThreadFactory)DaemonThreadFactory.INSTANCE);
            disruptor.setDefaultExceptionHandler((ExceptionHandler)new DefaultExceptionHandler());
            executor = Executors.newCachedThreadPool((ThreadFactory)DaemonThreadFactory.INSTANCE);
            ringBuffer = disruptor.start();
            LOG.info("Disruptor started");
            started = true;
        }
    }

    public static void shutdown() throws InterruptedException {
        if (started) {
            disruptor.shutdown();
            disruptor.halt();
            executor.shutdownNow();
            executor.awaitTermination(10L, TimeUnit.SECONDS);
            disruptor = null;
            ringBuffer = null;
            started = false;
            LOG.info("Disruptor stopped");
        }
    }

    public static void publish(ChangeEvent event) {
        if (event != null) {
            RingBuffer ringBuffer = disruptor.getRingBuffer();
            long sequence = ringBuffer.next();
            ((ChangeEventHolder)ringBuffer.get(sequence)).setEvent(event);
            ringBuffer.publish(sequence);
        }
    }

    public static BatchEventProcessor<ChangeEventHolder> addEventHandler(EventHandler<ChangeEventHolder> eventHandler) {
        BatchEventProcessor processor = new BatchEventProcessor(ringBuffer, ringBuffer.newBarrier(new Sequence[0]), eventHandler);
        processor.setExceptionHandler((ExceptionHandler)new DefaultExceptionHandler());
        ringBuffer.addGatingSequences(new Sequence[]{processor.getSequence()});
        executor.execute((Runnable)processor);
        LOG.info("Processor added for {}", (Object)processor);
        return processor;
    }

    public static void removeProcessor(BatchEventProcessor<ChangeEventHolder> processor) {
        ringBuffer.removeGatingSequence(processor.getSequence());
        LOG.info("Processor removed for {}", processor);
    }

    public void close() {
    }

    static {
        started = false;
    }

    public static class DefaultExceptionHandler
    implements ExceptionHandler<ChangeEventHolder> {
        public void handleEventException(Throwable throwable, long l, ChangeEventHolder changeEventHolder) {
            LOG.warn("Disruptor error in onEvent {}", (Object)throwable.getMessage());
            throw new RuntimeException(throwable.getMessage());
        }

        public void handleOnStartException(Throwable throwable) {
            LOG.warn("Disruptor error in onStart {}", (Object)throwable.getMessage());
        }

        public void handleOnShutdownException(Throwable throwable) {
            LOG.warn("Disruptor error on onShutdown {}", (Object)throwable.getMessage());
        }
    }

    public static class ChangeEventFactory
    implements EventFactory<ChangeEventHolder> {
        public ChangeEventHolder newInstance() {
            return new ChangeEventHolder();
        }
    }

    public static class ChangeEventHolder {
        private ChangeEvent event;

        public ChangeEvent getEvent() {
            return this.event;
        }

        public void setEvent(ChangeEvent event) {
            this.event = event;
        }
    }
}

