package exchange.core2.core;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.EventProcessorFactory;
import com.lmax.disruptor.dsl.ProducerType;
import exchange.core2.core.common.CoreWaitStrategy;
import exchange.core2.core.common.cmd.CommandResultCode;
import exchange.core2.core.common.cmd.OrderCommand;
import exchange.core2.core.common.cmd.OrderCommandType;
import exchange.core2.core.common.config.ExchangeConfiguration;
import exchange.core2.core.common.config.PerformanceConfiguration;
import exchange.core2.core.common.config.SerializationConfiguration;
import exchange.core2.core.orderbook.IOrderBook;
import exchange.core2.core.processors.DisruptorExceptionHandler;
import exchange.core2.core.processors.GroupingProcessor;
import exchange.core2.core.processors.MatchingEngineRouter;
import exchange.core2.core.processors.ResultsHandler;
import exchange.core2.core.processors.RiskEngine;
import exchange.core2.core.processors.SharedPool;
import exchange.core2.core.processors.TwoStepMasterProcessor;
import exchange.core2.core.processors.TwoStepSlaveProcessor;
import exchange.core2.core.processors.journaling.ISerializationProcessor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.ObjLongConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:exchange/core2/core/ExchangeCore.class */
public final class ExchangeCore {
    private final Disruptor<OrderCommand> disruptor;
    private final ExchangeApi api;
    private final ISerializationProcessor serializationProcessor;
    private final ExchangeConfiguration exchangeConfiguration;
    private boolean started = false;
    private boolean stopped = false;
    public static final boolean EVENTS_POOLING = false;
    private static final Logger log = LoggerFactory.getLogger(ExchangeCore.class);
    private static final EventTranslator<OrderCommand> SHUTDOWN_SIGNAL_TRANSLATOR = (orderCommand, j) -> {
        orderCommand.command = OrderCommandType.SHUTDOWN_SIGNAL;
        orderCommand.resultCode = CommandResultCode.NEW;
    };

    /* loaded from: input_file:exchange/core2/core/ExchangeCore$ExchangeCoreBuilder.class */
    public static class ExchangeCoreBuilder {
        private ObjLongConsumer<OrderCommand> resultsConsumer;
        private ExchangeConfiguration exchangeConfiguration;

        ExchangeCoreBuilder() {
        }

        public ExchangeCoreBuilder resultsConsumer(ObjLongConsumer<OrderCommand> objLongConsumer) {
            this.resultsConsumer = objLongConsumer;
            return this;
        }

        public ExchangeCoreBuilder exchangeConfiguration(ExchangeConfiguration exchangeConfiguration) {
            this.exchangeConfiguration = exchangeConfiguration;
            return this;
        }

        public ExchangeCore build() {
            return new ExchangeCore(this.resultsConsumer, this.exchangeConfiguration);
        }

        public String toString() {
            return "ExchangeCore.ExchangeCoreBuilder(resultsConsumer=" + this.resultsConsumer + ", exchangeConfiguration=" + this.exchangeConfiguration + ")";
        }
    }

    public ExchangeCore(ObjLongConsumer<OrderCommand> objLongConsumer, ExchangeConfiguration exchangeConfiguration) {
        EventHandler eventHandler;
        log.debug("Building exchange core from configuration: {}", exchangeConfiguration);
        this.exchangeConfiguration = exchangeConfiguration;
        PerformanceConfiguration performanceCfg = exchangeConfiguration.getPerformanceCfg();
        this.disruptor = new Disruptor<>(OrderCommand::new, performanceCfg.getRingBufferSize(), performanceCfg.getThreadFactory(), ProducerType.MULTI, performanceCfg.getWaitStrategy().getDisruptorWaitStrategy());
        this.api = new ExchangeApi(this.disruptor.getRingBuffer(), performanceCfg.getBinaryCommandsLz4CompressorFactory().get());
        ThreadFactory threadFactory = performanceCfg.getThreadFactory();
        IOrderBook.OrderBookFactory orderBookFactory = performanceCfg.getOrderBookFactory();
        CoreWaitStrategy waitStrategy = performanceCfg.getWaitStrategy();
        int matchingEnginesNum = performanceCfg.getMatchingEnginesNum();
        int riskEnginesNum = performanceCfg.getRiskEnginesNum();
        SerializationConfiguration serializationCfg = exchangeConfiguration.getSerializationCfg();
        this.serializationProcessor = serializationCfg.getSerializationProcessorFactory().apply(exchangeConfiguration);
        int i = (matchingEnginesNum + riskEnginesNum) * 8;
        SharedPool sharedPool = new SharedPool(i * 4, i, 1);
        DisruptorExceptionHandler disruptorExceptionHandler = new DisruptorExceptionHandler("main", (th, l) -> {
            log.error("Exception thrown on sequence={}", l, th);
            this.disruptor.getRingBuffer().publishEvent(SHUTDOWN_SIGNAL_TRANSLATOR);
            this.disruptor.shutdown();
        });
        this.disruptor.setDefaultExceptionHandler(disruptorExceptionHandler);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(matchingEnginesNum + riskEnginesNum, threadFactory);
        Map map = (Map) IntStream.range(0, matchingEnginesNum).boxed().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return CompletableFuture.supplyAsync(() -> {
                return new MatchingEngineRouter(num2.intValue(), matchingEnginesNum, this.serializationProcessor, orderBookFactory, sharedPool, exchangeConfiguration);
            }, newFixedThreadPool);
        }));
        Map map2 = (Map) IntStream.range(0, riskEnginesNum).boxed().collect(Collectors.toMap(num3 -> {
            return num3;
        }, num4 -> {
            return CompletableFuture.supplyAsync(() -> {
                return new RiskEngine(num4.intValue(), riskEnginesNum, this.serializationProcessor, sharedPool, exchangeConfiguration);
            }, newFixedThreadPool);
        }));
        EventHandler[] eventHandlerArr = (EventHandler[]) map.values().stream().map((v0) -> {
            return v0.join();
        }).map(matchingEngineRouter -> {
            return (orderCommand, j, z) -> {
                matchingEngineRouter.processOrder(j, orderCommand);
            };
        }).toArray(ExchangeCore::newEventHandlersArray);
        Map map3 = (Map) map2.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return (RiskEngine) ((CompletableFuture) entry.getValue()).join();
        }));
        ArrayList arrayList = new ArrayList(riskEnginesNum);
        ArrayList arrayList2 = new ArrayList(riskEnginesNum);
        EventHandlerGroup handleEventsWith = this.disruptor.handleEventsWith(new EventProcessorFactory[]{(ringBuffer, sequenceArr) -> {
            return new GroupingProcessor(ringBuffer, ringBuffer.newBarrier(sequenceArr), performanceCfg, waitStrategy, sharedPool);
        }});
        boolean isEnableJournaling = serializationCfg.isEnableJournaling();
        if (isEnableJournaling) {
            ISerializationProcessor iSerializationProcessor = this.serializationProcessor;
            iSerializationProcessor.getClass();
            eventHandler = iSerializationProcessor::writeToJournal;
        } else {
            eventHandler = null;
        }
        EventHandler eventHandler2 = eventHandler;
        if (isEnableJournaling) {
            handleEventsWith.handleEventsWith(new EventHandler[]{eventHandler2});
        }
        map3.forEach((num5, riskEngine) -> {
            handleEventsWith.handleEventsWith(new EventProcessorFactory[]{(ringBuffer2, sequenceArr2) -> {
                SequenceBarrier newBarrier = ringBuffer2.newBarrier(sequenceArr2);
                riskEngine.getClass();
                TwoStepMasterProcessor twoStepMasterProcessor = new TwoStepMasterProcessor(ringBuffer2, newBarrier, riskEngine::preProcessCommand, disruptorExceptionHandler, waitStrategy, "R1_" + num5);
                arrayList.add(twoStepMasterProcessor);
                return twoStepMasterProcessor;
            }});
        });
        this.disruptor.after((EventProcessor[]) arrayList.toArray(new TwoStepMasterProcessor[0])).handleEventsWith(eventHandlerArr);
        EventHandlerGroup after = this.disruptor.after(eventHandlerArr);
        map3.forEach((num6, riskEngine2) -> {
            after.handleEventsWith(new EventProcessorFactory[]{(ringBuffer2, sequenceArr2) -> {
                SequenceBarrier newBarrier = ringBuffer2.newBarrier(sequenceArr2);
                riskEngine2.getClass();
                TwoStepSlaveProcessor twoStepSlaveProcessor = new TwoStepSlaveProcessor(ringBuffer2, newBarrier, riskEngine2::handlerRiskRelease, disruptorExceptionHandler, "R2_" + num6);
                arrayList2.add(twoStepSlaveProcessor);
                return twoStepSlaveProcessor;
            }});
        });
        EventHandlerGroup after2 = isEnableJournaling ? this.disruptor.after(arraysAddHandler(eventHandlerArr, eventHandler2)) : after;
        ResultsHandler resultsHandler = new ResultsHandler(objLongConsumer);
        after2.handleEventsWith(new EventHandler[]{(orderCommand, j, z) -> {
            resultsHandler.onEvent(orderCommand, j, z);
            this.api.processResult(j, orderCommand);
        }});
        IntStream.range(0, riskEnginesNum).forEach(i2 -> {
            ((TwoStepMasterProcessor) arrayList.get(i2)).setSlaveProcessor((TwoStepSlaveProcessor) arrayList2.get(i2));
        });
        try {
            newFixedThreadPool.shutdown();
            newFixedThreadPool.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized void startup() {
        if (this.started) {
            return;
        }
        log.debug("Starting disruptor...");
        this.disruptor.start();
        this.started = true;
        this.serializationProcessor.replayJournalFullAndThenEnableJouraling(this.exchangeConfiguration.getInitStateCfg(), this.api);
    }

    public ExchangeApi getApi() {
        return this.api;
    }

    public synchronized void shutdown() {
        shutdown(-1L, TimeUnit.MILLISECONDS);
    }

    public synchronized void shutdown(long j, TimeUnit timeUnit) {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        try {
            log.info("Shutdown disruptor...");
            this.disruptor.getRingBuffer().publishEvent(SHUTDOWN_SIGNAL_TRANSLATOR);
            this.disruptor.shutdown(j, timeUnit);
            log.info("Disruptor stopped");
        } catch (TimeoutException e) {
            throw new IllegalStateException("could not stop a disruptor gracefully. Not all events may be executed.");
        }
    }

    private static EventHandler<OrderCommand>[] arraysAddHandler(EventHandler<OrderCommand>[] eventHandlerArr, EventHandler<OrderCommand> eventHandler) {
        EventHandler<OrderCommand>[] eventHandlerArr2 = (EventHandler[]) Arrays.copyOf(eventHandlerArr, eventHandlerArr.length + 1);
        eventHandlerArr2[eventHandlerArr.length] = eventHandler;
        return eventHandlerArr2;
    }

    private static EventHandler<OrderCommand>[] newEventHandlersArray(int i) {
        return new EventHandler[i];
    }

    public static ExchangeCoreBuilder builder() {
        return new ExchangeCoreBuilder();
    }
}
