package org.ballerinalang.nats.streaming.consumer;

import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import java.util.HashMap;
import java.util.Map;
import org.ballerinalang.jvm.BRuntime;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.services.ErrorHandlerUtils;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.nats.Constants;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.observability.NatsMetricsReporter;
import org.ballerinalang.nats.observability.NatsObserverContext;

/* loaded from: input_file:org/ballerinalang/nats/streaming/consumer/StreamingListener.class */
public class StreamingListener implements MessageHandler {
    private ObjectValue service;
    private BRuntime runtime;
    private String connectedUrl;
    private boolean manualAck;
    private NatsMetricsReporter natsMetricsReporter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/ballerinalang/nats/streaming/consumer/StreamingListener$DispatcherCallback.class */
    public static class DispatcherCallback implements CallableUnitCallback {
        private String subject;
        private NatsMetricsReporter natsMetricsReporter;

        public DispatcherCallback(String str, NatsMetricsReporter natsMetricsReporter) {
            this.subject = str;
            this.natsMetricsReporter = natsMetricsReporter;
        }

        public void notifySuccess() {
            this.natsMetricsReporter.reportDelivery(this.subject);
        }

        public void notifyFailure(ErrorValue errorValue) {
            this.natsMetricsReporter.reportConsumerError(this.subject, "message_received");
            ErrorHandlerUtils.printError(errorValue);
        }
    }

    public StreamingListener(ObjectValue objectValue, boolean z, BRuntime bRuntime, String str, NatsMetricsReporter natsMetricsReporter) {
        this.service = objectValue;
        this.runtime = bRuntime;
        this.manualAck = z;
        this.natsMetricsReporter = natsMetricsReporter;
        this.connectedUrl = str;
    }

    @Override // io.nats.streaming.MessageHandler
    public void onMessage(Message message) {
        this.natsMetricsReporter.reportConsume(message.getSubject(), message.getData().length);
        ObjectValue createObjectValue = BallerinaValues.createObjectValue(Constants.NATS_PACKAGE_ID, Constants.NATS_STREAMING_MESSAGE_OBJ_NAME, new Object[]{StringUtils.fromString(message.getSubject()), BValueCreator.createArrayValue(message.getData()), StringUtils.fromString(message.getReplyTo())});
        createObjectValue.addNativeData(Constants.NATS_STREAMING_MSG, message);
        createObjectValue.addNativeData(Constants.NATS_STREAMING_MANUAL_ACK.getValue(), Boolean.valueOf(this.manualAck));
        BType[] parameterType = Utils.getAttachedFunction(this.service, "onMessage").getParameterType();
        if (parameterType.length == 1) {
            dispatch(createObjectValue, message.getSubject());
        } else {
            dispatch(createObjectValue, parameterType[1], message.getData(), message.getSubject());
        }
    }

    private void dispatch(ObjectValue objectValue, String str) {
        executeResource(str, objectValue);
    }

    private void dispatch(ObjectValue objectValue, BType bType, byte[] bArr, String str) {
        try {
            executeResource(str, objectValue, Utils.bindDataToIntendedType(bArr, bType));
        } catch (NumberFormatException e) {
            ErrorValue createNatsError = Utils.createNatsError("The received message is unsupported by the resource signature");
            this.natsMetricsReporter.reportConsumerError(str, "message_received");
            executeErrorResource(str, objectValue, createNatsError);
        } catch (ErrorValue e2) {
            executeErrorResource(str, objectValue, e2);
            this.natsMetricsReporter.reportConsumerError(str, "message_received");
        }
    }

    private void executeResource(String str, ObjectValue objectValue) {
        if (!ObserveUtils.isTracingEnabled()) {
            this.runtime.invokeMethodAsync(this.service, "onMessage", new DispatcherCallback(str, this.natsMetricsReporter), (Map) null, new Object[]{objectValue, true});
            return;
        }
        HashMap hashMap = new HashMap();
        hashMap.put("__observer_context__", new NatsObserverContext("consumer", this.connectedUrl, str));
        this.runtime.invokeMethodAsync(this.service, "onMessage", new DispatcherCallback(str, this.natsMetricsReporter), hashMap, new Object[]{objectValue, true});
    }

    private void executeResource(String str, ObjectValue objectValue, Object obj) {
        if (!ObserveUtils.isTracingEnabled()) {
            this.runtime.invokeMethodAsync(this.service, "onMessage", new DispatcherCallback(str, this.natsMetricsReporter), (Map) null, new Object[]{objectValue, true, obj, true});
            return;
        }
        HashMap hashMap = new HashMap();
        hashMap.put("__observer_context__", new NatsObserverContext("consumer", this.connectedUrl, str));
        this.runtime.invokeMethodAsync(this.service, "onMessage", new DispatcherCallback(str, this.natsMetricsReporter), hashMap, new Object[]{objectValue, true, obj, true});
    }

    private void executeErrorResource(String str, ObjectValue objectValue, ErrorValue errorValue) {
        if (!ObserveUtils.isTracingEnabled()) {
            this.runtime.invokeMethodAsync(this.service, "onError", new DispatcherCallback(str, this.natsMetricsReporter), (Map) null, new Object[]{objectValue, true, errorValue, true});
            return;
        }
        HashMap hashMap = new HashMap();
        hashMap.put("__observer_context__", new NatsObserverContext("consumer", this.connectedUrl, str));
        this.runtime.invokeMethodAsync(this.service, "onError", new DispatcherCallback(str, this.natsMetricsReporter), hashMap, new Object[]{objectValue, true, errorValue, true});
    }
}
