/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.nats.basic.consumer;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.MessageHandler;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.ballerinalang.jvm.BRuntime;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BString;
import org.ballerinalang.nats.Constants;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.basic.consumer.DefaultMessageHandler;
import org.ballerinalang.nats.observability.NatsMetricsReporter;

public class Register {
    private static final PrintStream console = System.out;

    public static Object basicRegister(ObjectValue listenerObject, ObjectValue service, Object annotationData) {
        String errorMessage = "Error while registering the subscriber. ";
        Connection natsConnection = (Connection)((ObjectValue)listenerObject.get(Constants.CONNECTION_OBJ)).getNativeData("nats_connection");
        List serviceList = (List)((ObjectValue)listenerObject.get(Constants.CONNECTION_OBJ)).getNativeData("service_list");
        MapValue<BString, Object> subscriptionConfig = Utils.getSubscriptionConfig(service.getType().getAnnotation("ballerina/nats:1.0.0", "SubscriptionConfig"));
        if (subscriptionConfig == null) {
            NatsMetricsReporter.reportConsumerError("subscription");
            return Utils.createNatsError(errorMessage + " Cannot find subscription configuration.");
        }
        String queueName = null;
        if (subscriptionConfig.containsKey((Object)Constants.QUEUE_NAME)) {
            queueName = subscriptionConfig.getStringValue(Constants.QUEUE_NAME).getValue();
        }
        String subject = subscriptionConfig.getStringValue(Constants.SUBJECT).getValue();
        BRuntime runtime = BRuntime.getCurrentRuntime();
        ObjectValue connectionObject = (ObjectValue)listenerObject.get(Constants.CONNECTION_OBJ);
        NatsMetricsReporter natsMetricsReporter = (NatsMetricsReporter)connectionObject.getNativeData("nats_metric_util");
        Dispatcher dispatcher = natsConnection.createDispatcher((MessageHandler)new DefaultMessageHandler(service, runtime, natsConnection.getConnectedUrl(), natsMetricsReporter));
        ConcurrentHashMap dispatcherList = (ConcurrentHashMap)listenerObject.getNativeData("dispatcher_list");
        dispatcherList.put(service.getType().getName(), dispatcher);
        if (subscriptionConfig.getMapValue(Constants.PENDING_LIMITS) != null) {
            Register.setPendingLimits(dispatcher, subscriptionConfig.getMapValue(Constants.PENDING_LIMITS));
        }
        try {
            if (queueName != null) {
                dispatcher.subscribe(subject, queueName);
            } else {
                dispatcher.subscribe(subject);
            }
        }
        catch (IllegalArgumentException | IllegalStateException ex) {
            natsMetricsReporter.reportConsumerError(subject, "subscription");
            return Utils.createNatsError(errorMessage + ex.getMessage());
        }
        serviceList.add(service);
        String consoleOutput = "subject " + subject + (queueName != null ? " & queue " + queueName : "");
        console.println("[ballerina/nats] Client subscribed for " + consoleOutput);
        ArrayList subscriptionsList = (ArrayList)listenerObject.getNativeData("BasicSubscriptionList");
        subscriptionsList.add(subject);
        NatsMetricsReporter.reportSubscription(natsConnection.getConnectedUrl(), subject);
        return null;
    }

    private static void setPendingLimits(Dispatcher dispatcher, MapValue pendingLimits) {
        long maxMessages = pendingLimits.getIntValue(Constants.MAX_MESSAGES);
        long maxBytes = pendingLimits.getIntValue(Constants.MAX_BYTES);
        dispatcher.setPendingLimits(maxMessages, maxBytes);
    }
}

