/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.nats.streaming.consumer;

import io.nats.streaming.MessageHandler;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.TypeChecker;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ArrayValue;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BString;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.connection.NatsStreamingConnection;
import org.ballerinalang.nats.observability.NatsMetricsReporter;
import org.ballerinalang.nats.streaming.consumer.StreamingListener;

/**
 * Native helper that creates NATS Streaming subscriptions for the services
 * registered on a streaming listener object.
 *
 * <p>{@link #streamingSubscribe} establishes the streaming connection, then walks the
 * listener's pending service/handler map, subscribes each service according to its
 * {@code StreamingSubscriptionConfig} annotation and records the resulting
 * {@link Subscription} on the listener.
 */
public class Subscribe {
    private static final PrintStream console = System.out;
    private static final String STREAMING_SUBSCRIPTION_CONFIG = "StreamingSubscriptionConfig";
    private static final String SUBSCRIPTION_ERROR_MESSAGE = "Error while creating the subscription";
    private static final BString SUBJECT_ANNOTATION_FIELD = StringUtils.fromString("subject");
    private static final BString QUEUE_NAME_ANNOTATION_FIELD = StringUtils.fromString("queueName");
    private static final BString DURABLE_NAME_ANNOTATION_FIELD = StringUtils.fromString("durableName");
    private static final BString MAX_IN_FLIGHT_ANNOTATION_FIELD = StringUtils.fromString("maxInFlight");
    private static final BString ACK_WAIT_ANNOTATION_FIELD = StringUtils.fromString("ackWaitInSeconds");
    private static final BString SUBSCRIPTION_TIMEOUT_ANNOTATION_FIELD = StringUtils.fromString("subscriptionTimeoutInSeconds");
    private static final BString MANUAL_ACK_ANNOTATION_FIELD = StringUtils.fromString("manualAck");
    private static final BString START_POSITION_ANNOTATION_FIELD = StringUtils.fromString("startPosition");

    // Values of BType#getTag() that setStartPositionInBuilder dispatches on.
    // NOTE(review): presumably the runtime's string and tuple type tags - confirm against TypeTags.
    private static final int ENUM_NAME_TYPE_TAG = 5;
    private static final int TUPLE_TYPE_TAG = 32;

    /**
     * Creates the streaming connection for the listener and subscribes every service
     * currently held in the listener's dispatcher map.
     *
     * <p>Each entry is removed from the dispatcher map only after its subscription was
     * created and stored in the listener's subscriptions map; if a subscription fails,
     * the error propagates and the remaining entries stay in the dispatcher map.
     *
     * @param streamingListener listener object holding the connection and the service maps as native data
     * @param connectionObject  connection object holding the metrics reporter as native data
     * @param clusterId         cluster id forwarded to {@code NatsStreamingConnection.createConnection}
     * @param clientIdNillable  client id (possibly nil) forwarded to {@code createConnection}
     * @param streamingConfig   streaming configuration forwarded to {@code createConnection}
     */
    public static void streamingSubscribe(ObjectValue streamingListener, ObjectValue connectionObject, BString clusterId, Object clientIdNillable, Object streamingConfig) {
        NatsStreamingConnection.createConnection(streamingListener, connectionObject, clusterId.getValue(), clientIdNillable, streamingConfig);
        NatsMetricsReporter natsMetricsReporter = (NatsMetricsReporter) connectionObject.getNativeData("nats_metric_util");
        StreamingConnection streamingConnection = (StreamingConnection) streamingListener.getNativeData("nats_streaming_connection");
        // Native data is stored untyped; the element types below are what this method reads and writes.
        @SuppressWarnings("unchecked")
        ConcurrentHashMap<ObjectValue, StreamingListener> serviceListenerMap =
                (ConcurrentHashMap<ObjectValue, StreamingListener>) streamingListener.getNativeData("StreamingDispatcherList");
        @SuppressWarnings("unchecked")
        ConcurrentHashMap<ObjectValue, Subscription> subscriptionsMap =
                (ConcurrentHashMap<ObjectValue, Subscription>) streamingListener.getNativeData("StreamingSubscriptionsList");

        Iterator<Map.Entry<ObjectValue, StreamingListener>> pending = serviceListenerMap.entrySet().iterator();
        while (pending.hasNext()) {
            Map.Entry<ObjectValue, StreamingListener> entry = pending.next();
            ObjectValue service = entry.getKey();
            Subscription subscription = createSubscription(service, entry.getValue(), streamingConnection, natsMetricsReporter);
            subscriptionsMap.put(service, subscription);
            // Drop the entry only once the subscription exists.
            pending.remove();
        }
    }

    /**
     * Subscribes a single service using the settings in its
     * {@code StreamingSubscriptionConfig} annotation.
     *
     * @param service             service whose type carries the subscription annotation
     * @param messageHandler      handler passed to the streaming client for incoming messages
     * @param streamingConnection connection used to create the subscription
     * @param natsMetricsReporter reporter notified when the subscription attempt fails
     * @return the created subscription
     * @throws RuntimeException the error built by {@code Utils.createNatsError} when the annotation
     *                          or its subject is missing, or when subscribing fails
     */
    private static Subscription createSubscription(ObjectValue service, StreamingListener messageHandler, StreamingConnection streamingConnection, NatsMetricsReporter natsMetricsReporter) {
        @SuppressWarnings("unchecked")
        MapValue<BString, Object> annotation =
                (MapValue<BString, Object>) service.getType().getAnnotation("ballerina/nats:1.0.0", STREAMING_SUBSCRIPTION_CONFIG);
        assertNull(annotation, "Streaming configuration annotation not present.");

        // Read the subject null-safely so a missing field yields the intended error, not an NPE.
        String subject = readOptionalString(annotation, SUBJECT_ANNOTATION_FIELD);
        assertNull(subject, "`Subject` annotation field is mandatory");
        String queueName = readOptionalString(annotation, QUEUE_NAME_ANNOTATION_FIELD);

        SubscriptionOptions subscriptionOptions = buildSubscriptionOptions(annotation);
        String consoleOutput = "subject " + subject + (queueName != null ? " & queue " + queueName : "");
        try {
            Subscription subscription = streamingConnection.subscribe(subject, queueName, (MessageHandler) messageHandler, subscriptionOptions);
            console.println("[ballerina/nats] Client subscribed for " + consoleOutput);
            NatsMetricsReporter.reportSubscription(streamingConnection.getNatsConnection().getConnectedUrl(), subject);
            return subscription;
        } catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
            }
            natsMetricsReporter.reportConsumerError(subject, "subscription");
            String message = e.getMessage();
            // Exceptions (InterruptedException in particular) may carry no message.
            throw Utils.createNatsError(message != null ? message : SUBSCRIPTION_ERROR_MESSAGE);
        } catch (TimeoutException e) {
            natsMetricsReporter.reportConsumerError(subject, "subscription");
            throw Utils.createNatsError(SUBSCRIPTION_ERROR_MESSAGE);
        }
    }

    /**
     * Translates the subscription annotation into streaming client {@link SubscriptionOptions}.
     *
     * <p>The durable name is optional; max-in-flight, ack wait, subscription timeout,
     * manual-ack and start position are read unconditionally from the annotation.
     *
     * @param annotation the {@code StreamingSubscriptionConfig} annotation value
     * @return the built subscription options
     */
    private static SubscriptionOptions buildSubscriptionOptions(MapValue<BString, Object> annotation) {
        SubscriptionOptions.Builder builder = new SubscriptionOptions.Builder();
        String durableName = readOptionalString(annotation, DURABLE_NAME_ANNOTATION_FIELD);
        int maxInFlight = annotation.getIntValue(MAX_IN_FLIGHT_ANNOTATION_FIELD).intValue();
        int ackWaitSeconds = annotation.getIntValue(ACK_WAIT_ANNOTATION_FIELD).intValue();
        int subscriptionTimeoutSeconds = annotation.getIntValue(SUBSCRIPTION_TIMEOUT_ANNOTATION_FIELD).intValue();
        boolean manualAck = annotation.getBooleanValue(MANUAL_ACK_ANNOTATION_FIELD);

        setStartPositionInBuilder(builder, annotation.get((Object) START_POSITION_ANNOTATION_FIELD));
        builder.durableName(durableName)
                .maxInFlight(maxInFlight)
                .ackWait(Duration.ofSeconds(ackWaitSeconds))
                .subscriptionTimeout(Duration.ofSeconds(subscriptionTimeoutSeconds));
        if (manualAck) {
            builder.manualAcks();
        }
        return builder.build();
    }

    /**
     * Applies the annotation's start position to the options builder.
     *
     * <p>A plain enum-name value selects {@code LAST_RECEIVED} or {@code FIRST}; any other
     * name leaves the builder untouched. A two-element tuple value carries a kind
     * and a number: {@code TIME_DELTA_START} starts at that many seconds in the past,
     * any other kind starts at that sequence number.
     *
     * @param builder       builder to configure
     * @param startPosition start position value taken from the annotation
     * @throws AssertionError           if the value's type tag is neither of the two handled tags
     * @throws IllegalArgumentException if an enum-name value does not name a {@link BallerinaStartPosition}
     */
    private static void setStartPositionInBuilder(SubscriptionOptions.Builder builder, Object startPosition) {
        BType type = TypeChecker.getType(startPosition);
        int startPositionType = type.getTag();
        switch (startPositionType) {
            case ENUM_NAME_TYPE_TAG: {
                switch (BallerinaStartPosition.valueOf(startPosition.toString())) {
                    case LAST_RECEIVED:
                        builder.startWithLastReceived();
                        break;
                    case FIRST:
                        builder.deliverAllAvailable();
                        break;
                    default:
                        // Other positions need no builder call here.
                        break;
                }
                break;
            }
            case TUPLE_TYPE_TAG: {
                ArrayValue tupleValue = (ArrayValue) startPosition;
                String startPositionKind = tupleValue.getRefValue(0L).toString();
                long timeOrSequenceNo = (Long) tupleValue.getRefValue(1L);
                if (BallerinaStartPosition.TIME_DELTA_START.name().equals(startPositionKind)) {
                    builder.startAtTimeDelta(Duration.ofSeconds(timeOrSequenceNo));
                } else {
                    builder.startAtSequence(timeOrSequenceNo);
                }
                break;
            }
            default: {
                throw new AssertionError((Object) ("Invalid type for start position value " + startPositionType));
            }
        }
    }

    /**
     * Returns the string stored under {@code key}, or {@code null} when the key is
     * absent or holds no value.
     */
    private static String readOptionalString(MapValue<BString, Object> annotation, BString key) {
        if (!annotation.containsKey((Object) key)) {
            return null;
        }
        BString value = annotation.getStringValue(key);
        return value == null ? null : value.getValue();
    }

    /**
     * Throws the error built by {@code Utils.createNatsError} with {@code errorMessage}
     * when {@code nullableObject} is {@code null}; otherwise does nothing.
     */
    private static void assertNull(Object nullableObject, String errorMessage) {
        if (nullableObject == null) {
            throw Utils.createNatsError(errorMessage);
        }
    }

    /** Start position names recognised when interpreting the annotation's start position. */
    private enum BallerinaStartPosition {
        NEW_ONLY,
        LAST_RECEIVED,
        FIRST,
        TIME_DELTA_START,
        SEQUENCE_NUMBER
    }
}

