/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.model.values;

import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Consumer;
import io.ballerina.messaging.broker.core.ContentChunk;
import io.ballerina.messaging.broker.core.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledHeapByteBuf;
import java.util.UUID;
import org.ballerinalang.broker.BrokerUtils;
import org.ballerinalang.model.types.BAnyType;
import org.ballerinalang.model.types.BIndexedType;
import org.ballerinalang.model.types.BStreamType;
import org.ballerinalang.model.types.BStructType;
import org.ballerinalang.model.types.BType;
import org.ballerinalang.model.types.BTypes;
import org.ballerinalang.model.types.BUnionType;
import org.ballerinalang.model.values.BFunctionPointer;
import org.ballerinalang.model.values.BRefType;
import org.ballerinalang.model.values.BStruct;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.siddhi.core.stream.input.InputHandler;
import org.ballerinalang.util.codegen.CallableUnitInfo;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.ballerinalang.util.program.BLangFunctions;

public class BStream
implements BRefType<Object> {
    private static final String TOPIC_NAME_PREFIX = "TOPIC_NAME_";
    private BType constraintType;
    private String streamId = "";
    private String topicName;

    public BStream(BType type, String name) {
        if (((BStreamType)type).getConstrainedType() == null) {
            throw new BallerinaException("a stream cannot be declared without a constraint");
        }
        this.constraintType = ((BStreamType)type).getConstrainedType();
        this.topicName = this.constraintType.getName() != null ? TOPIC_NAME_PREFIX + this.constraintType.getName().toUpperCase() + "_" + name : (this.constraintType instanceof BIndexedType ? TOPIC_NAME_PREFIX + ((BIndexedType)((Object)this.constraintType)).getElementType() + "_" + name : TOPIC_NAME_PREFIX + name);
        this.topicName = this.topicName.concat("_").concat(UUID.randomUUID().toString());
        this.streamId = name;
    }

    public String getStreamId() {
        return this.streamId;
    }

    @Override
    public String stringValue() {
        return "";
    }

    @Override
    public BType getType() {
        return BTypes.typeStream;
    }

    @Override
    public BValue copy() {
        return null;
    }

    @Override
    public Object value() {
        return null;
    }

    public BType getConstraintType() {
        return this.constraintType;
    }

    public void publish(BValue data) {
        BType dataType = data.getType();
        if (!(dataType.equals(this.constraintType) || this.constraintType instanceof BUnionType && ((BUnionType)this.constraintType).getMemberTypes().contains(dataType) || this.constraintType instanceof BAnyType)) {
            throw new BallerinaException("incompatible types: value of type:" + dataType.getName() + " cannot be added to a stream of type:" + this.constraintType.getName());
        }
        BrokerUtils.publish(this.topicName, (ByteBuf)new BallerinaStreamByteBuf(data));
    }

    public void subscribe(BFunctionPointer functionPointer) {
        BType[] parameters = functionPointer.funcRefCPEntry.getFunctionInfo().getParamTypes();
        if (parameters[0].getTag() != this.constraintType.getTag() || this.constraintType instanceof BStructType && ((BStructType)parameters[0]).structInfo.getType() != this.constraintType) {
            throw new BallerinaException("incompatible function: subscription function needs to be a function accepting:" + this.constraintType.getName());
        }
        String queueName = String.valueOf(System.currentTimeMillis()) + UUID.randomUUID().toString();
        BrokerUtils.addSubscription(this.topicName, new StreamSubscriber(queueName, functionPointer));
    }

    public void subscribe(InputHandler inputHandler) {
        if (this.constraintType.getTag() != 15) {
            throw new BallerinaException("Streaming Support is only available with streams accepting objects");
        }
        String queueName = String.valueOf(UUID.randomUUID());
        BrokerUtils.addSubscription(this.topicName, new InternalStreamSubscriber(this.topicName, queueName, inputHandler));
    }

    private class BallerinaStreamByteBuf
    extends UnpooledHeapByteBuf {
        private final BValue streamEvent;

        BallerinaStreamByteBuf(BValue streamEvent) {
            super((ByteBufAllocator)UnpooledByteBufAllocator.DEFAULT, 0, 0);
            this.streamEvent = streamEvent;
        }
    }

    private class InternalStreamSubscriber
    extends Consumer {
        private final String topic;
        private final String queueName;
        private final InputHandler inputHandler;

        InternalStreamSubscriber(String topic, String queueName, InputHandler inputHandler) {
            this.topic = topic;
            this.queueName = queueName;
            this.inputHandler = inputHandler;
        }

        protected void send(Message message) throws BrokerException {
            BValue data = ((BallerinaStreamByteBuf)((ContentChunk)message.getContentChunks().get(0)).getByteBuf().unwrap()).streamEvent;
            Object[] event = this.createEvent((BStruct)data);
            try {
                this.inputHandler.send(event);
            }
            catch (InterruptedException e) {
                throw new BallerinaException("Error while sending events to stream: " + this.topic + ": " + e.getMessage(), e);
            }
        }

        private Object[] createEvent(BStruct data) {
            BStructType streamType = data.getType();
            int intValueIndex = -1;
            int floatValueIndex = -1;
            int stringValueIndex = -1;
            int boolValueIndex = -1;
            Object[] event = new Object[streamType.getStructFields().length];
            block6: for (int index = 0; index < streamType.getStructFields().length; ++index) {
                BStructType.StructField field = streamType.getStructFields()[index];
                switch (field.getFieldType().getTag()) {
                    case 1: {
                        event[index] = data.getIntField(++intValueIndex);
                        continue block6;
                    }
                    case 2: {
                        event[index] = data.getFloatField(++floatValueIndex);
                        continue block6;
                    }
                    case 4: {
                        event[index] = data.getBooleanField(++boolValueIndex);
                        continue block6;
                    }
                    case 3: {
                        event[index] = data.getStringField(++stringValueIndex);
                        continue block6;
                    }
                    default: {
                        throw new BallerinaException("Fields in streams do not support data types other than int, float, boolean and string");
                    }
                }
            }
            return event;
        }

        public String getQueueName() {
            return this.queueName;
        }

        protected void close() throws BrokerException {
        }

        public boolean isExclusive() {
            return false;
        }

        public boolean isReady() {
            return true;
        }
    }

    private class StreamSubscriber
    extends Consumer {
        final String queueName;
        final BFunctionPointer functionPointer;

        StreamSubscriber(String queueName, BFunctionPointer functionPointer) {
            this.queueName = queueName;
            this.functionPointer = functionPointer;
        }

        protected void send(Message message) throws BrokerException {
            BValue data = ((BallerinaStreamByteBuf)((ContentChunk)message.getContentChunks().get(0)).getByteBuf().unwrap()).streamEvent;
            BLangFunctions.invokeCallable((CallableUnitInfo)this.functionPointer.value().getFunctionInfo(), new BValue[]{data});
        }

        public String getQueueName() {
            return this.queueName;
        }

        protected void close() throws BrokerException {
        }

        public boolean isExclusive() {
            return false;
        }

        public boolean isReady() {
            return true;
        }
    }
}

