package org.ballerinalang.model.values;

import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Consumer;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.util.TraceField;
import java.util.UUID;
import org.ballerinalang.broker.BallerinaBrokerByteBuf;
import org.ballerinalang.broker.BrokerUtils;
import org.ballerinalang.model.types.BAnyType;
import org.ballerinalang.model.types.BIndexedType;
import org.ballerinalang.model.types.BStreamType;
import org.ballerinalang.model.types.BStructureType;
import org.ballerinalang.model.types.BType;
import org.ballerinalang.model.types.BTypes;
import org.ballerinalang.model.types.BUnionType;
import org.ballerinalang.siddhi.core.stream.input.InputHandler;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.ballerinalang.util.program.BLangFunctions;

/* loaded from: input_file:org/ballerinalang/model/values/BStream.class */
/**
 * Runtime value for a stream whose events are exchanged through a broker topic.
 *
 * <p>Each instance derives its own topic name (made unique with a random UUID) from the
 * stream's constraint type and the supplied stream id. {@link #publish(BValue)} pushes values
 * to that topic via {@code BrokerUtils}, and the {@code subscribe} overloads register broker
 * consumers on it.
 */
public class BStream implements BRefType<Object> {
    private static final String TOPIC_NAME_PREFIX = "TOPIC_NAME_";

    private final BType constraintType;
    private final String streamId;
    private final String topicName;

    /**
     * Broker consumer that converts each received struct into an {@code Object[]} event and
     * forwards it to an {@link InputHandler}.
     *
     * <p>Declared {@code static}: it never touches the enclosing stream, so it should not keep
     * a hidden reference to it.
     */
    private static class InternalStreamSubscriber extends Consumer {
        private final String topic;
        private final String queueName;
        private final InputHandler inputHandler;

        InternalStreamSubscriber(String topic, String queueName, InputHandler inputHandler) {
            this.topic = topic;
            this.queueName = queueName;
            this.inputHandler = inputHandler;
        }

        /**
         * Unwraps the struct carried by {@code message} and sends it to the input handler.
         *
         * @param message broker message whose first content chunk wraps the published value
         * @throws BallerinaException if the handler is interrupted while accepting the event;
         *                            the thread's interrupt status is restored before throwing
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.ballerina.messaging.broker.core.Consumer
        public void send(Message message) throws BrokerException {
            BStruct payload = (BStruct) extractPayload(message);
            try {
                this.inputHandler.send(createEvent(payload));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new BallerinaException("Error while sending events to stream: " + this.topic + TraceField.DELIMITER + e.getMessage(), e);
            }
        }

        /**
         * Flattens a struct into an array holding one boxed value per declared field, in
         * declaration order.
         *
         * <p>The struct's getters take a per-kind index (the n-th int field, the n-th float
         * field, ...), so a separate running counter is kept for each kind.
         *
         * @param bStruct struct to flatten
         * @return field values in declaration order
         * @throws BallerinaException if a field's type tag is not one of the four handled below
         */
        private Object[] createEvent(BStruct bStruct) {
            BStructureType structType = bStruct.getType();
            int fieldCount = structType.getFields().length;
            Object[] event = new Object[fieldCount];

            int intIndex = -1;
            int floatIndex = -1;
            int stringIndex = -1;
            int booleanIndex = -1;

            for (int pos = 0; pos < fieldCount; pos++) {
                int tag = structType.getFields()[pos].getFieldType().getTag();
                switch (tag) {
                    case 1: // read through getIntField
                        intIndex++;
                        event[pos] = Long.valueOf(bStruct.getIntField(intIndex));
                        break;
                    case 2: // read through getFloatField
                        floatIndex++;
                        event[pos] = Double.valueOf(bStruct.getFloatField(floatIndex));
                        break;
                    case 3: // read through getStringField
                        stringIndex++;
                        event[pos] = bStruct.getStringField(stringIndex);
                        break;
                    case 4: // read through getBooleanField; the stored value 1 means true
                        booleanIndex++;
                        event[pos] = Boolean.valueOf(bStruct.getBooleanField(booleanIndex) == 1);
                        break;
                    default:
                        throw new BallerinaException("Fields in streams do not support data types other than int, float, boolean and string");
                }
            }
            return event;
        }

        @Override // io.ballerina.messaging.broker.core.Consumer
        public String getQueueName() {
            return this.queueName;
        }

        /** No resources are held, so closing is a no-op. */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.ballerina.messaging.broker.core.Consumer
        public void close() throws BrokerException {
        }

        @Override // io.ballerina.messaging.broker.core.Consumer
        public boolean isExclusive() {
            return false;
        }

        @Override // io.ballerina.messaging.broker.core.Consumer
        public boolean isReady() {
            return true;
        }
    }

    /**
     * Broker consumer that invokes a function pointer with each received value.
     *
     * <p>Declared {@code static}: it never touches the enclosing stream, so it should not keep
     * a hidden reference to it.
     */
    private static class StreamSubscriber extends Consumer {
        final String queueName;
        final BFunctionPointer functionPointer;

        StreamSubscriber(String queueName, BFunctionPointer functionPointer) {
            this.queueName = queueName;
            this.functionPointer = functionPointer;
        }

        /**
         * Invokes the subscribed function with the value carried by {@code message} as its
         * single argument.
         *
         * @param message broker message whose first content chunk wraps the published value
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.ballerina.messaging.broker.core.Consumer
        public void send(Message message) throws BrokerException {
            BValue[] args = new BValue[]{extractPayload(message)};
            BLangFunctions.invokeCallable(this.functionPointer.value().getFunctionInfo(), args);
        }

        @Override // io.ballerina.messaging.broker.core.Consumer
        public String getQueueName() {
            return this.queueName;
        }

        /** No resources are held, so closing is a no-op. */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.ballerina.messaging.broker.core.Consumer
        public void close() throws BrokerException {
        }

        @Override // io.ballerina.messaging.broker.core.Consumer
        public boolean isExclusive() {
            return false;
        }

        @Override // io.ballerina.messaging.broker.core.Consumer
        public boolean isReady() {
            return true;
        }
    }

    /**
     * Creates a stream for the given stream type and id.
     *
     * @param bType stream type; cast to {@link BStreamType} and must carry a constraint type
     * @param str   stream id, also embedded in the generated topic name
     * @throws BallerinaException if the stream type has no constraint type
     */
    public BStream(BType bType, String str) {
        BType constrainedType = ((BStreamType) bType).getConstrainedType();
        if (constrainedType == null) {
            throw new BallerinaException("a stream cannot be declared without a constraint");
        }
        this.constraintType = constrainedType;
        this.streamId = str;
        this.topicName = buildTopicName(constrainedType, str);
    }

    /**
     * Builds the topic name: prefix, then a type-derived segment when one is available, then
     * the stream id, then a random UUID so every stream instance gets a distinct topic.
     */
    private static String buildTopicName(BType constraintType, String streamId) {
        String base;
        if (constraintType.getName() != null) {
            base = TOPIC_NAME_PREFIX + constraintType.getName().toUpperCase() + "_" + streamId;
        } else if (constraintType instanceof BIndexedType) {
            base = TOPIC_NAME_PREFIX + ((BIndexedType) constraintType).getElementType() + "_" + streamId;
        } else {
            base = TOPIC_NAME_PREFIX + streamId;
        }
        return base + "_" + UUID.randomUUID().toString();
    }

    /** Returns the value wrapped in the first content chunk of a broker message. */
    private static BValue extractPayload(Message message) {
        BallerinaBrokerByteBuf buffer =
                (BallerinaBrokerByteBuf) message.getContentChunks().get(0).getByteBuf().unwrap();
        return buffer.getValue();
    }

    /** @return the stream id passed to the constructor */
    public String getStreamId() {
        return this.streamId;
    }

    /** @return always the empty string */
    @Override // org.ballerinalang.model.values.BValue
    public String stringValue() {
        return "";
    }

    /** @return {@code BTypes.typeStream} */
    @Override // org.ballerinalang.model.values.BValue
    public BType getType() {
        return BTypes.typeStream;
    }

    /** @return always {@code null}; this implementation does not produce a copy */
    @Override // org.ballerinalang.model.values.BValue
    public BValue copy() {
        return null;
    }

    /** @return always {@code null} */
    @Override // org.ballerinalang.model.values.BRefType
    public Object value() {
        return null;
    }

    /** @return the constraint type taken from the stream type at construction */
    public BType getConstraintType() {
        return this.constraintType;
    }

    /**
     * Publishes a value to this stream's topic.
     *
     * <p>The value is accepted when its type equals the constraint type, when the constraint is
     * a union containing its type, or when the constraint is {@link BAnyType}.
     *
     * @param bValue value to publish
     * @throws BallerinaException if the value's type is not accepted by the constraint
     */
    public void publish(BValue bValue) {
        BType valueType = bValue.getType();
        boolean sameType = valueType.equals(this.constraintType);
        boolean unionMember = (this.constraintType instanceof BUnionType)
                && ((BUnionType) this.constraintType).getMemberTypes().contains(valueType);
        boolean anyConstraint = this.constraintType instanceof BAnyType;
        if (!sameType && !unionMember && !anyConstraint) {
            throw new BallerinaException("incompatible types: value of type:" + valueType.getName() + " cannot be added to a stream of type:" + this.constraintType.getName());
        }
        BrokerUtils.publish(this.topicName, new BallerinaBrokerByteBuf(bValue));
    }

    /**
     * Subscribes a function to this stream; it is invoked with every published value.
     *
     * @param bFunctionPointer function whose first parameter must match the constraint type
     * @throws BallerinaException if the function declares no parameters or its first parameter
     *                            is incompatible with the constraint type
     */
    public void subscribe(BFunctionPointer bFunctionPointer) {
        BType[] paramTypes = bFunctionPointer.funcRefCPEntry.getFunctionInfo().getParamTypes();
        // Guard the index access below: a parameterless function cannot accept stream values.
        boolean incompatible = paramTypes == null || paramTypes.length == 0;
        if (!incompatible) {
            BType firstParam = paramTypes[0];
            incompatible = firstParam.getTag() != this.constraintType.getTag()
                    || ((this.constraintType instanceof BStructureType)
                            && ((BStructureType) firstParam).getTypeInfo().getType() != this.constraintType);
        }
        if (incompatible) {
            throw new BallerinaException("incompatible function: subscription function needs to be a function accepting:" + this.constraintType.getName());
        }
        String queueName = String.valueOf(System.currentTimeMillis()) + UUID.randomUUID().toString();
        BrokerUtils.addSubscription(this.topicName, new StreamSubscriber(queueName, bFunctionPointer));
    }

    /**
     * Subscribes an input handler to this stream; each published struct is flattened into an
     * {@code Object[]} event before being handed over.
     *
     * @param inputHandler handler receiving the events
     * @throws BallerinaException if the constraint type's tag is neither 32 nor 33
     */
    public void subscribe(InputHandler inputHandler) {
        // NOTE(review): tags 32 and 33 presumably denote the object/record structure types,
        // judging by the error message - confirm against the type-tag definitions.
        int tag = this.constraintType.getTag();
        if (tag != 32 && tag != 33) {
            throw new BallerinaException("Streaming Support is only available with streams accepting objects");
        }
        String queueName = String.valueOf(UUID.randomUUID());
        BrokerUtils.addSubscription(this.topicName, new InternalStreamSubscriber(this.topicName, queueName, inputHandler));
    }
}
