/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.nats.basic.consumer;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.MessageHandler;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.ballerinalang.jvm.BRuntime;
import org.ballerinalang.jvm.BallerinaErrors;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.basic.consumer.DefaultMessageHandler;
import org.ballerinalang.nats.observability.NatsMetricsUtil;

public class Register {
    private static final PrintStream console = System.out;

    public static Object basicRegister(ObjectValue listenerObject, ObjectValue service, Object annotationData) {
        String errorMessage = "Error while registering the subscriber. ";
        Connection natsConnection = (Connection)((ObjectValue)listenerObject.get("conn")).getNativeData("nats_connection");
        List serviceList = (List)((ObjectValue)listenerObject.get("conn")).getNativeData("service_list");
        MapValue<String, Object> subscriptionConfig = Utils.getSubscriptionConfig(service.getType().getAnnotation("ballerina/nats", "SubscriptionConfig"));
        if (subscriptionConfig == null) {
            NatsMetricsUtil.reportConsumerError("subscription");
            return BallerinaErrors.createError((String)"{ballerina/nats}Error", (String)(errorMessage + " Cannot find subscription configuration."));
        }
        String queueName = subscriptionConfig.getStringValue("queueName");
        String subject = subscriptionConfig.getStringValue("subject");
        BRuntime runtime = BRuntime.getCurrentRuntime();
        ObjectValue connectionObject = (ObjectValue)listenerObject.get("conn");
        NatsMetricsUtil natsMetricsUtil = (NatsMetricsUtil)connectionObject.getNativeData("nats_metric_util");
        Dispatcher dispatcher = natsConnection.createDispatcher((MessageHandler)new DefaultMessageHandler(service, runtime, natsConnection.getConnectedUrl(), natsMetricsUtil));
        ConcurrentHashMap dispatcherList = (ConcurrentHashMap)listenerObject.getNativeData("dispatcher_list");
        dispatcherList.put(service.getType().getName(), dispatcher);
        if (subscriptionConfig.getMapValue("pendingLimits") != null) {
            Register.setPendingLimits(dispatcher, subscriptionConfig.getMapValue("pendingLimits"));
        }
        try {
            if (queueName != null) {
                dispatcher.subscribe(subject, queueName);
            } else {
                dispatcher.subscribe(subject);
            }
        }
        catch (IllegalArgumentException | IllegalStateException ex) {
            natsMetricsUtil.reportConsumerError(subject, "subscription");
            return BallerinaErrors.createError((String)"{ballerina/nats}Error", (String)(errorMessage + ex.getMessage()));
        }
        serviceList.add(service);
        String consoleOutput = "subject " + subject + (queueName != null ? " & queue " + queueName : "");
        console.println("[ballerina/nats] Client subscribed for " + consoleOutput);
        ArrayList subscriptionsList = (ArrayList)listenerObject.getNativeData("BasicSubscriptionList");
        subscriptionsList.add(subject);
        NatsMetricsUtil.reportSubscription(natsConnection.getConnectedUrl(), subject);
        return null;
    }

    private static void setPendingLimits(Dispatcher dispatcher, MapValue pendingLimits) {
        long maxMessages = pendingLimits.getIntValue("maxMessages");
        long maxBytes = pendingLimits.getIntValue("maxBytes");
        dispatcher.setPendingLimits(maxMessages, maxBytes);
    }
}

