package org.ballerinalang.nats.basic.consumer;

import io.nats.client.Message;
import io.nats.client.MessageHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.ballerinalang.jvm.BRuntime;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.services.ErrorHandlerUtils;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ArrayValueImpl;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.nats.Constants;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.observability.NatsMetricsReporter;
import org.ballerinalang.nats.observability.NatsObservabilityConstants;
import org.ballerinalang.nats.observability.NatsObserverContext;

/* loaded from: input_file:org/ballerinalang/nats/basic/consumer/DefaultMessageHandler.class */
/**
 * NATS {@link MessageHandler} that forwards every received message to the
 * {@code Constants.ON_MESSAGE_RESOURCE} function attached to a service object.
 *
 * <p>Each message is wrapped in a message object value and handed to the runtime via
 * {@code invokeMethodAsync}. The calling thread then waits on a {@link CountDownLatch}
 * that is released by {@link ResponseCallback} once the resource has completed, so
 * {@link #onMessage(Message)} does not return before the resource has finished.
 */
public class DefaultMessageHandler implements MessageHandler {
    /** Key under which the observer context is placed in the properties map given to the runtime. */
    private static final String OBSERVER_CONTEXT_KEY = "__observer_context__";

    /** Service object whose on-message resource receives the messages. */
    private final ObjectValue serviceObject;
    /** URL recorded in the observer context when tracing is enabled. */
    private final String connectedUrl;
    private final BRuntime runtime;
    private final NatsMetricsReporter natsMetricsReporter;

    /**
     * Callback passed to the runtime for a single resource invocation. It reports the
     * outcome to the metrics reporter and releases the latch the dispatching thread
     * is waiting on.
     */
    public static class ResponseCallback implements CallableUnitCallback {
        private final CountDownLatch countDownLatch;
        private final String subject;
        private final NatsMetricsReporter natsMetricsReporter;

        /**
         * @param countDownLatch latch to count down once the invocation has finished
         * @param str subject of the message being processed, used for metrics
         * @param natsMetricsReporter reporter that receives delivery / error metrics
         */
        ResponseCallback(CountDownLatch countDownLatch, String str, NatsMetricsReporter natsMetricsReporter) {
            this.countDownLatch = countDownLatch;
            this.subject = str;
            this.natsMetricsReporter = natsMetricsReporter;
        }

        /** Reports a successful delivery for the subject and releases the latch. */
        public void notifySuccess() {
            try {
                this.natsMetricsReporter.reportDelivery(this.subject);
            } finally {
                // Always release the waiting dispatcher, even if metrics reporting throws;
                // otherwise the thread blocked in await() would never wake up.
                this.countDownLatch.countDown();
            }
        }

        /**
         * Prints the error, reports a consumer error for the subject and releases the latch.
         *
         * @param errorValue error produced by the resource invocation
         */
        public void notifyFailure(ErrorValue errorValue) {
            try {
                ErrorHandlerUtils.printError(errorValue);
                this.natsMetricsReporter.reportConsumerError(this.subject, NatsObservabilityConstants.ERROR_TYPE_MSG_RECEIVED);
            } finally {
                // Same reasoning as notifySuccess(): the latch must be released on every path.
                this.countDownLatch.countDown();
            }
        }
    }

    /**
     * @param objectValue service object that owns the on-message resource
     * @param bRuntime runtime used to invoke the resource asynchronously
     * @param str URL stored in the observer context when tracing is enabled
     * @param natsMetricsReporter reporter that receives consume / delivery / error metrics
     */
    public DefaultMessageHandler(ObjectValue objectValue, BRuntime bRuntime, String str, NatsMetricsReporter natsMetricsReporter) {
        this.serviceObject = objectValue;
        this.runtime = bRuntime;
        this.connectedUrl = str;
        this.natsMetricsReporter = natsMetricsReporter;
    }

    /**
     * Reports the consumed message, wraps it in a message object value and dispatches it
     * to the service resource. When the resource declares more than one parameter, the
     * payload is additionally bound to the type of the second parameter.
     *
     * @param message message received from the NATS client
     */
    public void onMessage(Message message) {
        String subject = message.getSubject();
        byte[] payload = message.getData();
        this.natsMetricsReporter.reportConsume(subject, payload.length);
        ObjectValue messageObject = BallerinaValues.createObjectValue(Constants.NATS_PACKAGE_ID, Constants.NATS_MESSAGE_OBJ_NAME, new Object[]{StringUtils.fromString(subject), new ArrayValueImpl(payload), StringUtils.fromString(message.getReplyTo())});
        BType[] parameterTypes = Utils.getAttachedFunction(this.serviceObject, Constants.ON_MESSAGE_RESOURCE).getParameterType();
        if (parameterTypes.length == 1) {
            dispatch(messageObject);
        } else {
            dispatchWithDataBinding(messageObject, parameterTypes[1], payload);
        }
    }

    /** Invokes the resource with the message object only and waits for it to complete. */
    private void dispatch(ObjectValue messageObject) {
        CountDownLatch latch = new CountDownLatch(1);
        executeResource(messageObject, latch);
        try {
            latch.await();
        } catch (InterruptedException e) {
            reportInterruption(messageObject);
            throw Utils.createNatsError(Constants.THREAD_INTERRUPTED_ERROR);
        }
    }

    /**
     * Binds the payload to {@code targetType}, invokes the resource with the message object
     * and the bound value, and waits for it to complete. Binding or invocation failures
     * surfacing as {@link ErrorValue} or {@link NumberFormatException} are routed to
     * {@code ErrorHandler.dispatchError}.
     */
    private void dispatchWithDataBinding(ObjectValue messageObject, BType targetType, byte[] payload) {
        try {
            Object boundData = Utils.bindDataToIntendedType(payload, targetType);
            CountDownLatch latch = new CountDownLatch(1);
            executeResource(messageObject, latch, boundData);
            latch.await();
        } catch (InterruptedException e) {
            reportInterruption(messageObject);
            // Thrown from within a catch clause, so the ErrorValue handler below does not intercept it.
            throw Utils.createNatsError(Constants.THREAD_INTERRUPTED_ERROR);
        } catch (ErrorValue e2) {
            ErrorHandler.dispatchError(this.serviceObject, messageObject, e2, this.runtime, this.natsMetricsReporter);
        } catch (NumberFormatException e3) {
            ErrorHandler.dispatchError(this.serviceObject, messageObject, Utils.createNatsError("The received message is unsupported by the resource signature"), this.runtime, this.natsMetricsReporter);
        }
    }

    /** Invokes the resource with the message object as its only value argument. */
    private void executeResource(ObjectValue messageObject, CountDownLatch latch) {
        invokeResource(messageObject, latch, new Object[]{messageObject, Boolean.TRUE});
    }

    /** Invokes the resource with the message object and the data-bound payload. */
    private void executeResource(ObjectValue messageObject, CountDownLatch latch, Object boundData) {
        invokeResource(messageObject, latch, new Object[]{messageObject, true, boundData, true});
    }

    /**
     * Asynchronously invokes the on-message resource with the given argument array. When
     * tracing is enabled, a consumer observer context is attached through the properties
     * map; otherwise {@code null} properties are passed.
     */
    private void invokeResource(ObjectValue messageObject, CountDownLatch latch, Object[] resourceArgs) {
        String subject = subjectOf(messageObject);
        Map<String, Object> properties = null;
        if (ObserveUtils.isTracingEnabled()) {
            properties = new HashMap<>();
            properties.put(OBSERVER_CONTEXT_KEY, new NatsObserverContext(NatsObservabilityConstants.CONTEXT_CONSUMER, this.connectedUrl, subject));
        }
        this.runtime.invokeMethodAsync(this.serviceObject, Constants.ON_MESSAGE_RESOURCE, (String) null, Constants.ON_MESSAGE_METADATA, new ResponseCallback(latch, subject, this.natsMetricsReporter), properties, resourceArgs);
    }

    /** Restores the thread's interrupt flag and records a consumer error for the message's subject. */
    private void reportInterruption(ObjectValue messageObject) {
        Thread.currentThread().interrupt();
        this.natsMetricsReporter.reportConsumerError(subjectOf(messageObject), NatsObservabilityConstants.ERROR_TYPE_MSG_RECEIVED);
    }

    /** Reads the subject field from the message object. */
    private static String subjectOf(ObjectValue messageObject) {
        return messageObject.getStringValue(Constants.SUBJECT).getValue();
    }
}
