package org.ballerinalang.nats.streaming.consumer;

import io.nats.streaming.StreamingConnection;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.TypeChecker;
import org.ballerinalang.jvm.values.ArrayValue;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.api.BString;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.connection.NatsStreamingConnection;
import org.ballerinalang.nats.observability.NatsMetricsReporter;
import org.ballerinalang.nats.observability.NatsObservabilityConstants;
import org.ballerinalang.stdlib.crypto.Constants;

/* loaded from: input_file:org/ballerinalang/nats/streaming/consumer/Subscribe.class */
public class Subscribe {
    private static final String STREAMING_SUBSCRIPTION_CONFIG = "StreamingSubscriptionConfig";
    private static final BString SUBJECT_ANNOTATION_FIELD = StringUtils.fromString(Constants.CERTIFICATE_RECORD_SUBJECT_FIELD);
    private static final BString QUEUE_NAME_ANNOTATION_FIELD = StringUtils.fromString("queueName");
    private static final BString DURABLE_NAME_ANNOTATION_FIELD = StringUtils.fromString("durableName");
    private static final BString MAX_IN_FLIGHT_ANNOTATION_FIELD = StringUtils.fromString("maxInFlight");
    private static final BString ACK_WAIT_ANNOTATION_FIELD = StringUtils.fromString("ackWaitInSeconds");
    private static final BString SUBSCRIPTION_TIMEOUT_ANNOTATION_FIELD = StringUtils.fromString("subscriptionTimeoutInSeconds");
    private static final BString MANUAL_ACK_ANNOTATION_FIELD = StringUtils.fromString("manualAck");
    private static final BString START_POSITION_ANNOTATION_FIELD = StringUtils.fromString("startPosition");
    private static final PrintStream console = System.out;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/ballerinalang/nats/streaming/consumer/Subscribe$BallerinaStartPosition.class */
    public enum BallerinaStartPosition {
        NEW_ONLY,
        LAST_RECEIVED,
        FIRST,
        TIME_DELTA_START,
        SEQUENCE_NUMBER
    }

    public static void streamingSubscribe(ObjectValue objectValue, ObjectValue objectValue2, BString bString, Object obj, Object obj2) {
        NatsStreamingConnection.createConnection(objectValue, objectValue2, bString.getValue(), obj, obj2);
        NatsMetricsReporter natsMetricsReporter = (NatsMetricsReporter) objectValue2.getNativeData(org.ballerinalang.nats.Constants.NATS_METRIC_UTIL);
        StreamingConnection streamingConnection = (StreamingConnection) objectValue.getNativeData(org.ballerinalang.nats.Constants.NATS_STREAMING_CONNECTION);
        ConcurrentHashMap concurrentHashMap = (ConcurrentHashMap) objectValue.getNativeData(org.ballerinalang.nats.Constants.STREAMING_DISPATCHER_LIST);
        ConcurrentHashMap concurrentHashMap2 = (ConcurrentHashMap) objectValue.getNativeData(org.ballerinalang.nats.Constants.STREAMING_SUBSCRIPTION_LIST);
        Iterator it = concurrentHashMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            concurrentHashMap2.put((ObjectValue) entry.getKey(), createSubscription((ObjectValue) entry.getKey(), (StreamingListener) entry.getValue(), streamingConnection, natsMetricsReporter));
            it.remove();
        }
    }

    private static Subscription createSubscription(ObjectValue objectValue, StreamingListener streamingListener, StreamingConnection streamingConnection, NatsMetricsReporter natsMetricsReporter) {
        MapValue mapValue = (MapValue) objectValue.getType().getAnnotation(org.ballerinalang.nats.Constants.NATS_PACKAGE, "StreamingSubscriptionConfig");
        assertNull(mapValue, "Streaming configuration annotation not present.");
        String value = mapValue.getStringValue(SUBJECT_ANNOTATION_FIELD).getValue();
        assertNull(value, "`Subject` annotation field is mandatory");
        String str = null;
        if (mapValue.containsKey(QUEUE_NAME_ANNOTATION_FIELD)) {
            str = mapValue.getStringValue(QUEUE_NAME_ANNOTATION_FIELD).getValue();
        }
        SubscriptionOptions buildSubscriptionOptions = buildSubscriptionOptions(mapValue);
        String str2 = "subject " + value + (str != null ? " & queue " + str : "");
        try {
            Subscription subscribe = streamingConnection.subscribe(value, str, streamingListener, buildSubscriptionOptions);
            console.println(org.ballerinalang.nats.Constants.NATS_CLIENT_SUBSCRIBED + str2);
            NatsMetricsReporter.reportSubscription(streamingConnection.getNatsConnection().getConnectedUrl(), value);
            return subscribe;
        } catch (IOException | InterruptedException e) {
            natsMetricsReporter.reportConsumerError(value, NatsObservabilityConstants.ERROR_TYPE_SUBSCRIPTION);
            throw Utils.createNatsError(e.getMessage());
        } catch (TimeoutException e2) {
            natsMetricsReporter.reportConsumerError(value, NatsObservabilityConstants.ERROR_TYPE_SUBSCRIPTION);
            throw Utils.createNatsError("Error while creating the subscription");
        }
    }

    private static SubscriptionOptions buildSubscriptionOptions(MapValue<BString, Object> mapValue) {
        SubscriptionOptions.Builder builder = new SubscriptionOptions.Builder();
        String str = null;
        if (mapValue.containsKey(DURABLE_NAME_ANNOTATION_FIELD)) {
            str = mapValue.getStringValue(DURABLE_NAME_ANNOTATION_FIELD).getValue();
        }
        int intValue = mapValue.getIntValue(MAX_IN_FLIGHT_ANNOTATION_FIELD).intValue();
        int intValue2 = mapValue.getIntValue(ACK_WAIT_ANNOTATION_FIELD).intValue();
        int intValue3 = mapValue.getIntValue(SUBSCRIPTION_TIMEOUT_ANNOTATION_FIELD).intValue();
        boolean booleanValue = mapValue.getBooleanValue(MANUAL_ACK_ANNOTATION_FIELD).booleanValue();
        setStartPositionInBuilder(builder, mapValue.get(START_POSITION_ANNOTATION_FIELD));
        builder.durableName(str).maxInFlight(intValue).ackWait(Duration.ofSeconds(intValue2)).subscriptionTimeout(Duration.ofSeconds(intValue3));
        if (booleanValue) {
            builder.manualAcks();
        }
        return builder.build();
    }

    private static void setStartPositionInBuilder(SubscriptionOptions.Builder builder, Object obj) {
        int tag = TypeChecker.getType(obj).getTag();
        switch (tag) {
            case 5:
                BallerinaStartPosition valueOf = BallerinaStartPosition.valueOf(obj.toString());
                if (valueOf.equals(BallerinaStartPosition.LAST_RECEIVED)) {
                    builder.startWithLastReceived();
                    return;
                } else {
                    if (valueOf.equals(BallerinaStartPosition.FIRST)) {
                        builder.deliverAllAvailable();
                        return;
                    }
                    return;
                }
            case 32:
                ArrayValue arrayValue = (ArrayValue) obj;
                String obj2 = arrayValue.getRefValue(0L).toString();
                long longValue = ((Long) arrayValue.getRefValue(1L)).longValue();
                if (obj2.equals(BallerinaStartPosition.TIME_DELTA_START.name())) {
                    builder.startAtTimeDelta(Duration.ofSeconds(longValue));
                    return;
                } else {
                    builder.startAtSequence(longValue);
                    return;
                }
            default:
                throw new AssertionError("Invalid type for start position value " + tag);
        }
    }

    private static void assertNull(Object obj, String str) {
        if (obj == null) {
            throw Utils.createNatsError(str);
        }
    }
}
