/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.streams;

import java.util.Map;
import org.ballerinalang.model.types.BField;
import org.ballerinalang.model.types.BStructureType;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BFloat;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BStream;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.siddhi.core.stream.input.InputHandler;
import org.ballerinalang.streams.StreamSubscription;
import org.ballerinalang.streams.StreamSubscriptionManager;
import org.ballerinalang.util.exceptions.BallerinaException;

/**
 * A {@link StreamSubscription} that forwards values published on a
 * {@link BStream} to a Siddhi {@link InputHandler}.
 *
 * <p>Each value passed to {@link #execute(BValue)} is flattened into an
 * {@code Object[]} holding one element per field of the value's structure
 * type, and that array is sent to the input handler.
 */
public class SiddhiStreamSubscription
extends StreamSubscription {
    private final BStream stream;
    private final InputHandler inputHandler;

    /**
     * Creates a subscription bound to the given stream and input handler.
     *
     * @param stream                    the stream this subscription belongs to
     * @param inputHandler              the handler that receives the converted events
     * @param streamSubscriptionManager passed through to the superclass constructor
     */
    SiddhiStreamSubscription(BStream stream, InputHandler inputHandler, StreamSubscriptionManager streamSubscriptionManager) {
        super(streamSubscriptionManager);
        this.stream = stream;
        this.inputHandler = inputHandler;
    }

    /**
     * Converts a structured value into a flat event array.
     *
     * <p>The array has one slot per field of the value's {@link BStructureType},
     * filled in the iteration order of the type's field map.
     *
     * @param data the structured value to convert; its type must be a {@link BStructureType}
     * @return the field values unwrapped from their {@link BValue} wrappers
     * @throws BallerinaException if a field's type tag is not one of the four handled below
     */
    private Object[] createEvent(BMap<String, BValue> data) {
        BStructureType streamType = (BStructureType)data.getType();
        Object[] event = new Object[streamType.getFields().size()];
        int index = 0;
        for (BField field : streamType.getFields().values()) {
            // The numeric tags are the literals left by the decompiler; the casts in each
            // branch show which value wrapper each tag is paired with.
            switch (field.getFieldType().getTag()) {
                case 1: {
                    event[index++] = ((BInteger)data.get(field.fieldName)).intValue();
                    break;
                }
                case 3: {
                    event[index++] = ((BFloat)data.get(field.fieldName)).floatValue();
                    break;
                }
                case 6: {
                    event[index++] = ((BBoolean)data.get(field.fieldName)).booleanValue();
                    break;
                }
                case 5: {
                    event[index++] = data.get(field.fieldName).stringValue();
                    break;
                }
                default: {
                    throw new BallerinaException("Fields in streams do not support data types other than int, float, boolean and string");
                }
            }
        }
        return event;
    }

    /**
     * Converts {@code data} into an event array and sends it to the input handler.
     *
     * @param data the value to forward; it is cast to a {@link BMap}
     * @throws BallerinaException if the send is interrupted, or if the value contains
     *                            a field of an unsupported type
     */
    @Override
    public void execute(BValue data) {
        // The cast is unchecked by necessity: the override receives a plain BValue.
        @SuppressWarnings("unchecked")
        BMap<String, BValue> record = (BMap<String, BValue>)data;
        Object[] event = this.createEvent(record);
        try {
            this.inputHandler.send(event);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe the interruption after it is wrapped in an unchecked exception.
            Thread.currentThread().interrupt();
            throw new BallerinaException("Error while sending events to stream: " + this.stream.getStreamId() + ": " + e.getMessage(), e);
        }
    }

    /**
     * Returns the stream supplied at construction time.
     *
     * @return the stream this subscription was created for
     */
    @Override
    public BStream getStream() {
        return this.stream;
    }
}

