/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.model.values;

import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Consumer;
import io.ballerina.messaging.broker.core.ContentChunk;
import io.ballerina.messaging.broker.core.Message;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.ballerinalang.bre.Context;
import org.ballerinalang.model.types.BStreamType;
import org.ballerinalang.model.types.BStructType;
import org.ballerinalang.model.types.BType;
import org.ballerinalang.model.types.BTypes;
import org.ballerinalang.model.util.JSONUtils;
import org.ballerinalang.model.values.BFunctionPointer;
import org.ballerinalang.model.values.BJSON;
import org.ballerinalang.model.values.BRefType;
import org.ballerinalang.model.values.BStruct;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.siddhi.core.stream.input.InputHandler;
import org.ballerinalang.util.BrokerUtils;
import org.ballerinalang.util.codegen.CallableUnitInfo;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.ballerinalang.util.program.BLangFunctions;

public class BStream
implements BRefType<Object> {
    private static final String TOPIC_NAME_PREFIX = "TOPIC_NAME_";
    private BStructType constraintType;
    private String streamId = "";
    private String topicName;

    public BStream(BType type, String name) {
        if (((BStreamType)type).getConstrainedType() == null) {
            throw new BallerinaException("a stream cannot be declared without a constraint");
        }
        this.constraintType = (BStructType)((BStreamType)type).getConstrainedType();
        this.topicName = TOPIC_NAME_PREFIX + ((BStreamType)type).getConstrainedType().getName().toUpperCase() + "_" + name;
        this.streamId = name;
    }

    public String getStreamId() {
        return this.streamId;
    }

    @Override
    public String stringValue() {
        return "";
    }

    @Override
    public BType getType() {
        return BTypes.typeStream;
    }

    @Override
    public BValue copy() {
        return null;
    }

    @Override
    public Object value() {
        return null;
    }

    public void publish(BStruct data) {
        if (data.getType() != this.constraintType) {
            throw new BallerinaException("incompatible types: object of type:" + data.getType().getName() + " cannot be added to a stream of type:" + this.constraintType.getName());
        }
        BrokerUtils.publish(this.topicName, JSONUtils.convertStructToJSON(data).stringValue().getBytes());
    }

    public void subscribe(Context context, BFunctionPointer functionPointer) {
        BType[] parameters = functionPointer.funcRefCPEntry.getFunctionInfo().getParamTypes();
        if (!(parameters[0] instanceof BStructType) || ((BStructType)parameters[0]).structInfo.getType() != this.constraintType) {
            throw new BallerinaException("incompatible function: subscription function needs to be a function accepting an object of type:" + this.constraintType.getName());
        }
        String queueName = String.valueOf(System.currentTimeMillis()) + UUID.randomUUID().toString();
        BrokerUtils.addSubscription(this.topicName, new StreamSubscriber(queueName, functionPointer));
    }

    public void subscribe(InputHandler inputHandler) {
        String queueName = String.valueOf(UUID.randomUUID());
        BrokerUtils.addSubscription(this.topicName, new InternalStreamSubscriber(this.topicName, queueName, inputHandler));
    }

    private class InternalStreamSubscriber
    extends Consumer {
        private final String topic;
        private final String queueName;
        private final InputHandler inputHandler;

        InternalStreamSubscriber(String topic, String queueName, InputHandler inputHandler) {
            this.topic = topic;
            this.queueName = queueName;
            this.inputHandler = inputHandler;
        }

        protected void send(Message message) throws BrokerException {
            byte[] bytes = new byte[]{};
            for (ContentChunk chunk : message.getContentChunks()) {
                bytes = new byte[chunk.getBytes().readableBytes()];
                chunk.getBytes().getBytes(0, bytes);
            }
            BJSON json = new BJSON(new String(bytes, StandardCharsets.UTF_8));
            BStruct data = JSONUtils.convertJSONToStruct(json, BStream.this.constraintType);
            Object[] event = this.createEvent(data);
            try {
                this.inputHandler.send(event);
            }
            catch (InterruptedException e) {
                throw new BallerinaException("Error while sending events to stream: " + this.topic + ": " + e.getMessage(), e);
            }
        }

        private Object[] createEvent(BStruct data) {
            BStructType streamType = data.getType();
            int intValueIndex = -1;
            int floatValueIndex = -1;
            int stringValueIndex = -1;
            int boolValueIndex = -1;
            Object[] event = new Object[streamType.getStructFields().length];
            block6: for (int index = 0; index < streamType.getStructFields().length; ++index) {
                BStructType.StructField field = streamType.getStructFields()[index];
                switch (field.getFieldType().getTag()) {
                    case 1: {
                        event[index] = data.getIntField(++intValueIndex);
                        continue block6;
                    }
                    case 2: {
                        event[index] = data.getFloatField(++floatValueIndex);
                        continue block6;
                    }
                    case 4: {
                        event[index] = data.getBooleanField(++boolValueIndex);
                        continue block6;
                    }
                    case 3: {
                        event[index] = data.getStringField(++stringValueIndex);
                        continue block6;
                    }
                    default: {
                        throw new BallerinaException("Fields in streams do not support data types other than int, float, boolean and string");
                    }
                }
            }
            return event;
        }

        public String getQueueName() {
            return this.queueName;
        }

        protected void close() throws BrokerException {
        }

        public boolean isExclusive() {
            return false;
        }

        public boolean isReady() {
            return true;
        }
    }

    private class StreamSubscriber
    extends Consumer {
        final String queueName;
        final BFunctionPointer functionPointer;

        StreamSubscriber(String queueName, BFunctionPointer functionPointer) {
            this.queueName = queueName;
            this.functionPointer = functionPointer;
        }

        protected void send(Message message) throws BrokerException {
            byte[] bytes = new byte[]{};
            for (ContentChunk chunk : message.getContentChunks()) {
                bytes = new byte[chunk.getBytes().readableBytes()];
                chunk.getBytes().getBytes(0, bytes);
            }
            BJSON json = new BJSON(new String(bytes, StandardCharsets.UTF_8));
            BStruct data = JSONUtils.convertJSONToStruct(json, BStream.this.constraintType);
            BValue[] args = new BValue[]{data};
            BLangFunctions.invokeCallable((CallableUnitInfo)this.functionPointer.value().getFunctionInfo(), args);
        }

        public String getQueueName() {
            return this.queueName;
        }

        protected void close() throws BrokerException {
        }

        public boolean isExclusive() {
            return false;
        }

        public boolean isReady() {
            return true;
        }
    }
}

