/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.nats.streaming.consumer;

import io.nats.streaming.MessageHandler;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.ballerinalang.jvm.TypeChecker;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ArrayValue;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.connection.NatsStreamingConnection;
import org.ballerinalang.nats.observability.NatsMetricsUtil;
import org.ballerinalang.nats.streaming.consumer.StreamingListener;

public class Subscribe {
    private static final PrintStream console = System.out;
    private static final String STREAMING_SUBSCRIPTION_CONFIG = "StreamingSubscriptionConfig";
    private static final String SUBJECT_ANNOTATION_FIELD = "subject";
    private static final String QUEUE_NAME_ANNOTATION_FIELD = "queueName";
    private static final String DURABLE_NAME_ANNOTATION_FIELD = "durableName";
    private static final String MAX_IN_FLIGHT_ANNOTATION_FIELD = "maxInFlight";
    private static final String ACK_WAIT_ANNOTATION_FIELD = "ackWaitInSeconds";
    private static final String SUBSCRIPTION_TIMEOUT_ANNOTATION_FIELD = "subscriptionTimeoutInSeconds";
    private static final String MANUAL_ACK_ANNOTATION_FIELD = "manualAck";
    private static final String START_POSITION_ANNOTATION_FIELD = "startPosition";

    public static void streamingSubscribe(ObjectValue streamingListener, ObjectValue connectionObject, String clusterId, Object clientIdNillable, Object streamingConfig) {
        NatsStreamingConnection.createConnection(streamingListener, connectionObject, clusterId, clientIdNillable, streamingConfig);
        NatsMetricsUtil natsMetricsUtil = (NatsMetricsUtil)connectionObject.getNativeData("nats_metric_util");
        StreamingConnection streamingConnection = (StreamingConnection)streamingListener.getNativeData("nats_streaming_connection");
        ConcurrentHashMap serviceListenerMap = (ConcurrentHashMap)streamingListener.getNativeData("StreamingDispatcherList");
        ConcurrentHashMap subscriptionsMap = (ConcurrentHashMap)streamingListener.getNativeData("StreamingSubscriptionsList");
        Iterator serviceListeners = serviceListenerMap.entrySet().iterator();
        while (serviceListeners.hasNext()) {
            Map.Entry pair = serviceListeners.next();
            Subscription sub = Subscribe.createSubscription((ObjectValue)pair.getKey(), (StreamingListener)pair.getValue(), streamingConnection, natsMetricsUtil);
            subscriptionsMap.put((ObjectValue)pair.getKey(), sub);
            serviceListeners.remove();
        }
    }

    private static Subscription createSubscription(ObjectValue service, StreamingListener messageHandler, StreamingConnection streamingConnection, NatsMetricsUtil natsMetricsUtil) {
        MapValue annotation = (MapValue)service.getType().getAnnotation("ballerina/nats", STREAMING_SUBSCRIPTION_CONFIG);
        Subscribe.assertNull(annotation, "Streaming configuration annotation not present.");
        String subject = annotation.getStringValue(SUBJECT_ANNOTATION_FIELD);
        Subscribe.assertNull(subject, "`Subject` annotation field is mandatory");
        String queueName = annotation.getStringValue(QUEUE_NAME_ANNOTATION_FIELD);
        SubscriptionOptions subscriptionOptions = Subscribe.buildSubscriptionOptions((MapValue<String, Object>)annotation);
        String consoleOutput = "subject " + subject + (queueName != null ? " & queue " + queueName : "");
        try {
            Subscription subscription = streamingConnection.subscribe(subject, queueName, (MessageHandler)messageHandler, subscriptionOptions);
            console.println("[ballerina/nats] Client subscribed for " + consoleOutput);
            NatsMetricsUtil.reportSubscription(streamingConnection.getNatsConnection().getConnectedUrl(), subject);
            return subscription;
        }
        catch (IOException | InterruptedException e) {
            natsMetricsUtil.reportConsumerError(subject, "subscription");
            throw Utils.createNatsError(e.getMessage());
        }
        catch (TimeoutException e) {
            natsMetricsUtil.reportConsumerError(subject, "subscription");
            throw Utils.createNatsError("Error while creating the subscription");
        }
    }

    private static SubscriptionOptions buildSubscriptionOptions(MapValue<String, Object> annotation) {
        SubscriptionOptions.Builder builder = new SubscriptionOptions.Builder();
        String durableName = annotation.getStringValue(DURABLE_NAME_ANNOTATION_FIELD);
        int maxInFlight = annotation.getIntValue(MAX_IN_FLIGHT_ANNOTATION_FIELD).intValue();
        int ackWait = annotation.getIntValue(ACK_WAIT_ANNOTATION_FIELD).intValue();
        int subscriptionTimeout = annotation.getIntValue(SUBSCRIPTION_TIMEOUT_ANNOTATION_FIELD).intValue();
        boolean manualAck = annotation.getBooleanValue(MANUAL_ACK_ANNOTATION_FIELD);
        Object startPosition = annotation.get((Object)START_POSITION_ANNOTATION_FIELD);
        Subscribe.setStartPositionInBuilder(builder, startPosition);
        builder.durableName(durableName).maxInFlight(maxInFlight).ackWait(Duration.ofSeconds(ackWait)).subscriptionTimeout(Duration.ofSeconds(subscriptionTimeout));
        if (manualAck) {
            builder.manualAcks();
        }
        return builder.build();
    }

    private static void setStartPositionInBuilder(SubscriptionOptions.Builder builder, Object startPosition) {
        BType type = TypeChecker.getType((Object)startPosition);
        int startPositionType = type.getTag();
        switch (startPositionType) {
            case 5: {
                BallerinaStartPosition startPositionValue = BallerinaStartPosition.valueOf((String)startPosition);
                if (startPositionValue.equals((Object)BallerinaStartPosition.LAST_RECEIVED)) {
                    builder.startWithLastReceived();
                    break;
                }
                if (!startPositionValue.equals((Object)BallerinaStartPosition.FIRST)) break;
                builder.deliverAllAvailable();
                break;
            }
            case 31: {
                ArrayValue tupleValue = (ArrayValue)startPosition;
                String startPositionKind = (String)tupleValue.getRefValue(0L);
                long timeOrSequenceNo = (Long)tupleValue.getRefValue(1L);
                if (startPositionKind.equals(BallerinaStartPosition.TIME_DELTA_START.name())) {
                    builder.startAtTimeDelta(Duration.ofSeconds(timeOrSequenceNo));
                    break;
                }
                builder.startAtSequence(timeOrSequenceNo);
                break;
            }
            default: {
                throw new AssertionError((Object)("Invalid type for start position value " + startPositionType));
            }
        }
    }

    private static void assertNull(Object nullableObject, String errorMessage) {
        if (nullableObject == null) {
            throw Utils.createNatsError(errorMessage);
        }
    }

    private static enum BallerinaStartPosition {
        NEW_ONLY,
        LAST_RECEIVED,
        FIRST,
        TIME_DELTA_START,
        SEQUENCE_NUMBER;

    }
}

