/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.jvm.scheduling;

import java.io.PrintStream;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.ballerinalang.jvm.BallerinaErrors;
import org.ballerinalang.jvm.scheduling.SchedulerItem;
import org.ballerinalang.jvm.scheduling.State;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.scheduling.StrandHolder;
import org.ballerinalang.jvm.scheduling.WaitContext;
import org.ballerinalang.jvm.scheduling.WorkerDataChannel;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.types.BTypes;
import org.ballerinalang.jvm.values.ChannelDetails;
import org.ballerinalang.jvm.values.FPValue;
import org.ballerinalang.jvm.values.FutureValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);
    public boolean immortal;
    private BlockingQueue<SchedulerItem> runnableList = new LinkedBlockingDeque<SchedulerItem>();
    private static final boolean DEBUG = false;
    private static final BlockingQueue<String> DEBUG_LOG;
    private static final ThreadLocal<StrandHolder> strandHolder;
    private AtomicInteger totalStrands = new AtomicInteger();
    private int numThreads;
    private Semaphore mainBlockSem;

    public Scheduler(boolean immortal) {
        try {
            String poolSizeConf = System.getenv("BALLERINA_MAX_POOL_SIZE");
            this.numThreads = poolSizeConf == null ? Runtime.getRuntime().availableProcessors() * 2 : Integer.parseInt(poolSizeConf);
        }
        catch (Throwable t) {
            this.numThreads = Runtime.getRuntime().availableProcessors() * 2;
            logger.error("Error occurred in scheduler while reading system variable:BALLERINA_MAX_POOL_SIZE", t);
        }
        this.immortal = immortal;
    }

    public Scheduler(int numThreads, boolean immortal) {
        this.numThreads = numThreads;
        this.immortal = immortal;
    }

    public static Strand getStrand() {
        Strand strand = Scheduler.strandHolder.get().strand;
        if (strand == null) {
            throw new IllegalStateException("strand is not accessible form non-strand-worker threads");
        }
        return strand;
    }

    public FutureValue scheduleFunction(Object[] params, FPValue<?, ?> fp, Strand parent, BType returnType) {
        return this.schedule(params, fp.getFunction(), parent, null, null, returnType);
    }

    public FutureValue scheduleConsumer(Object[] params, FPValue<?, ?> fp, Strand parent) {
        return this.schedule(params, fp.getConsumer(), parent);
    }

    public FutureValue schedule(Object[] params, Function function, Strand parent, CallableUnitCallback callback, Map<String, Object> properties, BType returnType) {
        FutureValue future2 = this.createFuture(parent, callback, properties, returnType);
        return this.schedule(params, function, parent, future2);
    }

    private FutureValue schedule(Object[] params, Function function, Strand parent, FutureValue future2) {
        SchedulerItem item;
        params[0] = future2.strand;
        future2.strand.schedulerItem = item = new SchedulerItem(function, params, future2);
        this.totalStrands.incrementAndGet();
        this.runnableList.add(item);
        return future2;
    }

    public FutureValue schedule(Object[] params, Consumer consumer, Strand parent) {
        SchedulerItem item;
        FutureValue future2 = this.createFuture(parent, null, null, BTypes.typeNull);
        params[0] = future2.strand;
        future2.strand.schedulerItem = item = new SchedulerItem(consumer, params, future2);
        this.totalStrands.incrementAndGet();
        this.runnableList.add(item);
        return future2;
    }

    public void start() {
        this.mainBlockSem = new Semaphore(-(this.numThreads - 1));
        for (int i = 0; i < this.numThreads - 1; ++i) {
            new Thread(this::runSafely, "jbal-strand-exec-" + i).start();
        }
        this.runSafely();
        try {
            this.mainBlockSem.acquire();
        }
        catch (InterruptedException e) {
            logger.error("Error while waiting for poison to work", e);
        }
    }

    private void runSafely() {
        try {
            this.run();
        }
        catch (Throwable t) {
            logger.error("Error occurred in scheduler", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void run() {
        block13: while (true) {
            SchedulerItem item;
            try {
                item = this.runnableList.take();
            }
            catch (InterruptedException ignored) {
                continue;
            }
            if (item == SchedulerItem.POISON_PILL) break;
            Object result = null;
            Throwable panic = null;
            try {
                Scheduler.strandHolder.get().strand = item.future.strand;
                result = item.execute();
            }
            catch (Throwable e) {
                panic = e;
                this.notifyChannels(item, panic);
                logger.error("Strand died", e);
            }
            finally {
                Scheduler.strandHolder.get().strand = null;
            }
            switch (item.getState()) {
                case BLOCK_AND_YIELD: {
                    item.future.strand.lock();
                    if (item.getState().getStatus() == State.YIELD.getStatus()) {
                        this.reschedule(item);
                        item.future.strand.unlock();
                        break;
                    }
                    item.parked = true;
                    item.future.strand.unlock();
                    break;
                }
                case BLOCK_ON_AND_YIELD: {
                    WaitContext waitContext = item.future.strand.waitContext;
                    waitContext.lock();
                    waitContext.intermediate = false;
                    if (waitContext.runnable) {
                        waitContext.completed = true;
                        this.reschedule(item);
                    }
                    waitContext.unLock();
                    break;
                }
                case YIELD: {
                    this.reschedule(item);
                    break;
                }
                case RUNNABLE: {
                    item.future.result = result;
                    item.future.isDone = true;
                    item.future.panic = panic;
                    if (item.future.callback != null) {
                        if (item.future.panic != null) {
                            item.future.callback.notifyFailure(BallerinaErrors.createError(panic));
                        } else {
                            item.future.callback.notifySuccess();
                        }
                    }
                    Strand justCompleted = item.future.strand;
                    assert (!justCompleted.getState().equals((Object)State.DONE)) : "Can't be completed twice";
                    justCompleted.setState(State.DONE);
                    for (WaitContext ctx : justCompleted.waitingContexts) {
                        ctx.lock();
                        if (!ctx.completed && (item.future.panic != null && ctx.handlePanic() || ctx.waitCompleted(result))) {
                            if (ctx.intermediate) {
                                ctx.runnable = true;
                            } else {
                                ctx.completed = true;
                                this.reschedule(ctx.schedulerItem);
                            }
                        }
                        ctx.unLock();
                    }
                    this.cleanUp(justCompleted);
                    int strandsLeft = this.totalStrands.decrementAndGet();
                    if (strandsLeft != 0) continue block13;
                    assert (this.runnableList.size() == 0);
                    if (this.immortal) continue block13;
                    int i = 0;
                    while (true) {
                        if (i >= this.numThreads) continue block13;
                        this.runnableList.add(SchedulerItem.POISON_PILL);
                        ++i;
                    }
                }
                default: {
                    assert (false) : "illegal strand state during execute " + (Object)((Object)item.getState());
                    continue block13;
                }
            }
        }
        this.mainBlockSem.release();
    }

    public void unblockStrand(Strand strand) {
        strand.lock();
        if (strand.schedulerItem.parked) {
            strand.schedulerItem.parked = false;
            this.reschedule(strand.schedulerItem);
        } else {
            strand.setState(State.YIELD);
        }
        strand.unlock();
    }

    private void cleanUp(Strand justCompleted) {
        justCompleted.scheduler = null;
        justCompleted.frames = null;
        justCompleted.waitingContexts = null;
    }

    private synchronized void debugLog(String msg) {
        try {
            Thread.sleep(100L);
            DEBUG_LOG.add(msg);
            Thread.sleep(100L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void notifyChannels(SchedulerItem item, Throwable panic) {
        Set<ChannelDetails> channels = item.future.strand.channelDetails;
        for (ChannelDetails details : channels) {
            WorkerDataChannel wdChannel = details.channelInSameStrand ? item.future.strand.wdChannels.getWorkerDataChannel(details.name) : item.future.strand.parent.wdChannels.getWorkerDataChannel(details.name);
            if (details.send) {
                wdChannel.setSendPanic(panic);
                continue;
            }
            wdChannel.setReceiverPanic(panic);
        }
    }

    private void reschedule(SchedulerItem item) {
        if (!item.getState().equals((Object)State.RUNNABLE)) {
            item.setState(State.RUNNABLE);
            this.runnableList.add(item);
        } else {
            this.debugLog(item + " " + item.getState().toString() + " not rescheduled");
        }
    }

    private FutureValue createFuture(Strand parent, CallableUnitCallback callback, Map<String, Object> properties, BType constraint) {
        Strand newStrand = new Strand(this, parent, properties);
        if (parent != null) {
            newStrand.observerContext = parent.observerContext;
        }
        FutureValue future2 = new FutureValue(newStrand, callback, constraint);
        future2.strand.frames = new Object[100];
        return future2;
    }

    public void poison() {
        for (int i = 0; i < this.numThreads; ++i) {
            this.runnableList.add(SchedulerItem.POISON_PILL);
        }
    }

    private static /* synthetic */ void lambda$start$0(PrintStream out) {
        try {
            while (true) {
                String take = DEBUG_LOG.take();
                out.println(take);
            }
        }
        catch (InterruptedException e) {
            logger.error("Error in debug logger", e);
            return;
        }
    }

    static {
        strandHolder = ThreadLocal.withInitial(StrandHolder::new);
        DEBUG_LOG = null;
    }
}

