package org.ballerinalang.model.values;

import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Consumer;
import io.ballerina.messaging.broker.core.ContentChunk;
import io.ballerina.messaging.broker.core.Message;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.ballerinalang.bre.Context;
import org.ballerinalang.model.types.BStreamType;
import org.ballerinalang.model.types.BStructType;
import org.ballerinalang.model.types.BType;
import org.ballerinalang.model.types.BTypes;
import org.ballerinalang.model.util.JSONUtils;
import org.ballerinalang.siddhi.core.stream.input.InputHandler;
import org.ballerinalang.util.BLangConstants;
import org.ballerinalang.util.BrokerUtils;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.ballerinalang.util.program.BLangFunctions;

/* loaded from: input_file:org/ballerinalang/model/values/BStream.class */
public class BStream implements BRefType<Object> {
    private static final String TOPIC_NAME_PREFIX = "TOPIC_NAME_";
    private BStructType constraintType;
    private String streamId;
    private String topicName;

    /* loaded from: input_file:org/ballerinalang/model/values/BStream$InternalStreamSubscriber.class */
    private class InternalStreamSubscriber extends Consumer {
        private final String topic;
        private final String queueName;
        private final InputHandler inputHandler;

        InternalStreamSubscriber(String str, String str2, InputHandler inputHandler) {
            this.topic = str;
            this.queueName = str2;
            this.inputHandler = inputHandler;
        }

        protected void send(Message message) throws BrokerException {
            byte[] bArr = new byte[0];
            for (ContentChunk contentChunk : message.getContentChunks()) {
                bArr = new byte[contentChunk.getBytes().readableBytes()];
                contentChunk.getBytes().getBytes(0, bArr);
            }
            try {
                this.inputHandler.send(createEvent(JSONUtils.convertJSONToStruct(new BJSON(new String(bArr, StandardCharsets.UTF_8)), BStream.this.constraintType)));
            } catch (InterruptedException e) {
                throw new BallerinaException("Error while sending events to stream: " + this.topic + ": " + e.getMessage(), e);
            }
        }

        private Object[] createEvent(BStruct bStruct) {
            BStructType type = bStruct.getType();
            int i = -1;
            int i2 = -1;
            int i3 = -1;
            int i4 = -1;
            Object[] objArr = new Object[type.getStructFields().length];
            for (int i5 = 0; i5 < type.getStructFields().length; i5++) {
                switch (type.getStructFields()[i5].getFieldType().getTag()) {
                    case 1:
                        i++;
                        objArr[i5] = Long.valueOf(bStruct.getIntField(i));
                        break;
                    case 2:
                        i2++;
                        objArr[i5] = Double.valueOf(bStruct.getFloatField(i2));
                        break;
                    case 3:
                        i3++;
                        objArr[i5] = bStruct.getStringField(i3);
                        break;
                    case 4:
                        i4++;
                        objArr[i5] = Integer.valueOf(bStruct.getBooleanField(i4));
                        break;
                    default:
                        throw new BallerinaException("Fields in streams do not support data types other than int, float, boolean and string");
                }
            }
            return objArr;
        }

        public String getQueueName() {
            return this.queueName;
        }

        protected void close() throws BrokerException {
        }

        public boolean isExclusive() {
            return false;
        }

        public boolean isReady() {
            return true;
        }
    }

    /* loaded from: input_file:org/ballerinalang/model/values/BStream$StreamSubscriber.class */
    private class StreamSubscriber extends Consumer {
        final String queueName;
        final BFunctionPointer functionPointer;

        StreamSubscriber(String str, BFunctionPointer bFunctionPointer) {
            this.queueName = str;
            this.functionPointer = bFunctionPointer;
        }

        protected void send(Message message) throws BrokerException {
            byte[] bArr = new byte[0];
            for (ContentChunk contentChunk : message.getContentChunks()) {
                bArr = new byte[contentChunk.getBytes().readableBytes()];
                contentChunk.getBytes().getBytes(0, bArr);
            }
            BLangFunctions.invokeCallable(this.functionPointer.value().getFunctionInfo(), new BValue[]{JSONUtils.convertJSONToStruct(new BJSON(new String(bArr, StandardCharsets.UTF_8)), BStream.this.constraintType)});
        }

        public String getQueueName() {
            return this.queueName;
        }

        protected void close() throws BrokerException {
        }

        public boolean isExclusive() {
            return false;
        }

        public boolean isReady() {
            return true;
        }
    }

    public BStream(BType bType, String str) {
        this.streamId = BLangConstants.STRING_EMPTY_VALUE;
        if (((BStreamType) bType).getConstrainedType() == null) {
            throw new BallerinaException("a stream cannot be declared without a constraint");
        }
        this.constraintType = (BStructType) ((BStreamType) bType).getConstrainedType();
        this.topicName = TOPIC_NAME_PREFIX + ((BStreamType) bType).getConstrainedType().getName().toUpperCase() + "_" + str;
        this.streamId = str;
    }

    public String getStreamId() {
        return this.streamId;
    }

    @Override // org.ballerinalang.model.values.BValue
    public String stringValue() {
        return BLangConstants.STRING_EMPTY_VALUE;
    }

    @Override // org.ballerinalang.model.values.BValue
    public BType getType() {
        return BTypes.typeStream;
    }

    @Override // org.ballerinalang.model.values.BValue
    public BValue copy() {
        return null;
    }

    @Override // org.ballerinalang.model.values.BRefType
    public Object value() {
        return null;
    }

    public BStructType getConstraintType() {
        return this.constraintType;
    }

    public void publish(BStruct bStruct) {
        if (bStruct.getType() != this.constraintType) {
            throw new BallerinaException("incompatible types: object of type:" + bStruct.getType().getName() + " cannot be added to a stream of type:" + this.constraintType.getName());
        }
        BrokerUtils.publish(this.topicName, JSONUtils.convertStructToJSON(bStruct).stringValue().getBytes());
    }

    public void subscribe(Context context, BFunctionPointer bFunctionPointer) {
        BType[] paramTypes = bFunctionPointer.funcRefCPEntry.getFunctionInfo().getParamTypes();
        if (!(paramTypes[0] instanceof BStructType) || ((BStructType) paramTypes[0]).structInfo.getType() != this.constraintType) {
            throw new BallerinaException("incompatible function: subscription function needs to be a function accepting an object of type:" + this.constraintType.getName());
        }
        BrokerUtils.addSubscription(this.topicName, new StreamSubscriber(String.valueOf(System.currentTimeMillis()) + UUID.randomUUID().toString(), bFunctionPointer));
    }

    public void subscribe(InputHandler inputHandler) {
        BrokerUtils.addSubscription(this.topicName, new InternalStreamSubscriber(this.topicName, String.valueOf(UUID.randomUUID()), inputHandler));
    }
}
