/*
 * Decompiled with CFR 0.152.
 */
package io.ballerina.runtime.internal.scheduling;

import io.ballerina.runtime.api.PredefinedTypes;
import io.ballerina.runtime.api.async.Callback;
import io.ballerina.runtime.api.async.StrandMetadata;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.types.Type;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BFunctionPointer;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.internal.scheduling.ItemGroup;
import io.ballerina.runtime.internal.scheduling.SchedulerItem;
import io.ballerina.runtime.internal.scheduling.State;
import io.ballerina.runtime.internal.scheduling.Strand;
import io.ballerina.runtime.internal.scheduling.StrandHolder;
import io.ballerina.runtime.internal.scheduling.WaitContext;
import io.ballerina.runtime.internal.scheduling.WorkerDataChannel;
import io.ballerina.runtime.internal.util.RuntimeUtils;
import io.ballerina.runtime.internal.util.exceptions.BallerinaErrorReasons;
import io.ballerina.runtime.internal.values.ChannelDetails;
import io.ballerina.runtime.internal.values.FutureValue;
import java.io.PrintStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

public class Scheduler {
    // Stream used to report a problem while reading the pool-size environment variable.
    private PrintStream err = System.err;
    // When true, the scheduler does not poison its workers after the last strand completes.
    private volatile boolean immortal;
    // Set through setListenerDeclarationFound(); consulted when a listener is deregistered.
    private boolean listenerDeclarationFound;
    // Queue of item groups that are ready to be executed; worker threads block on take().
    private BlockingQueue<ItemGroup> runnableList = new LinkedBlockingDeque<ItemGroup>();
    // Per-thread holder of the strand currently being executed by that thread (see run()).
    private static final ThreadLocal<StrandHolder> strandHolder = ThreadLocal.withInitial(StrandHolder::new);
    // Incremented whenever a strand is scheduled and decremented when a strand finishes in postProcess().
    private AtomicInteger totalStrands = new AtomicInteger();
    // Raw value of the BALLERINA_MAX_POOL_SIZE environment variable; null when it is not set.
    private static String poolSizeConf = System.getenv("BALLERINA_MAX_POOL_SIZE");
    // Number of workers running the scheduler loop, including the thread that calls start().
    private final int numThreads;
    // Default worker count (2 x available processors); overwritten from poolSizeConf by Scheduler(boolean).
    private static int poolSize = Runtime.getRuntime().availableProcessors() * 2;
    // Created in start(); every worker releases one permit when it leaves run().
    private Semaphore mainBlockSem;
    // Registry of listener objects attached to this scheduler.
    private ListenerRegistry listenerRegistry;

    /**
     * Creates a scheduler whose worker count is taken from the {@code BALLERINA_MAX_POOL_SIZE}
     * environment variable when that variable holds a positive integer, and from the current
     * value of the static {@code poolSize} otherwise.
     *
     * <p>A value that is not a number, or that is zero or negative, is reported on {@code err}
     * and ignored. A non-positive worker count must not be accepted: {@link #poison()} enqueues
     * one poison pill per worker, so with no workers the thread running {@link #start()} would
     * never leave the scheduler loop.
     *
     * @param immortal when {@code true}, the scheduler keeps running after the last strand completes
     */
    public Scheduler(boolean immortal) {
        if (poolSizeConf != null) {
            try {
                int configured = Integer.parseInt(poolSizeConf);
                if (configured > 0) {
                    poolSize = configured;
                } else {
                    this.err.println("ballerina: error occurred in scheduler while reading system variable:BALLERINA_MAX_POOL_SIZE, pool size must be a positive integer but was " + configured);
                }
            } catch (NumberFormatException e) {
                // Log and continue with the default pool size.
                this.err.println("ballerina: error occurred in scheduler while reading system variable:BALLERINA_MAX_POOL_SIZE, " + e.getMessage());
            }
        }
        this.numThreads = poolSize;
        this.immortal = immortal;
        this.listenerRegistry = new ListenerRegistry();
    }

    /**
     * Creates a scheduler with an explicit worker count.
     *
     * @param numThreads number of workers that run the scheduler loop, including the thread
     *                   that later calls {@link #start()}
     * @param immortal   when {@code true}, the scheduler keeps running after the last strand completes
     */
    public Scheduler(int numThreads, boolean immortal) {
        this.listenerRegistry = new ListenerRegistry();
        this.immortal = immortal;
        this.numThreads = numThreads;
    }

    /**
     * Returns the strand that the calling thread is currently executing.
     *
     * <p>The per-thread holder is populated by the scheduler loop immediately before an item is
     * executed and cleared again afterwards, so the lookup only succeeds on a worker thread
     * that is in the middle of running a strand.
     *
     * @return the strand bound to the current thread
     * @throws IllegalStateException if no strand is bound to the current thread
     */
    public static Strand getStrand() {
        Strand strand = Scheduler.strandHolder.get().strand;
        if (strand == null) {
            throw new IllegalStateException("strand is not accessible from non-strand-worker threads");
        }
        return strand;
    }

    /**
     * Schedules the function behind a function pointer as a new strand in its own item group,
     * without a completion callback and without strand properties.
     *
     * @param params     arguments for the function; slot 0 is overwritten with the new strand
     * @param fp         function pointer whose underlying function is executed
     * @param parent     strand passed as the parent of the new strand
     * @param returnType type constraint of the returned future
     * @param strandName name passed to the new strand
     * @param metadata   metadata passed to the new strand
     * @return the future representing the scheduled strand
     */
    public FutureValue scheduleFunction(Object[] params, BFunctionPointer<?, ?> fp, Strand parent, Type returnType, String strandName, StrandMetadata metadata) {
        Function<?, ?> target = fp.getFunction();
        return this.schedule(params, target, parent, null, null, returnType, strandName, metadata);
    }

    /**
     * Creates a future for a new strand and schedules the function pointer into the parent's
     * item group.
     *
     * @param params     arguments for the function; slot 0 is overwritten with the new strand
     * @param fp         function pointer whose underlying function is executed
     * @param parent     strand whose item group the new strand joins
     * @param returnType type constraint of the returned future
     * @param strandName name passed to the new strand
     * @param metadata   metadata passed to the new strand
     * @return the future representing the scheduled strand
     */
    public FutureValue scheduleLocal(Object[] params, BFunctionPointer<?, ?> fp, Strand parent, Type returnType, String strandName, StrandMetadata metadata) {
        return this.scheduleLocal(params, fp, parent, this.createFuture(parent, null, null, returnType, strandName, metadata));
    }

    /**
     * Schedules the function pointer for an already created future, placing the new item in the
     * parent's item group rather than in a group of its own.
     *
     * @param params arguments for the function; slot 0 is overwritten with the future's strand
     * @param fp     function pointer whose underlying function is executed
     * @param parent strand whose item group receives the new item
     * @param future future whose strand will run the function
     * @return the same {@code future}
     */
    public FutureValue scheduleLocal(Object[] params, BFunctionPointer<?, ?> fp, Strand parent, FutureValue future) {
        params[0] = future.strand;
        SchedulerItem localItem = new SchedulerItem(fp.getFunction(), params, future);
        future.strand.schedulerItem = localItem;
        this.totalStrands.incrementAndGet();

        // The child shares the parent's group.
        future.strand.strandGroup = parent.strandGroup;
        parent.strandGroup.add(localItem);

        // Only the caller that flips the flag from false to true enqueues the group.
        boolean needsEnqueue = parent.strandGroup.scheduled.compareAndSet(false, true);
        if (needsEnqueue) {
            this.runnableList.add(future.strand.strandGroup);
        }
        return future;
    }

    /**
     * Creates a future for a new strand and schedules {@code function} in a fresh item group.
     *
     * @param params     arguments for the function; slot 0 is overwritten with the new strand
     * @param function   function to execute
     * @param parent     strand passed as the parent of the new strand
     * @param callback   callback stored on the future; may be {@code null}
     * @param properties properties passed to the new strand; may be {@code null}
     * @param returnType type constraint of the returned future
     * @param strandName name passed to the new strand
     * @param metadata   metadata passed to the new strand
     * @return the future representing the scheduled strand
     */
    public FutureValue schedule(Object[] params, Function function, Strand parent, Callback callback, Map<String, Object> properties, Type returnType, String strandName, StrandMetadata metadata) {
        return this.schedule(params, function, this.createFuture(parent, callback, properties, returnType, strandName, metadata));
    }

    /**
     * Schedules {@code function} in a fresh item group with no strand properties and a future
     * constrained to {@code PredefinedTypes.TYPE_NULL}.
     *
     * @param params     arguments for the function; slot 0 is overwritten with the new strand
     * @param function   function to execute
     * @param parent     strand passed as the parent of the new strand
     * @param callback   callback stored on the future; may be {@code null}
     * @param strandName name passed to the new strand
     * @param metadata   metadata passed to the new strand
     * @return the future representing the scheduled strand
     */
    public FutureValue schedule(Object[] params, Function function, Strand parent, Callback callback, String strandName, StrandMetadata metadata) {
        return this.schedule(params, function, this.createFuture(parent, callback, null, PredefinedTypes.TYPE_NULL, strandName, metadata));
    }

    /**
     * Wraps {@code function} in a scheduler item, gives it an item group of its own, and
     * enqueues that group for execution.
     *
     * @param params   arguments for the function; slot 0 is overwritten with the future's strand
     * @param function function to execute
     * @param future   future whose strand will run the function
     * @return the same {@code future}
     */
    private FutureValue schedule(Object[] params, Function function, FutureValue future) {
        params[0] = future.strand;
        SchedulerItem entry = new SchedulerItem(function, params, future);
        future.strand.schedulerItem = entry;
        this.totalStrands.incrementAndGet();

        ItemGroup ownGroup = new ItemGroup(entry);
        future.strand.strandGroup = ownGroup;
        // The group is marked as scheduled before it becomes visible to the workers.
        ownGroup.scheduled.set(true);
        this.runnableList.add(ownGroup);
        return future;
    }

    /**
     * Schedules a {@link Consumer} in a fresh item group with no strand properties and a future
     * constrained to {@code PredefinedTypes.TYPE_NULL}.
     *
     * @param params     arguments for the consumer; slot 0 is overwritten with the new strand
     * @param consumer   consumer to execute
     * @param parent     strand passed as the parent of the new strand
     * @param callback   callback stored on the future; may be {@code null}
     * @param strandName name passed to the new strand
     * @param metadata   metadata passed to the new strand
     * @return the future representing the scheduled strand
     */
    @Deprecated
    public FutureValue schedule(Object[] params, Consumer consumer, Strand parent, Callback callback, String strandName, StrandMetadata metadata) {
        FutureValue created = this.createFuture(parent, callback, null, PredefinedTypes.TYPE_NULL, strandName, metadata);
        params[0] = created.strand;
        SchedulerItem entry = new SchedulerItem(consumer, params, created);
        created.strand.schedulerItem = entry;
        this.totalStrands.incrementAndGet();

        ItemGroup ownGroup = new ItemGroup(entry);
        created.strand.strandGroup = ownGroup;
        // The group is marked as scheduled before it becomes visible to the workers.
        ownGroup.scheduled.set(true);
        this.runnableList.add(ownGroup);
        return created;
    }

    /**
     * Starts the scheduler and blocks until every worker has left the scheduler loop.
     *
     * <p>{@code numThreads - 1} background threads are started and the calling thread acts as
     * the final worker. The semaphore is created with {@code -(numThreads - 1)} permits; each
     * worker releases one permit when it leaves {@code run()}, so the final {@code acquire()}
     * succeeds only after all {@code numThreads} workers have released.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged and the
     * thread's interrupt status is restored so that callers can still observe it.
     */
    public void start() {
        this.mainBlockSem = new Semaphore(-(this.numThreads - 1));
        for (int i = 0; i < this.numThreads - 1; ++i) {
            new Thread(this::runSafely, "jbal-strand-exec-" + i).start();
        }
        // The calling thread is the last worker.
        this.runSafely();
        try {
            this.mainBlockSem.acquire();
        } catch (InterruptedException e) {
            RuntimeUtils.printCrashLog(e);
            // Catching InterruptedException clears the flag; set it again for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs the scheduler loop and writes anything that escapes from it to the crash log instead
     * of letting it propagate to the caller.
     */
    private void runSafely() {
        try {
            this.run();
        } catch (Throwable failure) {
            RuntimeUtils.printCrashLog(failure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private void run() {
        while (true) {
            try {
                group = this.runnableList.take();
            }
            catch (InterruptedException ignored) {
                continue;
            }
            if (group == ItemGroup.POISON_PILL) break;
            while (true) {
                if (group.items.empty()) ** break;
                result = null;
                panic = null;
                item = group.get();
                try {
                    Scheduler.strandHolder.get().strand = item.future.strand;
                    result = item.execute();
                }
                catch (Throwable e) {
                    panic = this.createError(e);
                    this.notifyChannels(item, panic);
                    if (!(panic instanceof BError)) {
                        RuntimeUtils.printCrashLog(panic);
                    }
                    if (item.isYielded()) {
                        RuntimeUtils.printCrashLog(panic);
                    }
                }
                finally {
                    Scheduler.strandHolder.get().strand = null;
                }
                this.postProcess(item, result, panic);
                if (!group.items.empty()) continue;
                group.scheduled.set(false);
            }
            break;
        }
        this.mainBlockSem.release();
    }

    /**
     * Decides what happens to {@code item} after one call to {@code execute()}, based on the
     * state returned by {@code item.getState()}.
     *
     * <ul>
     *   <li>{@code BLOCK_AND_YIELD}: reschedule right away if the state has meanwhile become
     *       equivalent to {@code YIELD}; otherwise mark the item as parked.</li>
     *   <li>{@code BLOCK_ON_AND_YIELD}: reschedule if the strand's wait context was flagged as
     *       runnable in the meantime.</li>
     *   <li>{@code YIELD}: reschedule.</li>
     *   <li>{@code RUNNABLE}: the strand has finished; publish the result or panic, notify the
     *       callback and waiting contexts, release strand references, and poison the workers
     *       when this was the last strand and the scheduler is not immortal.</li>
     * </ul>
     *
     * NOTE(review): the locks taken here are released without try/finally; an exception thrown
     * between lock and unlock would leave the lock held.
     *
     * @param item   the item that was just executed
     * @param result value returned by {@code execute()}, or {@code null} when it threw
     * @param panic  error produced by {@code execute()}, or {@code null} when it returned normally
     */
    private void postProcess(SchedulerItem item, Object result, Throwable panic) {
        switch (item.getState()) {
            case BLOCK_AND_YIELD: {
                // Checked under the strand lock, which unblockStrand() also takes on its strand.
                item.future.strand.lock();
                if (item.getState().getStatus() == State.YIELD.getStatus()) {
                    // Status now matches YIELD, so the item can run again immediately.
                    this.reschedule(item);
                    item.future.strand.unlock();
                    break;
                }
                // Still blocked: unblockStrand() reschedules the item once it sees parked == true.
                item.parked = true;
                item.future.strand.unlock();
                break;
            }
            case BLOCK_ON_AND_YIELD: {
                WaitContext waitContext = item.future.strand.waitContext;
                waitContext.lock();
                waitContext.intermediate = false;
                // runnable is set by the RUNNABLE branch below while intermediate was still true.
                if (waitContext.runnable) {
                    waitContext.completed = true;
                    this.reschedule(item);
                }
                waitContext.unLock();
                break;
            }
            case YIELD: {
                this.reschedule(item);
                break;
            }
            case RUNNABLE: {
                // Publish the outcome on the future.
                item.future.result = result;
                item.future.isDone = true;
                item.future.panic = panic;
                if (item.future.callback != null) {
                    if (item.future.panic != null) {
                        item.future.callback.notifyFailure(ErrorCreator.createError(panic));
                        if (item.future.strand.currentTrxContext != null) {
                            item.future.strand.currentTrxContext.notifyLocalRemoteParticipantFailure();
                        }
                    } else {
                        item.future.callback.notifySuccess(result);
                    }
                }
                Strand justCompleted = item.future.strand;
                assert (!justCompleted.getState().equals((Object)State.DONE)) : "Can't be completed twice";
                justCompleted.setState(State.DONE);
                // Notify every context that is waiting on the strand that just completed.
                for (WaitContext ctx : justCompleted.waitingContexts) {
                    ctx.lock();
                    if (!ctx.completed && (item.future.panic != null && ctx.handlePanic() || ctx.waitCompleted(result))) {
                        if (ctx.intermediate) {
                            // Only flag it; the BLOCK_ON_AND_YIELD branch performs the reschedule.
                            ctx.runnable = true;
                        } else {
                            ctx.completed = true;
                            this.reschedule(ctx.schedulerItem);
                        }
                    }
                    ctx.unLock();
                }
                this.cleanUp(justCompleted);
                int strandsLeft = this.totalStrands.decrementAndGet();
                if (strandsLeft != 0) break;
                assert (this.runnableList.size() == 0);
                // Last strand finished: stop the workers unless the scheduler must stay alive.
                if (this.immortal) break;
                this.poison();
                break;
            }
            default: {
                assert (false) : "illegal strand state during execute " + item.getState();
                break;
            }
        }
    }

    /**
     * Sets whether the scheduler stays alive after the last strand completes.
     *
     * @param immortal {@code true} to suppress the poisoning of workers when no strands remain
     */
    public void setImmortal(boolean immortal) {
        this.immortal = immortal;
    }

    /**
     * Converts a throwable raised during strand execution into the panic value to propagate.
     * A {@link StackOverflowError} is replaced by an error created from
     * {@code BallerinaErrorReasons.STACK_OVERFLOW_ERROR} that carries the original stack trace;
     * every other throwable is returned unchanged.
     *
     * @param t throwable caught while executing a scheduler item
     * @return the throwable to use as the panic
     */
    private Throwable createError(Throwable t) {
        if (!(t instanceof StackOverflowError)) {
            return t;
        }
        BError stackOverflow = ErrorCreator.createError(BallerinaErrorReasons.STACK_OVERFLOW_ERROR);
        stackOverflow.setStackTrace(t.getStackTrace());
        return stackOverflow;
    }

    /**
     * Unblocks a strand. If its scheduler item is already parked, the item is unparked and
     * rescheduled; otherwise the strand's state is set to {@code YIELD}.
     *
     * <p>The strand lock is released in a {@code finally} block so that a failure inside the
     * critical section (for example a strand whose {@code schedulerItem} has not been assigned
     * yet) cannot leave the lock held.
     *
     * @param strand the strand to unblock
     */
    public void unblockStrand(Strand strand) {
        strand.lock();
        try {
            if (strand.schedulerItem.parked) {
                strand.schedulerItem.parked = false;
                this.reschedule(strand.schedulerItem);
            } else {
                strand.setState(State.YIELD);
            }
        } finally {
            strand.unlock();
        }
    }

    /**
     * Clears the scheduler, frames and waiting-context references held by a completed strand.
     *
     * @param justCompleted the strand that has just finished
     */
    private void cleanUp(Strand justCompleted) {
        justCompleted.scheduler = null;
        justCompleted.frames = null;
        justCompleted.waitingContexts = null;
    }

    /**
     * Propagates a panic to every worker data channel recorded on the item's strand.
     *
     * <p>Each channel is looked up on the strand itself when the detail says it lives in the
     * same strand, and on the strand's parent otherwise. The panic is then recorded as a send
     * panic or a receiver panic according to the detail's {@code send} flag.
     *
     * @param item  the item whose execution failed
     * @param panic the error to record on the channels
     */
    private void notifyChannels(SchedulerItem item, Throwable panic) {
        Set<ChannelDetails> recorded = item.future.strand.channelDetails;
        for (ChannelDetails entry : recorded) {
            WorkerDataChannel channel;
            if (entry.channelInSameStrand) {
                channel = item.future.strand.wdChannels.getWorkerDataChannel(entry.name);
            } else {
                channel = item.future.strand.parent.wdChannels.getWorkerDataChannel(entry.name);
            }
            if (entry.send) {
                channel.setSendPanic(panic);
            } else {
                channel.setReceiverPanic(panic);
            }
        }
    }

    /**
     * Makes an item runnable again and adds it back to its strand's item group. Items that are
     * already in the {@code RUNNABLE} state are left untouched.
     *
     * @param item the item to reschedule
     */
    private void reschedule(SchedulerItem item) {
        if (item.getState().equals((Object)State.RUNNABLE)) {
            return;
        }
        ItemGroup owner = item.future.strand.strandGroup;
        item.setState(State.RUNNABLE);
        owner.add(item);
        // Only the caller that flips the flag from false to true enqueues the group.
        boolean needsEnqueue = owner.scheduled.compareAndSet(false, true);
        if (needsEnqueue) {
            this.runnableList.add(owner);
        }
    }

    /**
     * Creates a new strand owned by this scheduler and wraps it in a future.
     *
     * @param parent     strand passed as the parent of the new strand
     * @param callback   callback stored on the future; may be {@code null}
     * @param properties properties passed to the new strand; may be {@code null}
     * @param constraint type constraint of the future
     * @param name       name passed to the new strand
     * @param metadata   metadata passed to the new strand
     * @return the newly created future
     */
    public FutureValue createFuture(Strand parent, Callback callback, Map<String, Object> properties, Type constraint, String name, StrandMetadata metadata) {
        return this.createFuture(parent, callback, constraint, new Strand(name, metadata, this, parent, properties));
    }

    /**
     * Wraps an existing strand in a future and gives the future's strand a fresh frame array
     * of 100 slots.
     *
     * @param parent     not used by this method
     * @param callback   callback stored on the future; may be {@code null}
     * @param constraint type constraint of the future
     * @param newStrand  strand the future represents
     * @return the newly created future
     */
    private FutureValue createFuture(Strand parent, Callback callback, Type constraint, Strand newStrand) {
        FutureValue created = new FutureValue(newStrand, callback, constraint);
        Object[] frameSlots = new Object[100];
        created.strand.frames = frameSlots;
        return created;
    }

    /**
     * Enqueues one poison pill per worker so that every worker leaves the scheduler loop.
     */
    public void poison() {
        int remaining = this.numThreads;
        while (remaining-- > 0) {
            this.runnableList.add(ItemGroup.POISON_PILL);
        }
    }

    /**
     * Records whether a listener declaration was found. Passing {@code true} also makes the
     * scheduler immortal; passing {@code false} leaves the immortal flag as it is.
     *
     * @param listenerDeclarationFound whether a listener declaration exists
     */
    public void setListenerDeclarationFound(boolean listenerDeclarationFound) {
        this.listenerDeclarationFound = listenerDeclarationFound;
        if (!listenerDeclarationFound) {
            return;
        }
        this.setImmortal(true);
    }

    /**
     * Returns the flag last stored by {@code setListenerDeclarationFound}.
     *
     * @return {@code true} if a listener declaration was recorded
     */
    public boolean isListenerDeclarationFound() {
        return this.listenerDeclarationFound;
    }

    /**
     * Returns the listener registry created together with this scheduler.
     *
     * @return the listener registry of this scheduler
     */
    public ListenerRegistry getListenerRegistry() {
        return this.listenerRegistry;
    }

    /**
     * Tracks the listener objects attached to the enclosing scheduler and updates the
     * scheduler's immortal flag as listeners are registered and deregistered. All methods
     * synchronize on the registry instance.
     */
    public class ListenerRegistry {
        private final Set<BObject> listenerSet = new HashSet<BObject>();

        /**
         * Adds a listener and makes the enclosing scheduler immortal.
         *
         * @param listener the listener to register
         */
        public synchronized void registerListener(BObject listener) {
            this.listenerSet.add(listener);
            Scheduler.this.setImmortal(true);
        }

        /**
         * Removes a listener. The enclosing scheduler stops being immortal once no listeners
         * remain, unless a listener declaration was recorded on the scheduler.
         *
         * @param listener the listener to deregister
         */
        public synchronized void deregisterListener(BObject listener) {
            this.listenerSet.remove(listener);
            boolean mustStayAlive = Scheduler.this.isListenerDeclarationFound() || !this.listenerSet.isEmpty();
            if (mustStayAlive) {
                return;
            }
            Scheduler.this.setImmortal(false);
        }

        /**
         * Calls {@code gracefulStop} with no arguments on every registered listener.
         *
         * @param strand the strand passed to each {@code call}
         */
        public synchronized void stopListeners(Strand strand) {
            for (BObject registered : this.listenerSet) {
                registered.call(strand, "gracefulStop", new Object[0]);
            }
        }
    }
}

