/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.jvm.scheduling;

import java.io.PrintStream;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.ballerinalang.jvm.BallerinaErrors;
import org.ballerinalang.jvm.scheduling.ItemGroup;
import org.ballerinalang.jvm.scheduling.SchedulerItem;
import org.ballerinalang.jvm.scheduling.State;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.scheduling.StrandHolder;
import org.ballerinalang.jvm.scheduling.WaitContext;
import org.ballerinalang.jvm.scheduling.WorkerDataChannel;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.types.BTypes;
import org.ballerinalang.jvm.util.RuntimeUtils;
import org.ballerinalang.jvm.util.exceptions.BallerinaErrorReasons;
import org.ballerinalang.jvm.values.ChannelDetails;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.FPValue;
import org.ballerinalang.jvm.values.FutureValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;

public class Scheduler {
    private PrintStream err = System.err;
    public boolean immortal;
    private BlockingQueue<ItemGroup> runnableList = new LinkedBlockingDeque<ItemGroup>();
    private static final ThreadLocal<StrandHolder> strandHolder = ThreadLocal.withInitial(StrandHolder::new);
    private AtomicInteger totalStrands = new AtomicInteger();
    private static String poolSizeConf = System.getenv("BALLERINA_MAX_POOL_SIZE");
    private final int numThreads;
    private static int poolSize = Runtime.getRuntime().availableProcessors() * 2;
    private Semaphore mainBlockSem;

    public Scheduler(boolean immortal) {
        try {
            if (poolSizeConf != null) {
                poolSize = Integer.parseInt(poolSizeConf);
            }
        }
        catch (Throwable t) {
            this.err.println("ballerina: error occurred in scheduler while reading system variable:BALLERINA_MAX_POOL_SIZE, " + t.getMessage());
        }
        this.numThreads = poolSize;
        this.immortal = immortal;
    }

    public Scheduler(int numThreads, boolean immortal) {
        this.numThreads = numThreads;
        this.immortal = immortal;
    }

    public static Strand getStrand() {
        Strand strand = Scheduler.strandHolder.get().strand;
        if (strand == null) {
            throw new IllegalStateException("strand is not accessible form non-strand-worker threads");
        }
        return strand;
    }

    public FutureValue scheduleFunction(Object[] params, FPValue<?, ?> fp, Strand parent, BType returnType) {
        return this.schedule(params, fp.getFunction(), parent, null, null, returnType);
    }

    public FutureValue scheduleLocal(Object[] params, FPValue<?, ?> fp, Strand parent, BType returnType) {
        SchedulerItem item;
        FutureValue future = this.createFuture(parent, null, null, returnType);
        params[0] = future.strand;
        future.strand.schedulerItem = item = new SchedulerItem(fp.getFunction(), params, future);
        this.totalStrands.incrementAndGet();
        future.strand.strandGroup = parent.strandGroup;
        this.addToRunnableList(item, parent.strandGroup);
        return future;
    }

    @Deprecated
    public FutureValue scheduleConsumer(Object[] params, FPValue<?, ?> fp, Strand parent) {
        return this.schedule(params, fp.getFunction(), parent, (CallableUnitCallback)null);
    }

    public FutureValue schedule(Object[] params, Function function, Strand parent, CallableUnitCallback callback, Map<String, Object> properties, BType returnType) {
        FutureValue future = this.createFuture(parent, callback, properties, returnType);
        return this.schedule(params, function, parent, future);
    }

    public FutureValue schedule(Object[] params, Function function, Strand parent, CallableUnitCallback callback) {
        FutureValue future = this.createFuture(parent, callback, null, BTypes.typeNull);
        return this.schedule(params, function, parent, future);
    }

    private FutureValue schedule(Object[] params, Function function, Strand parent, FutureValue future) {
        ItemGroup group;
        SchedulerItem item;
        params[0] = future.strand;
        future.strand.schedulerItem = item = new SchedulerItem(function, params, future);
        this.totalStrands.incrementAndGet();
        future.strand.strandGroup = group = new ItemGroup(item);
        group.scheduled.set(true);
        this.runnableList.add(group);
        return future;
    }

    @Deprecated
    public FutureValue schedule(Object[] params, Consumer consumer, Strand parent, CallableUnitCallback callback) {
        ItemGroup group;
        SchedulerItem item;
        FutureValue future = this.createFuture(parent, callback, null, BTypes.typeNull);
        params[0] = future.strand;
        future.strand.schedulerItem = item = new SchedulerItem(consumer, params, future);
        this.totalStrands.incrementAndGet();
        future.strand.strandGroup = group = new ItemGroup(item);
        group.scheduled.set(true);
        this.runnableList.add(group);
        return future;
    }

    public void start() {
        this.mainBlockSem = new Semaphore(-(this.numThreads - 1));
        for (int i = 0; i < this.numThreads - 1; ++i) {
            new Thread(this::runSafely, "jbal-strand-exec-" + i).start();
        }
        this.runSafely();
        try {
            this.mainBlockSem.acquire();
        }
        catch (InterruptedException e) {
            RuntimeUtils.printCrashLog(e);
        }
    }

    private void runSafely() {
        try {
            this.run();
        }
        catch (Throwable t) {
            RuntimeUtils.printCrashLog(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private void run() {
        while (true) {
            try {
                group = this.runnableList.take();
            }
            catch (InterruptedException ignored) {
                continue;
            }
            if (group == ItemGroup.POISON_PILL) break;
            isItemsEmpty = group.items.isEmpty();
            while (true) {
                if (isItemsEmpty) ** break;
                result = null;
                panic = null;
                item = group.get();
                try {
                    Scheduler.strandHolder.get().strand = item.future.strand;
                    result = item.execute();
                }
                catch (Throwable e) {
                    panic = this.createError(e);
                    this.notifyChannels(item, panic);
                    if (!(panic instanceof ErrorValue)) {
                        RuntimeUtils.printCrashLog(panic);
                    }
                    if (item.isYielded()) {
                        RuntimeUtils.printCrashLog(panic);
                    }
                }
                finally {
                    Scheduler.strandHolder.get().strand = null;
                }
                this.postProcess(item, result, panic);
                group.lock();
                isItemsEmpty = group.items.empty();
                if (isItemsEmpty) {
                    group.scheduled.set(false);
                }
                group.unlock();
            }
            break;
        }
        this.mainBlockSem.release();
    }

    private void postProcess(SchedulerItem item, Object result, Throwable panic) {
        switch (item.getState()) {
            case BLOCK_AND_YIELD: {
                item.future.strand.lock();
                if (item.getState().getStatus() == State.YIELD.getStatus()) {
                    this.reschedule(item);
                    item.future.strand.unlock();
                    break;
                }
                item.parked = true;
                item.future.strand.unlock();
                break;
            }
            case BLOCK_ON_AND_YIELD: {
                WaitContext waitContext = item.future.strand.waitContext;
                waitContext.lock();
                waitContext.intermediate = false;
                if (waitContext.runnable) {
                    waitContext.completed = true;
                    this.reschedule(item);
                }
                waitContext.unLock();
                break;
            }
            case YIELD: {
                this.reschedule(item);
                break;
            }
            case RUNNABLE: {
                item.future.result = result;
                item.future.isDone = true;
                item.future.panic = panic;
                if (item.future.callback != null) {
                    if (item.future.panic != null) {
                        item.future.callback.notifyFailure(BallerinaErrors.createError(panic));
                        if (item.future.strand.transactionLocalContext != null) {
                            item.future.strand.transactionLocalContext.notifyLocalRemoteParticipantFailure();
                        }
                    } else {
                        item.future.callback.notifySuccess();
                    }
                }
                Strand justCompleted = item.future.strand;
                assert (!justCompleted.getState().equals((Object)State.DONE)) : "Can't be completed twice";
                justCompleted.setState(State.DONE);
                for (WaitContext ctx : justCompleted.waitingContexts) {
                    ctx.lock();
                    if (!ctx.completed && (item.future.panic != null && ctx.handlePanic() || ctx.waitCompleted(result))) {
                        if (ctx.intermediate) {
                            ctx.runnable = true;
                        } else {
                            ctx.completed = true;
                            this.reschedule(ctx.schedulerItem);
                        }
                    }
                    ctx.unLock();
                }
                this.cleanUp(justCompleted);
                int strandsLeft = this.totalStrands.decrementAndGet();
                if (strandsLeft != 0) break;
                assert (this.runnableList.size() == 0);
                if (this.immortal) break;
                for (int i = 0; i < this.numThreads; ++i) {
                    this.runnableList.add(ItemGroup.POISON_PILL);
                }
                break;
            }
            default: {
                assert (false) : "illegal strand state during execute " + (Object)((Object)item.getState());
                break;
            }
        }
    }

    private Throwable createError(Throwable t) {
        if (t instanceof StackOverflowError) {
            ErrorValue error = BallerinaErrors.createError(BallerinaErrorReasons.STACK_OVERFLOW_ERROR);
            error.setStackTrace(t.getStackTrace());
            return error;
        }
        return t;
    }

    public void unblockStrand(Strand strand) {
        strand.lock();
        if (strand.schedulerItem.parked) {
            strand.schedulerItem.parked = false;
            this.reschedule(strand.schedulerItem);
        } else {
            strand.setState(State.YIELD);
        }
        strand.unlock();
    }

    private void cleanUp(Strand justCompleted) {
        justCompleted.scheduler = null;
        justCompleted.frames = null;
        justCompleted.waitingContexts = null;
    }

    private void notifyChannels(SchedulerItem item, Throwable panic) {
        Set<ChannelDetails> channels = item.future.strand.channelDetails;
        for (ChannelDetails details : channels) {
            WorkerDataChannel wdChannel = details.channelInSameStrand ? item.future.strand.wdChannels.getWorkerDataChannel(details.name) : item.future.strand.parent.wdChannels.getWorkerDataChannel(details.name);
            if (details.send) {
                wdChannel.setSendPanic(panic);
                continue;
            }
            wdChannel.setReceiverPanic(panic);
        }
    }

    private void reschedule(SchedulerItem item) {
        if (!item.getState().equals((Object)State.RUNNABLE)) {
            ItemGroup group = item.future.strand.strandGroup;
            item.setState(State.RUNNABLE);
            this.addToRunnableList(item, group);
        }
    }

    private void addToRunnableList(SchedulerItem item, ItemGroup group) {
        group.lock();
        group.add(item);
        if (group.scheduled.compareAndSet(false, true)) {
            this.runnableList.add(group);
        }
        group.unlock();
    }

    private FutureValue createFuture(Strand parent, CallableUnitCallback callback, Map<String, Object> properties, BType constraint) {
        Strand newStrand = new Strand(this, parent, properties);
        if (parent != null) {
            newStrand.observerContext = parent.observerContext;
        }
        FutureValue future = new FutureValue(newStrand, callback, constraint);
        future.strand.frames = new Object[100];
        return future;
    }

    public void poison() {
        for (int i = 0; i < this.numThreads; ++i) {
            this.runnableList.add(ItemGroup.POISON_PILL);
        }
    }
}

