/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.nats.basic.consumer;

import io.nats.client.Message;
import io.nats.client.MessageHandler;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import org.ballerinalang.jvm.BRuntime;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.services.ErrorHandlerUtils;
import org.ballerinalang.jvm.types.AttachedFunction;
import org.ballerinalang.jvm.types.BPackage;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ArrayValueImpl;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.nats.Constants;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.basic.consumer.ErrorHandler;
import org.ballerinalang.nats.observability.NatsMetricsReporter;
import org.ballerinalang.nats.observability.NatsObserverContext;

/**
 * {@link MessageHandler} that forwards every received NATS message to the {@code onMessage}
 * function attached to a Ballerina service object.
 *
 * <p>Each message is wrapped in a Ballerina {@code Message} object and handed to the
 * {@link BRuntime} through {@code invokeMethodAsync}. The thread that delivered the message then
 * waits on a {@link CountDownLatch} which is released by the {@link ResponseCallback} passed to
 * the runtime, so a single handler call does not return before that callback has been notified.
 */
public class DefaultMessageHandler
implements MessageHandler {
    /** Name of the service function that receives messages. */
    private static final String ON_MESSAGE_RESOURCE = "onMessage";
    /** Context label attached to consumer errors reported from this handler. */
    private static final String MESSAGE_RECEIVED_CONTEXT = "message_received";
    /** Key under which the observer context is placed in the invocation properties. */
    private static final String OBSERVER_CONTEXT_KEY = "__observer_context__";
    /** Error message used when the waiting thread is interrupted. */
    private static final String THREAD_INTERRUPTED_ERROR =
            "Internal error occurred. The current thread got interrupted.";

    private final ObjectValue serviceObject;
    private final String connectedUrl;
    private final BRuntime runtime;
    private final NatsMetricsReporter natsMetricsReporter;

    /**
     * Creates a handler bound to one service object.
     *
     * @param serviceObject       service whose {@code onMessage} function is invoked
     * @param runtime             runtime used to invoke the service function
     * @param connectedUrl        URL recorded in the observer context when tracing is enabled
     * @param natsMetricsReporter reporter that receives consume, delivery and error events
     */
    DefaultMessageHandler(ObjectValue serviceObject, BRuntime runtime, String connectedUrl, NatsMetricsReporter natsMetricsReporter) {
        this.serviceObject = serviceObject;
        this.runtime = runtime;
        this.connectedUrl = connectedUrl;
        this.natsMetricsReporter = natsMetricsReporter;
    }

    /**
     * Implements {@code MessageHandler.onMessage}: reports the consumed message, builds the
     * Ballerina {@code Message} object and dispatches it to the service.
     *
     * <p>When the service function declares exactly one parameter the message object alone is
     * passed; otherwise the payload is additionally bound to the type of the second parameter.
     *
     * @param message the message received from NATS
     */
    public void onMessage(Message message) {
        String subject = message.getSubject();
        byte[] data = message.getData();
        this.natsMetricsReporter.reportConsume(subject, data.length);

        ArrayValueImpl msgData = new ArrayValueImpl(data);
        ObjectValue msgObj = BallerinaValues.createObjectValue((BPackage) Constants.NATS_PACKAGE_ID, "Message",
                new Object[]{StringUtils.fromString(subject), msgData, StringUtils.fromString(message.getReplyTo())});

        AttachedFunction onMessage = Utils.getAttachedFunction(this.serviceObject, ON_MESSAGE_RESOURCE);
        BType[] parameterTypes = onMessage.getParameterType();
        if (parameterTypes.length == 1) {
            this.dispatch(msgObj);
        } else {
            // The second declared parameter carries the type the payload must be bound to.
            this.dispatchWithDataBinding(msgObj, parameterTypes[1], data);
        }
    }

    /**
     * Invokes the service function with the message object only and waits until the latch
     * handed to the response callback has been counted down.
     *
     * @param msgObj Ballerina message object passed to the service function
     * @throws ErrorValue if the waiting thread is interrupted
     */
    private void dispatch(ObjectValue msgObj) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.executeResource(msgObj, countDownLatch);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw this.handleInterruption(msgObj);
        }
    }

    /**
     * Binds the raw payload to {@code intendedType}, invokes the service function with both the
     * message object and the bound value, and waits for the response callback.
     *
     * <p>A {@link NumberFormatException} or {@link ErrorValue} raised while binding or invoking
     * is routed to {@link ErrorHandler#dispatchError} instead of being propagated.
     *
     * @param msgObj       Ballerina message object passed to the service function
     * @param intendedType type the payload is bound to
     * @param data         raw message payload
     * @throws ErrorValue if the waiting thread is interrupted
     */
    private void dispatchWithDataBinding(ObjectValue msgObj, BType intendedType, byte[] data) {
        try {
            Object typeBoundData = Utils.bindDataToIntendedType(data, intendedType);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            this.executeResource(msgObj, countDownLatch, typeBoundData);
            countDownLatch.await();
        } catch (NumberFormatException e) {
            ErrorValue dataBindError = Utils.createNatsError("The received message is unsupported by the resource signature");
            ErrorHandler.dispatchError(this.serviceObject, msgObj, dataBindError, this.runtime, this.natsMetricsReporter);
        } catch (ErrorValue e) {
            ErrorHandler.dispatchError(this.serviceObject, msgObj, e, this.runtime, this.natsMetricsReporter);
        } catch (InterruptedException e) {
            // Thrown from a catch clause, so the ErrorValue handler above does not intercept it.
            throw this.handleInterruption(msgObj);
        }
    }

    /**
     * Restores the interrupt flag, reports a consumer error for the message's subject and builds
     * the error the caller should throw.
     *
     * <p>The flag is restored first so that it is not lost should metric reporting fail.
     *
     * @param msgObj message object whose subject is reported
     * @return the NATS error describing the interruption
     */
    private ErrorValue handleInterruption(ObjectValue msgObj) {
        Thread.currentThread().interrupt();
        this.natsMetricsReporter.reportConsumerError(msgObj.getStringValue(Constants.SUBJECT).getValue(), MESSAGE_RECEIVED_CONTEXT);
        return Utils.createNatsError(THREAD_INTERRUPTED_ERROR);
    }

    /**
     * Invokes the service function with the message object as its only value argument.
     *
     * @param msgObj         Ballerina message object passed to the service function
     * @param countDownLatch latch released by the response callback
     */
    private void executeResource(ObjectValue msgObj, CountDownLatch countDownLatch) {
        this.invokeOnMessage(msgObj, countDownLatch, new Object[]{msgObj, Boolean.TRUE});
    }

    /**
     * Invokes the service function with the message object and the type-bound payload.
     *
     * @param msgObj         Ballerina message object passed to the service function
     * @param countDownLatch latch released by the response callback
     * @param typeBoundData  payload already converted to the declared parameter type
     */
    private void executeResource(ObjectValue msgObj, CountDownLatch countDownLatch, Object typeBoundData) {
        this.invokeOnMessage(msgObj, countDownLatch, new Object[]{msgObj, Boolean.TRUE, typeBoundData, Boolean.TRUE});
    }

    /**
     * Shared invocation path for both {@code executeResource} overloads.
     *
     * <p>When tracing is enabled a {@link NatsObserverContext} is passed to the runtime in the
     * invocation properties; otherwise {@code null} properties are passed.
     *
     * @param msgObj         message object, used here to read the subject
     * @param countDownLatch latch released by the response callback
     * @param args           argument array forwarded to the runtime unchanged
     */
    private void invokeOnMessage(ObjectValue msgObj, CountDownLatch countDownLatch, Object[] args) {
        String subject = msgObj.getStringValue(Constants.SUBJECT).getValue();
        HashMap<String, Object> properties = null;
        if (ObserveUtils.isTracingEnabled()) {
            properties = new HashMap<String, Object>();
            properties.put(OBSERVER_CONTEXT_KEY, new NatsObserverContext("consumer", this.connectedUrl, subject));
        }
        CallableUnitCallback callback = new ResponseCallback(countDownLatch, subject, this.natsMetricsReporter);
        this.runtime.invokeMethodAsync(this.serviceObject, ON_MESSAGE_RESOURCE, null, Constants.ON_MESSAGE_METADATA, callback, properties, args);
    }

    /**
     * Callback handed to the runtime for one {@code onMessage} invocation. Both notification
     * methods report a metric for the subject and then count the latch down, which releases the
     * thread waiting in the enclosing handler.
     */
    public static class ResponseCallback
    implements CallableUnitCallback {
        private final CountDownLatch countDownLatch;
        private final String subject;
        private final NatsMetricsReporter natsMetricsReporter;

        /**
         * @param countDownLatch      latch to count down once a notification arrives
         * @param subject             subject of the message being processed
         * @param natsMetricsReporter reporter that receives the delivery or error event
         */
        ResponseCallback(CountDownLatch countDownLatch, String subject, NatsMetricsReporter natsMetricsReporter) {
            this.countDownLatch = countDownLatch;
            this.subject = subject;
            this.natsMetricsReporter = natsMetricsReporter;
        }

        /** Reports a delivery for the subject and releases the latch. */
        public void notifySuccess() {
            this.natsMetricsReporter.reportDelivery(this.subject);
            this.countDownLatch.countDown();
        }

        /**
         * Prints the error, reports a consumer error for the subject and releases the latch.
         *
         * @param error the error passed to this callback
         */
        public void notifyFailure(ErrorValue error) {
            ErrorHandlerUtils.printError((Throwable) error);
            this.natsMetricsReporter.reportConsumerError(this.subject, MESSAGE_RECEIVED_CONTEXT);
            this.countDownLatch.countDown();
        }
    }
}

