package org.ballerinalang.nats.streaming.consumer;

import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import java.util.Map;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.services.ErrorHandlerUtils;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ArrayValueImpl;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.jvm.values.connector.Executor;
import org.ballerinalang.nats.Constants;
import org.ballerinalang.nats.Utils;

/* loaded from: input_file:org/ballerinalang/nats/streaming/consumer/StreamingListener.class */
public class StreamingListener implements MessageHandler {
    private ObjectValue service;
    private Scheduler scheduler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/ballerinalang/nats/streaming/consumer/StreamingListener$DispatcherCallback.class */
    public static class DispatcherCallback implements CallableUnitCallback {
        private DispatcherCallback() {
        }

        public void notifySuccess() {
        }

        public void notifyFailure(ErrorValue errorValue) {
            ErrorHandlerUtils.printError(errorValue);
        }
    }

    public StreamingListener(ObjectValue objectValue, Scheduler scheduler) {
        this.service = objectValue;
        this.scheduler = scheduler;
    }

    public void onMessage(Message message) {
        ObjectValue createObjectValue = BallerinaValues.createObjectValue(Constants.NATS_PACKAGE_ID, Constants.NATS_STREAMING_MESSAGE_OBJ_NAME, new Object[]{message.getSubject(), new ArrayValueImpl(message.getData()), message.getReplyTo()});
        createObjectValue.addNativeData(Constants.NATS_STREAMING_MSG, message);
        BType[] parameterType = Utils.getAttachedFunction(this.service, Constants.ON_MESSAGE_RESOURCE).getParameterType();
        if (parameterType.length == 1) {
            dispatch(createObjectValue);
        } else {
            dispatch(createObjectValue, parameterType[1], message.getData());
        }
    }

    private void dispatch(ObjectValue objectValue) {
        Executor.submit(this.scheduler, this.service, Constants.ON_MESSAGE_RESOURCE, new DispatcherCallback(), (Map) null, new Object[]{objectValue, true});
    }

    private void dispatch(ObjectValue objectValue, BType bType, byte[] bArr) {
        try {
            Executor.submit(this.scheduler, this.service, Constants.ON_MESSAGE_RESOURCE, new DispatcherCallback(), (Map) null, new Object[]{objectValue, true, Utils.bindDataToIntendedType(bArr, bType), true});
        } catch (ErrorValue e) {
            Executor.submit(this.scheduler, this.service, Constants.ON_ERROR_RESOURCE, new DispatcherCallback(), (Map) null, new Object[]{objectValue, true, e, true});
        } catch (NumberFormatException e2) {
            Executor.submit(this.scheduler, this.service, Constants.ON_ERROR_RESOURCE, new DispatcherCallback(), (Map) null, new Object[]{objectValue, true, Utils.createNatsError("The received message is unsupported by the resource signature"), true});
        }
    }
}
