package org.ballerinalang.jvm.scheduling;

import java.io.PrintStream;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.ballerinalang.jvm.BallerinaErrors;
import org.ballerinalang.jvm.runtime.RuntimeConstants;
import org.ballerinalang.jvm.transactions.TransactionConstants;
import org.ballerinalang.jvm.transactions.TransactionLocalContext;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.types.BTypes;
import org.ballerinalang.jvm.util.BLangConstants;
import org.ballerinalang.jvm.util.RuntimeUtils;
import org.ballerinalang.jvm.values.ChannelDetails;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.FPValue;
import org.ballerinalang.jvm.values.FutureValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ballerinalang/jvm/scheduling/Scheduler.class */
/**
 * Strand scheduler backed by a fixed number of worker threads.
 *
 * <p>Work is submitted as {@link SchedulerItem}s through the {@code schedule*} methods and placed
 * on a shared blocking queue. {@link #start()} spawns {@code numThreads - 1} worker threads and
 * also uses the calling thread as a worker; every worker repeatedly takes an item, executes it
 * and then acts on the state the item reports afterwards (re-queue, park, or complete).
 *
 * <p>Workers stop when they take {@link SchedulerItem#POISON_PILL} from the queue. Pills are
 * enqueued either explicitly via {@link #poison()} or automatically once the number of live
 * strands drops to zero, unless {@link #immortal} is set.
 */
public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);

    /** When {@code true} the scheduler does not poison its workers once the last strand completes. */
    public boolean immortal;

    /** Compile-time switch for the debug message queue; all debug paths are inert while it is {@code false}. */
    private static final boolean DEBUG = false;

    /** Queue of debug messages; only allocated when {@link #DEBUG} is enabled, otherwise {@code null}. */
    private static final BlockingQueue<String> DEBUG_LOG = DEBUG ? new LinkedBlockingDeque<>() : null;

    /** Per-thread holder of the strand currently being executed by that worker thread. */
    private static final ThreadLocal<StrandHolder> strandHolder = ThreadLocal.withInitial(StrandHolder::new);

    /** Total number of workers, including the thread that calls {@link #start()}. */
    private final int numThreads;

    /** Released once by every worker that consumes a poison pill; {@link #start()} blocks on it. */
    private Semaphore mainBlockSem;

    /** Items that are ready to be executed by a worker. */
    private final BlockingQueue<SchedulerItem> runnableList = new LinkedBlockingDeque<>();

    /** Number of scheduled strands that have not completed yet. */
    private final AtomicInteger totalStrands = new AtomicInteger();

    /**
     * Creates a scheduler whose pool size is read from the environment variable named by
     * {@link BLangConstants#BALLERINA_MAX_POOL_SIZE_ENV_VAR}. When the variable is absent,
     * unreadable, not a number, or not a positive number, twice the number of available
     * processors is used instead.
     *
     * @param z value for {@link #immortal}
     */
    public Scheduler(boolean z) {
        this(resolvePoolSize(), z);
    }

    /**
     * Creates a scheduler with an explicit pool size.
     *
     * @param i total number of workers, counting the thread that calls {@link #start()}.
     *          NOTE(review): values below 1 are stored as given; with such a value no poison
     *          pills are ever enqueued, so callers should pass at least 1.
     * @param z value for {@link #immortal}
     */
    public Scheduler(int i, boolean z) {
        this.numThreads = i;
        this.immortal = z;
    }

    /**
     * Determines the pool size from the environment, falling back to
     * {@code availableProcessors() * 2} when no usable value is configured.
     */
    private static int resolvePoolSize() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        try {
            String configured = System.getenv(BLangConstants.BALLERINA_MAX_POOL_SIZE_ENV_VAR);
            if (configured == null) {
                return fallback;
            }
            int parsed = Integer.parseInt(configured);
            if (parsed < 1) {
                // A non-positive pool would leave start() without any worker that can be poisoned.
                logger.error("Invalid value for system variable:BALLERINA_MAX_POOL_SIZE, "
                        + "expected a positive integer but found: {}", configured);
                return fallback;
            }
            return parsed;
        } catch (RuntimeException e) {
            // NumberFormatException from parseInt or SecurityException from getenv.
            logger.error("Error occurred in scheduler while reading system variable:BALLERINA_MAX_POOL_SIZE", e);
            return fallback;
        }
    }

    /**
     * Returns the strand currently being executed on the calling thread.
     *
     * @return the strand bound to this thread
     * @throws IllegalStateException if the calling thread is not currently executing a strand
     */
    public static Strand getStrand() {
        Strand strand = strandHolder.get().strand;
        if (strand == null) {
            throw new IllegalStateException("strand is not accessible form non-strand-worker threads");
        }
        return strand;
    }

    /**
     * Schedules the function wrapped by {@code fPValue} with no callback and no properties.
     *
     * @param objArr  call arguments; slot 0 is overwritten with the newly created strand
     * @param fPValue function pointer supplying the function to run
     * @param strand  parent strand, may be {@code null}
     * @param bType   type passed on to the created future
     * @return the future representing the scheduled work
     */
    public FutureValue scheduleFunction(Object[] objArr, FPValue<?, ?> fPValue, Strand strand, BType bType) {
        return schedule(objArr, fPValue.getFunction(), strand, null, null, bType);
    }

    /**
     * Schedules the consumer wrapped by {@code fPValue} with no callback.
     *
     * @param objArr  call arguments; slot 0 is overwritten with the newly created strand
     * @param fPValue function pointer supplying the consumer to run
     * @param strand  parent strand, may be {@code null}
     * @return the future representing the scheduled work
     */
    public FutureValue scheduleConsumer(Object[] objArr, FPValue<?, ?> fPValue, Strand strand) {
        return schedule(objArr, fPValue.getConsumer(), strand, (CallableUnitCallback) null);
    }

    /**
     * Creates a new strand and future for {@code function} and queues it for execution.
     *
     * @param objArr               call arguments; slot 0 is overwritten with the newly created strand
     * @param function             the function to execute
     * @param strand               parent strand, may be {@code null}
     * @param callableUnitCallback callback notified when the work completes, may be {@code null}
     * @param map                  properties handed to the new strand
     * @param bType                type passed on to the created future
     * @return the future representing the scheduled work
     */
    public FutureValue schedule(Object[] objArr, Function function, Strand strand,
                                CallableUnitCallback callableUnitCallback, Map<String, Object> map, BType bType) {
        FutureValue future = createFuture(strand, callableUnitCallback, map, bType);
        objArr[0] = future.strand;
        return enqueue(new SchedulerItem(function, objArr, future), future);
    }

    /**
     * Creates a new strand and future (typed {@link BTypes#typeNull}) for {@code consumer} and
     * queues it for execution.
     *
     * @param objArr               call arguments; slot 0 is overwritten with the newly created strand
     * @param consumer             the consumer to execute
     * @param strand               parent strand, may be {@code null}
     * @param callableUnitCallback callback notified when the work completes, may be {@code null}
     * @return the future representing the scheduled work
     */
    public FutureValue schedule(Object[] objArr, Consumer consumer, Strand strand,
                                CallableUnitCallback callableUnitCallback) {
        FutureValue future = createFuture(strand, callableUnitCallback, null, BTypes.typeNull);
        objArr[0] = future.strand;
        return enqueue(new SchedulerItem(consumer, objArr, future), future);
    }

    /** Links the item to its strand, counts the strand as live and puts the item on the run queue. */
    private FutureValue enqueue(SchedulerItem item, FutureValue future) {
        future.strand.schedulerItem = item;
        this.totalStrands.incrementAndGet();
        this.runnableList.add(item);
        return future;
    }

    /**
     * Runs the scheduler: starts {@code numThreads - 1} worker threads, runs the worker loop on
     * the calling thread as well, and then blocks until every worker has consumed a poison pill.
     */
    public void start() {
        // Starts at -(numThreads - 1); each of the numThreads workers releases one permit when it
        // is poisoned, so the acquire below only succeeds after the last worker has stopped.
        this.mainBlockSem = new Semaphore(-(this.numThreads - 1));
        if (DEBUG) {
            PrintStream out = System.out;
            new Thread(() -> drainDebugLog(out)).start();
        }
        for (int i = 0; i < this.numThreads - 1; i++) {
            new Thread(this::runSafely, "jbal-strand-exec-" + i).start();
        }
        runSafely();
        try {
            this.mainBlockSem.acquire();
        } catch (InterruptedException e) {
            logger.error("Error while waiting for poison to work", e);
            // Keep the interrupt visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
    }

    /** Runs the worker loop and logs, rather than propagates, anything that escapes from it. */
    private void runSafely() {
        try {
            run();
        } catch (Throwable th) {
            logger.error("Error occurred in scheduler", th);
        }
    }

    /**
     * Worker loop: takes items from the run queue until a poison pill arrives, executes each one
     * with its strand bound to the current thread, and then acts on the item's resulting state.
     */
    private void run() {
        while (true) {
            SchedulerItem item;
            try {
                item = this.runnableList.take();
            } catch (InterruptedException ignored) {
                // Nothing was taken, so go back to waiting. The interrupt flag is deliberately not
                // re-asserted here: doing so would make the next take() throw immediately and
                // turn this loop into a busy spin.
                continue;
            }
            if (item == SchedulerItem.POISON_PILL) {
                this.mainBlockSem.release();
                return;
            }
            Object result = null;
            Throwable panic = null;
            try {
                try {
                    strandHolder.get().strand = item.future.strand;
                    result = item.execute();
                } catch (Throwable th) {
                    panic = th;
                    notifyChannels(item, th);
                    if (!(th instanceof ErrorValue)) {
                        RuntimeUtils.printCrashLog(th);
                    }
                    if (item.isYielded()) {
                        RuntimeUtils.printCrashLog(th);
                    }
                }
                // The bookkeeping below runs with no strand bound to this thread.
                strandHolder.get().strand = null;
                dispatch(item, result, panic);
            } finally {
                // Never leave a stale strand bound to the thread, even if bookkeeping failed.
                strandHolder.get().strand = null;
            }
        }
    }

    /** Acts on the state an item reports after having been executed. */
    private void dispatch(SchedulerItem item, Object result, Throwable panic) {
        switch (item.getState()) {
            case BLOCK_AND_YIELD:
                parkOrReschedule(item);
                break;
            case BLOCK_ON_AND_YIELD:
                settleWait(item);
                break;
            case YIELD:
                reschedule(item);
                break;
            case RUNNABLE:
                complete(item, result, panic);
                break;
            default:
                assert false : "illegal strand state during execute " + item.getState();
                break;
        }
    }

    /**
     * Handles an item that blocked: under the strand lock, re-queues it if its state has already
     * been switched to {@code YIELD} (see {@link #unblockStrand(Strand)}), otherwise marks it parked.
     */
    private void parkOrReschedule(SchedulerItem item) {
        Strand strand = item.future.strand;
        strand.lock();
        try {
            if (item.getState().getStatus() == State.YIELD.getStatus()) {
                reschedule(item);
            } else {
                item.parked = true;
            }
        } finally {
            strand.unlock();
        }
    }

    /**
     * Handles an item that blocked on a wait: under the wait-context lock, clears the
     * {@code intermediate} flag and re-queues the item if the context was already marked runnable.
     */
    private void settleWait(SchedulerItem item) {
        WaitContext ctx = item.future.strand.waitContext;
        ctx.lock();
        try {
            ctx.intermediate = false;
            if (ctx.runnable) {
                ctx.completed = true;
                reschedule(item);
            }
        } finally {
            ctx.unLock();
        }
    }

    /**
     * Finishes an item that ran to the end: publishes result/panic on its future, notifies the
     * callback, marks the strand done, wakes satisfied waiters, releases the strand's references
     * and poisons the workers if this was the last live strand of a non-immortal scheduler.
     */
    private void complete(SchedulerItem item, Object result, Throwable panic) {
        FutureValue future = item.future;
        future.result = result;
        future.isDone = true;
        future.panic = panic;
        if (future.callback != null) {
            if (future.panic != null) {
                future.callback.notifyFailure(BallerinaErrors.createError(panic));
                if (future.transactionLocalContext != null) {
                    future.transactionLocalContext.notifyLocalRemoteParticipantFailure();
                }
            } else {
                future.callback.notifySuccess();
            }
        }

        Strand strand = future.strand;
        assert !strand.getState().equals(State.DONE) : "Can't be completed twice";
        strand.setState(State.DONE);

        for (WaitContext waiter : strand.waitingContexts) {
            waiter.lock();
            try {
                boolean satisfied = !waiter.completed
                        && ((future.panic != null && waiter.handlePanic()) || waiter.waitCompleted(result));
                if (satisfied) {
                    if (waiter.intermediate) {
                        // The waiter has not been settled by its own worker yet; only flag it and
                        // let settleWait() re-queue it.
                        waiter.runnable = true;
                    } else {
                        waiter.completed = true;
                        reschedule(waiter.schedulerItem);
                    }
                }
            } finally {
                waiter.unLock();
            }
        }

        cleanUp(strand);
        if (this.totalStrands.decrementAndGet() == 0) {
            assert this.runnableList.size() == 0;
            if (!this.immortal) {
                poison();
            }
        }
    }

    /**
     * Unblocks a strand that previously blocked. If its item is already parked it is re-queued;
     * otherwise the strand's state is set to {@code YIELD} so the worker that is still handling
     * the item re-queues it instead of parking it.
     *
     * @param strand the strand to unblock
     */
    public void unblockStrand(Strand strand) {
        strand.lock();
        try {
            if (strand.schedulerItem.parked) {
                strand.schedulerItem.parked = false;
                reschedule(strand.schedulerItem);
            } else {
                strand.setState(State.YIELD);
            }
        } finally {
            strand.unlock();
        }
    }

    /** Drops the completed strand's references to the scheduler, its frames and its waiters. */
    private void cleanUp(Strand strand) {
        strand.scheduler = null;
        strand.frames = null;
        strand.waitingContexts = null;
    }

    /**
     * Appends a message to the debug queue, sleeping 100 ms before and after. Does nothing
     * unless {@link #DEBUG} is enabled, because the queue is not allocated otherwise.
     */
    private synchronized void debugLog(String str) {
        if (!DEBUG) {
            return;
        }
        try {
            Thread.sleep(100L);
            DEBUG_LOG.add(str);
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Propagates a panic to every worker data channel recorded on the failed strand, as a send
     * panic or a receiver panic depending on the channel's direction.
     */
    private void notifyChannels(SchedulerItem schedulerItem, Throwable th) {
        Strand failed = schedulerItem.future.strand;
        for (ChannelDetails details : failed.channelDetails) {
            // The channel lives either on the failed strand itself or on its parent.
            WorkerDataChannel channel = details.channelInSameStrand
                    ? failed.wdChannels.getWorkerDataChannel(details.name)
                    : failed.parent.wdChannels.getWorkerDataChannel(details.name);
            if (details.send) {
                channel.setSendPanic(th);
            } else {
                channel.setReceiverPanic(th);
            }
        }
    }

    /**
     * Marks the item runnable and puts it back on the run queue. An item that is already
     * {@code RUNNABLE} is left alone so it is not queued twice.
     */
    private void reschedule(SchedulerItem schedulerItem) {
        if (schedulerItem.getState().equals(State.RUNNABLE)) {
            if (DEBUG) {
                debugLog(schedulerItem + " " + schedulerItem.getState().toString() + " not rescheduled");
            }
            return;
        }
        schedulerItem.setState(State.RUNNABLE);
        this.runnableList.add(schedulerItem);
    }

    /**
     * Creates a child strand of {@code strand} (inheriting its observer context when a parent is
     * given) together with the future that tracks it.
     */
    private FutureValue createFuture(Strand strand, CallableUnitCallback callableUnitCallback,
                                     Map<String, Object> map, BType bType) {
        Strand child = new Strand(this, strand, map);
        if (strand != null) {
            child.observerContext = strand.observerContext;
        }
        FutureValue future = new FutureValue(child, callableUnitCallback, bType);
        infectResourceFunction(child, future);
        future.strand.frames = new Object[100];
        return future;
    }

    /** Enqueues one poison pill per worker so that every worker loop terminates. */
    public void poison() {
        for (int i = 0; i < this.numThreads; i++) {
            this.runnableList.add(SchedulerItem.POISON_PILL);
        }
    }

    /**
     * If the strand carries a global transaction id property, creates a local transaction
     * context from that id and the strand's transaction URL property and attaches it to both
     * the strand and the future.
     */
    private void infectResourceFunction(Strand strand, FutureValue futureValue) {
        String globalTransactionId = (String) strand.getProperty(RuntimeConstants.GLOBAL_TRANSACTION_ID);
        if (globalTransactionId == null) {
            return;
        }
        String url = strand.getProperty(RuntimeConstants.TRANSACTION_URL).toString();
        TransactionLocalContext context = TransactionLocalContext.create(
                globalTransactionId, url, TransactionConstants.DEFAULT_COORDINATION_TYPE);
        strand.setLocalTransactionContext(context);
        futureValue.transactionLocalContext = context;
    }

    /**
     * Body of the debug printer thread started by {@link #start()} when {@link #DEBUG} is
     * enabled: prints queued debug messages to {@code printStream} until interrupted.
     */
    private static void drainDebugLog(PrintStream printStream) {
        if (DEBUG_LOG == null) {
            return;
        }
        while (true) {
            try {
                printStream.println(DEBUG_LOG.take());
            } catch (InterruptedException e) {
                logger.error("Error in debug logger", e);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
