package org.ballerinalang.messaging.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.ballerinalang.jvm.BRuntime;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.JSONParser;
import org.ballerinalang.jvm.JSONUtils;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.XMLFactory;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.types.AttachedFunction;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.types.BStructureType;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.util.exceptions.BallerinaConnectorException;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.HandleValue;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.messaging.rabbitmq.observability.RabbitMQMetricsUtil;
import org.ballerinalang.messaging.rabbitmq.observability.RabbitMQObservabilityConstants;
import org.ballerinalang.messaging.rabbitmq.observability.RabbitMQObserverContext;

/* loaded from: input_file:org/ballerinalang/messaging/rabbitmq/MessageDispatcher.class */
public class MessageDispatcher {
    private String consumerTag;
    private static final PrintStream console = System.out;
    private Channel channel;
    private boolean autoAck;
    private ObjectValue service;
    private ObjectValue channelObj;
    private String queueName;
    private BRuntime runtime;

    public MessageDispatcher(ObjectValue objectValue, Channel channel, boolean z, BRuntime bRuntime, ObjectValue objectValue2) {
        this.channel = channel;
        this.autoAck = z;
        this.service = objectValue;
        this.queueName = getQueueNameFromConfig(objectValue);
        this.consumerTag = objectValue.getType().getName();
        this.runtime = bRuntime;
        this.channelObj = objectValue2;
    }

    private String getQueueNameFromConfig(ObjectValue objectValue) {
        return ((MapValue) objectValue.getType().getAnnotation("ballerina/rabbitmq:1.0.0", RabbitMQConstants.SERVICE_CONFIG)).getMapValue(RabbitMQConstants.ALIAS_QUEUE_CONFIG).getStringValue(RabbitMQConstants.QUEUE_NAME).getValue();
    }

    public void receiveMessages(ObjectValue objectValue) {
        console.println("[ballerina/rabbitmq] Consumer service started for queue " + this.queueName);
        try {
            this.channel.basicConsume(this.queueName, this.autoAck, this.consumerTag, new DefaultConsumer(this.channel) { // from class: org.ballerinalang.messaging.rabbitmq.MessageDispatcher.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                    MessageDispatcher.this.handleDispatch(bArr, envelope.getDeliveryTag(), basicProperties);
                }
            });
            ((ArrayList) objectValue.getNativeData(RabbitMQConstants.STARTED_SERVICES)).add(this.service);
            this.service.addNativeData(RabbitMQConstants.QUEUE_NAME.getValue(), this.queueName);
        } catch (IOException e) {
            RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
            throw RabbitMQUtils.returnErrorValue("Error occurred while consuming messages; " + e.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleDispatch(byte[] bArr, long j, AMQP.BasicProperties basicProperties) {
        AttachedFunction attachedFunction;
        AttachedFunction[] attachedFunctions = this.service.getType().getAttachedFunctions();
        if ("onMessage".equals(attachedFunctions[0].getName())) {
            attachedFunction = attachedFunctions[0];
        } else if (!"onMessage".equals(attachedFunctions[1].getName())) {
            return;
        } else {
            attachedFunction = attachedFunctions[1];
        }
        if (attachedFunction.getParameterType().length > 1) {
            dispatchMessageWithDataBinding(bArr, j, attachedFunction, basicProperties);
        } else {
            dispatchMessage(bArr, j, basicProperties);
        }
    }

    private void dispatchMessage(byte[] bArr, long j, AMQP.BasicProperties basicProperties) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            executeResource("onMessage", new RabbitMQResourceCallback(countDownLatch, this.channel, this.queueName, bArr.length), getMessageObjectValue(bArr, j, basicProperties), true);
            countDownLatch.await();
        } catch (AlreadyClosedException | BallerinaConnectorException e) {
            RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
            handleError(bArr, j, basicProperties);
        } catch (InterruptedException e2) {
            RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
            Thread.currentThread().interrupt();
            throw new RabbitMQConnectorException("Error occurred in RabbitMQ service. The current thread got interrupted");
        }
    }

    private void dispatchMessageWithDataBinding(byte[] bArr, long j, AttachedFunction attachedFunction, AMQP.BasicProperties basicProperties) {
        try {
            Object messageContentForType = getMessageContentForType(bArr, attachedFunction.getParameterType()[1]);
            ObjectValue messageObjectValue = getMessageObjectValue(bArr, j, basicProperties);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            executeResource("onMessage", new RabbitMQResourceCallback(countDownLatch, this.channel, this.queueName, bArr.length), messageObjectValue, true, messageContentForType, true);
            countDownLatch.await();
        } catch (InterruptedException e) {
            RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
            Thread.currentThread().interrupt();
            throw new RabbitMQConnectorException("Error occurred in RabbitMQ service. The current thread got interrupted");
        } catch (BallerinaConnectorException | UnsupportedEncodingException e2) {
            RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
            handleError(bArr, j, basicProperties);
        }
    }

    private Object getMessageContentForType(byte[] bArr, BType bType) throws UnsupportedEncodingException {
        switch (bType.getTag()) {
            case 1:
                return Integer.valueOf(Integer.parseInt(new String(bArr, StandardCharsets.UTF_8.name())));
            case 2:
            case 4:
            case 6:
            case 9:
            case RabbitMQConstants.DEFAULT_PREFETCH /* 10 */:
            case 11:
            case 13:
            case 14:
            case 15:
            case 16:
            case 17:
            case 18:
            case 19:
            default:
                RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
                throw new RabbitMQConnectorException("The content type of the message received does not match the resource signature type.");
            case 3:
                return Float.valueOf(Float.parseFloat(new String(bArr, StandardCharsets.UTF_8.name())));
            case 5:
                return StringUtils.fromString(new String(bArr, StandardCharsets.UTF_8.name()));
            case 7:
                return JSONParser.parse(new String(bArr, StandardCharsets.UTF_8.name()));
            case 8:
                return XMLFactory.parse(new String(bArr, StandardCharsets.UTF_8.name()));
            case 12:
                return JSONUtils.convertJSONToRecord(JSONParser.parse(new String(bArr, StandardCharsets.UTF_8.name())), (BStructureType) bType);
            case 20:
                if (((BArrayType) bType).getElementType().getTag() == 2) {
                    return bArr;
                }
                RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
                throw new RabbitMQConnectorException("Only type byte[] is supported in data binding.");
        }
    }

    private ObjectValue getMessageObjectValue(byte[] bArr, long j, AMQP.BasicProperties basicProperties) {
        ObjectValue createObjectValue = BallerinaValues.createObjectValue(RabbitMQConstants.PACKAGE_ID_RABBITMQ, RabbitMQConstants.MESSAGE_OBJECT, new Object[0]);
        RabbitMQTransactionContext rabbitMQTransactionContext = (RabbitMQTransactionContext) this.channelObj.getNativeData(RabbitMQConstants.RABBITMQ_TRANSACTION_CONTEXT);
        createObjectValue.set(RabbitMQConstants.DELIVERY_TAG, Long.valueOf(j));
        createObjectValue.set(RabbitMQConstants.JAVA_CLIENT_CHANNEL, new HandleValue(this.channel));
        createObjectValue.set(RabbitMQConstants.MESSAGE_CONTENT, BValueCreator.createArrayValue(bArr));
        createObjectValue.set(RabbitMQConstants.AUTO_ACK_STATUS, Boolean.valueOf(this.autoAck));
        createObjectValue.set(RabbitMQConstants.MESSAGE_ACK_STATUS, false);
        createObjectValue.addNativeData(RabbitMQConstants.RABBITMQ_TRANSACTION_CONTEXT, rabbitMQTransactionContext);
        if (basicProperties != null) {
            createObjectValue.set(RabbitMQConstants.BASIC_PROPERTIES, BallerinaValues.createRecord(BallerinaValues.createRecordValue(RabbitMQConstants.PACKAGE_ID_RABBITMQ, "BasicProperties"), new Object[]{basicProperties.getReplyTo(), basicProperties.getContentType(), basicProperties.getContentEncoding(), basicProperties.getCorrelationId()}));
        }
        return createObjectValue;
    }

    private void handleError(byte[] bArr, long j, AMQP.BasicProperties basicProperties) {
        ErrorValue returnErrorValue = RabbitMQUtils.returnErrorValue("Error occurred while dispatching the message. ");
        ObjectValue messageObjectValue = getMessageObjectValue(bArr, j, basicProperties);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            executeResource("onError", new RabbitMQErrorResourceCallback(countDownLatch), messageObjectValue, true, returnErrorValue, true);
            countDownLatch.await();
        } catch (AlreadyClosedException | BallerinaConnectorException e) {
            RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
            throw new RabbitMQConnectorException("Error occurred in RabbitMQ service. ");
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            RabbitMQMetricsUtil.reportError(this.channel, RabbitMQObservabilityConstants.ERROR_TYPE_CONSUME);
            throw new RabbitMQConnectorException("Error occurred in RabbitMQ service. The current thread got interrupted");
        }
    }

    private void executeResource(String str, CallableUnitCallback callableUnitCallback, Object... objArr) {
        if (ObserveUtils.isTracingEnabled()) {
            this.runtime.invokeMethodAsync(this.service, str, callableUnitCallback, getNewObserverContextInProperties(), objArr);
        } else {
            this.runtime.invokeMethodAsync(this.service, str, callableUnitCallback, objArr);
        }
    }

    private Map<String, Object> getNewObserverContextInProperties() {
        HashMap hashMap = new HashMap();
        RabbitMQObserverContext rabbitMQObserverContext = new RabbitMQObserverContext(this.channel);
        rabbitMQObserverContext.addTag(RabbitMQObservabilityConstants.TAG_QUEUE, this.queueName);
        hashMap.put("__observer_context__", rabbitMQObserverContext);
        return hashMap;
    }
}
