/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.bre.bvm;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.ballerinalang.model.types.BArrayType;
import org.ballerinalang.model.types.BStructType;
import org.ballerinalang.model.types.BType;
import org.ballerinalang.model.values.BClosure;
import org.ballerinalang.model.values.BFunctionPointer;
import org.ballerinalang.model.values.BStruct;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.siddhi.core.SiddhiAppRuntime;
import org.ballerinalang.siddhi.core.SiddhiManager;
import org.ballerinalang.siddhi.core.event.Event;
import org.ballerinalang.siddhi.core.stream.output.StreamCallback;
import org.ballerinalang.util.codegen.CallableUnitInfo;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.ballerinalang.util.program.BLangFunctions;

public class StreamingRuntimeManager {
    private static StreamingRuntimeManager streamingRuntimeManager;
    private SiddhiManager siddhiManager = new SiddhiManager();
    private List<SiddhiAppRuntime> siddhiAppRuntimeList = new ArrayList<SiddhiAppRuntime>();

    private StreamingRuntimeManager() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static StreamingRuntimeManager getInstance() {
        if (streamingRuntimeManager != null) {
            return streamingRuntimeManager;
        }
        Class<StreamingRuntimeManager> clazz = StreamingRuntimeManager.class;
        synchronized (StreamingRuntimeManager.class) {
            if (streamingRuntimeManager == null) {
                streamingRuntimeManager = new StreamingRuntimeManager();
            }
            // ** MonitorExit[var0] (shouldn't be in output)
            return streamingRuntimeManager;
        }
    }

    public SiddhiAppRuntime createSiddhiAppRuntime(String siddhiApp) {
        SiddhiAppRuntime siddhiAppRuntime = this.siddhiManager.createSiddhiAppRuntime(siddhiApp);
        this.siddhiAppRuntimeList.add(siddhiAppRuntime);
        siddhiAppRuntime.start();
        return siddhiAppRuntime;
    }

    public void addCallback(String streamId, final BFunctionPointer functionPointer, SiddhiAppRuntime siddhiAppRuntime) {
        BType[] parameters = functionPointer.value().getFunctionInfo().getParamTypes();
        final ArrayList<BValue> closureArgs = new ArrayList<BValue>();
        for (BClosure closure : functionPointer.getClosureVars()) {
            closureArgs.add(closure.value());
        }
        final BStructType structType = (BStructType)((BArrayType)parameters[parameters.length - 1]).getElementType();
        if (!(parameters[parameters.length - 1] instanceof BArrayType)) {
            throw new BallerinaException("incompatible function: inline function needs to be a function accepting an object array");
        }
        siddhiAppRuntime.addCallback(streamId, new StreamCallback(){

            public void receive(Event[] events) {
                for (Event event : events) {
                    AtomicInteger intVarIndex = new AtomicInteger(-1);
                    AtomicInteger floatVarIndex = new AtomicInteger(-1);
                    AtomicInteger boolVarIndex = new AtomicInteger(-1);
                    AtomicInteger stringVarIndex = new AtomicInteger(-1);
                    BStruct output = new BStruct(structType);
                    for (Object field : event.getData()) {
                        if (field instanceof Long) {
                            output.setIntField(intVarIndex.incrementAndGet(), (Long)field);
                            continue;
                        }
                        if (field instanceof Double) {
                            output.setFloatField(floatVarIndex.incrementAndGet(), (Double)field);
                            continue;
                        }
                        if (field instanceof Boolean) {
                            output.setBooleanField(boolVarIndex.incrementAndGet(), (Integer)field);
                            continue;
                        }
                        if (!(field instanceof String)) continue;
                        output.setStringField(stringVarIndex.incrementAndGet(), (String)field);
                    }
                    ArrayList<BStruct> argsList = new ArrayList<BStruct>();
                    argsList.addAll(closureArgs);
                    argsList.add(output);
                    BLangFunctions.invokeCallable((CallableUnitInfo)functionPointer.value().getFunctionInfo(), argsList.toArray(new BValue[argsList.size()]));
                }
            }
        });
    }
}

