/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.nats.streaming.consumer;

import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.services.ErrorHandlerUtils;
import org.ballerinalang.jvm.types.AttachedFunction;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ArrayValue;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.jvm.values.connector.Executor;
import org.ballerinalang.nats.Utils;

public class StreamingListener
implements MessageHandler {
    private ObjectValue service;
    private Scheduler scheduler;

    public StreamingListener(ObjectValue service, Scheduler threadScheduler) {
        this.service = service;
        this.scheduler = threadScheduler;
    }

    public void onMessage(Message msg) {
        ObjectValue ballerinaNatsMessage = BallerinaValues.createObjectValue((String)"ballerina/nats", (String)"StreamingMessage", (Object[])new Object[]{msg.getSubject(), new ArrayValue(msg.getData()), msg.getReplyTo()});
        ballerinaNatsMessage.addNativeData("nats_streaming_message", (Object)msg);
        AttachedFunction onMessageResource = Utils.getAttachedFunction(this.service, "onMessage");
        BType[] parameterTypes = onMessageResource.getParameterType();
        if (parameterTypes.length == 1) {
            this.dispatch(ballerinaNatsMessage);
        } else {
            BType intendedTypeForData = parameterTypes[1];
            this.dispatch(ballerinaNatsMessage, intendedTypeForData, msg.getData());
        }
    }

    private void dispatch(ObjectValue ballerinaNatsMessage) {
        Executor.submit((Scheduler)this.scheduler, (ObjectValue)this.service, (String)"onMessage", (CallableUnitCallback)new DispatcherCallback(), null, (Object[])new Object[]{ballerinaNatsMessage, true});
    }

    private void dispatch(ObjectValue ballerinaNatsMessage, BType intendedTypeForData, byte[] data) {
        try {
            Object typeBoundData = Utils.bindDataToIntendedType(data, intendedTypeForData);
            Executor.submit((Scheduler)this.scheduler, (ObjectValue)this.service, (String)"onMessage", (CallableUnitCallback)new DispatcherCallback(), null, (Object[])new Object[]{ballerinaNatsMessage, true, typeBoundData, true});
        }
        catch (NumberFormatException e) {
            ErrorValue dataBindError = Utils.createNatsError("The received message is unsupported by the resource signature");
            Executor.submit((Scheduler)this.scheduler, (ObjectValue)this.service, (String)"onError", (CallableUnitCallback)new DispatcherCallback(), null, (Object[])new Object[]{ballerinaNatsMessage, true, dataBindError, true});
        }
        catch (ErrorValue e) {
            Executor.submit((Scheduler)this.scheduler, (ObjectValue)this.service, (String)"onError", (CallableUnitCallback)new DispatcherCallback(), null, (Object[])new Object[]{ballerinaNatsMessage, true, e, true});
        }
    }

    private static class DispatcherCallback
    implements CallableUnitCallback {
        private DispatcherCallback() {
        }

        public void notifySuccess() {
        }

        public void notifyFailure(ErrorValue error) {
            ErrorHandlerUtils.printError((Throwable)error);
        }
    }
}

