package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamOptions;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PublishOptions;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AccountStatistics;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerCreateRequest;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.SuccessApiResponse;
import io.nats.client.support.ApiConstants;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.NatsJetStreamConstants;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/* loaded from: input_file:io/nats/client/impl/NatsJetStream.class */
public class NatsJetStream implements JetStream, JetStreamManagement, NatsJetStreamConstants {
    private final NatsConnection conn;
    private final JetStreamOptions jso;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/nats/client/impl/NatsJetStream$AutoAckMessageHandler.class */
    /**
     * Message handler decorator that invokes the user's handler and then
     * acknowledges the message.
     *
     * <p>Failures raised by the user handler or by the acknowledgement are
     * deliberately not propagated (best-effort delivery), with one exception:
     * an {@link InterruptedException} is rethrown so that thread interruption
     * is never silently lost.
     */
    public static class AutoAckMessageHandler implements MessageHandler {
        /** The wrapped, user-supplied handler. */
        MessageHandler userMH;

        /**
         * @param messageHandler the user handler to delegate to before acknowledging
         */
        AutoAckMessageHandler(MessageHandler messageHandler) {
            this.userMH = messageHandler;
        }

        /**
         * Delegates to the user handler and, if it returns normally, acks the message.
         *
         * @param message the delivered message
         * @throws InterruptedException if the user handler (or the ack) was interrupted
         */
        @Override // io.nats.client.MessageHandler
        public void onMessage(Message message) throws InterruptedException {
            try {
                this.userMH.onMessage(message);
                message.ack();
            } catch (InterruptedException e) {
                // Interruption is a control signal, not a handler failure: let the caller see it.
                throw e;
            } catch (Exception ignored) {
                // Best-effort: a failing handler must not take down the delivery thread.
                // The message is simply left un-acked.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a JetStream context on top of an existing connection.
     *
     * <p>The supplied options are passed through {@code JetStreamOptions.builder(...).build()}
     * and the result is what this instance keeps. The constructor then issues the
     * availability check performed by {@link #checkEnabled()}.
     *
     * @param natsConnection   the connection used for all requests
     * @param jetStreamOptions the options to base this context's options on
     * @throws IOException if the availability check is interrupted
     */
    public NatsJetStream(NatsConnection natsConnection, JetStreamOptions jetStreamOptions) throws IOException {
        JetStreamOptions effectiveOptions = JetStreamOptions.builder(jetStreamOptions).build();
        this.conn = natsConnection;
        this.jso = effectiveOptions;
        checkEnabled();
    }

    /**
     * Verifies that JetStream answers on this connection by sending an
     * {@code INFO} request (prefixed via {@link #prependPrefix(String)}).
     *
     * @throws IllegalStateException if no reply arrives, or the reply reports error code 503
     * @throws IOException if the calling thread is interrupted while waiting; the
     *                     thread's interrupt flag is restored before throwing
     */
    private void checkEnabled() throws IOException {
        try {
            Message reply = makeRequest("INFO", null, this.jso.getRequestTimeout());
            if (reply == null) {
                throw new IllegalStateException("JetStream is not enabled.");
            }
            AccountStatistics stats = new AccountStatistics(reply);
            if (stats.getErrorCode() == 503) {
                throw new IllegalStateException(stats.getDescription());
            }
        } catch (InterruptedException e) {
            // Wrapping would otherwise erase the interruption; keep the flag set for callers up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Sends the given configuration using the stream-create subject template.
     *
     * @param streamConfiguration the stream configuration; must be non-null and named
     * @return the stream information parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException if the reply reports an API error
     */
    public StreamInfo addStream(StreamConfiguration streamConfiguration) throws IOException, JetStreamApiException {
        final String createTemplate = NatsJetStreamConstants.JSAPI_STREAM_CREATE;
        return addOrUpdateStream(streamConfiguration, createTemplate);
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Sends the given configuration using the stream-update subject template.
     *
     * @param streamConfiguration the stream configuration; must be non-null and named
     * @return the stream information parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException if the reply reports an API error
     */
    public StreamInfo updateStream(StreamConfiguration streamConfiguration) throws IOException, JetStreamApiException {
        final String updateTemplate = NatsJetStreamConstants.JSAPI_STREAM_UPDATE;
        return addOrUpdateStream(streamConfiguration, updateTemplate);
    }

    /**
     * Shared implementation of stream create and update.
     *
     * @param streamConfiguration the configuration to send as the JSON request body
     * @param str                 a format template for the API subject; the stream name is
     *                            substituted into it
     * @return the stream information parsed from the server reply
     * @throws IllegalArgumentException if the configuration is null or has a null/empty name
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException if the reply reports an API error
     */
    private StreamInfo addOrUpdateStream(StreamConfiguration streamConfiguration, String str) throws IOException, JetStreamApiException {
        if (streamConfiguration == null) {
            throw new IllegalArgumentException("Configuration cannot be null.");
        }
        String name = streamConfiguration.getName();
        if (Validator.nullOrEmpty(name)) {
            throw new IllegalArgumentException("Configuration must have a valid stream name");
        }
        String subject = String.format(str, name);
        // Encode explicitly: the no-arg getBytes() depends on the platform default charset.
        byte[] payload = streamConfiguration.toJson().getBytes(StandardCharsets.UTF_8);
        Message reply = makeRequestResponseRequired(subject, payload, this.jso.getRequestTimeout());
        return new StreamInfo(reply).throwOnHasError();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Sends a stream-delete request for the named stream.
     *
     * @param str the stream name, substituted into the delete subject template
     * @return the success flag carried by the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    public boolean deleteStream(String str) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_STREAM_DELETE, str);
        Message reply = makeRequestResponseRequired(subject, null, this.jso.getRequestTimeout());
        return new SuccessApiResponse(reply).throwOnHasError().getSuccess();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Sends a stream-info request for the named stream.
     *
     * @param str the stream name, substituted into the info subject template
     * @return the stream information parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    public StreamInfo getStreamInfo(String str) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_STREAM_INFO, str);
        Message reply = makeRequestResponseRequired(subject, null, this.jso.getRequestTimeout());
        return new StreamInfo(reply).throwOnHasError();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Sends a stream-purge request for the named stream.
     *
     * @param str the stream name, substituted into the purge subject template
     * @return the purge response parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    public PurgeResponse purgeStream(String str) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_STREAM_PURGE, str);
        Message reply = makeRequestResponseRequired(subject, null, this.jso.getRequestTimeout());
        return new PurgeResponse(reply).throwOnHasError();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Validates the arguments and then creates or updates a consumer on a stream.
     *
     * <p>The stream name, the configuration and the configuration's durable name
     * are each passed through {@link Validator} before the request is made.
     *
     * @param str                   the stream name
     * @param consumerConfiguration the consumer configuration; its durable name is validated too
     * @return the consumer information parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException if the reply reports an API error
     */
    public ConsumerInfo addOrUpdateConsumer(String str, ConsumerConfiguration consumerConfiguration) throws IOException, JetStreamApiException {
        Validator.validateStreamName(str, true);
        Validator.validateNotNull(consumerConfiguration, "Config");
        String durableName = consumerConfiguration.getDurable();
        Validator.validateNotNull(durableName, "Durable");
        return addOrUpdateConsumerInternal(str, consumerConfiguration);
    }

    /**
     * Sends a consumer create request without validating the arguments.
     *
     * <p>When the configuration has a durable name the durable-create subject
     * template is used (stream and durable substituted); otherwise the plain
     * consumer-create template is used (stream only).
     *
     * @param str                   the stream name
     * @param consumerConfiguration the consumer configuration to wrap in the request body
     * @return the consumer information parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    private ConsumerInfo addOrUpdateConsumerInternal(String str, ConsumerConfiguration consumerConfiguration) throws IOException, JetStreamApiException {
        String durable = consumerConfiguration.getDurable();
        String subject = durable == null
                ? String.format(NatsJetStreamConstants.JSAPI_CONSUMER_CREATE, str)
                : String.format(NatsJetStreamConstants.JSAPI_DURABLE_CREATE, str, durable);
        // Encode explicitly: the no-arg getBytes() depends on the platform default charset.
        byte[] payload = new ConsumerCreateRequest(str, consumerConfiguration).toJson().getBytes(StandardCharsets.UTF_8);
        // Use the JetStream request timeout like every other API call in this class,
        // rather than the unrelated connection (connect) timeout.
        Message reply = makeRequestResponseRequired(subject, payload, this.jso.getRequestTimeout());
        return new ConsumerInfo(reply).throwOnHasError();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Sends a consumer-delete request.
     *
     * @param str  the stream name (first substitution in the subject template)
     * @param str2 the consumer name (second substitution in the subject template)
     * @return the success flag carried by the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    public boolean deleteConsumer(String str, String str2) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_CONSUMER_DELETE, str, str2);
        Message reply = makeRequestResponseRequired(subject, null, this.jso.getRequestTimeout());
        return new SuccessApiResponse(reply).throwOnHasError().getSuccess();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Sends a consumer-info request.
     *
     * @param str  the stream name (first substitution in the subject template)
     * @param str2 the consumer name (second substitution in the subject template)
     * @return the consumer information parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    public ConsumerInfo getConsumerInfo(String str, String str2) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_CONSUMER_INFO, str, str2);
        Message reply = makeRequestResponseRequired(subject, null, this.jso.getRequestTimeout());
        return new ConsumerInfo(reply).throwOnHasError();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Lists the consumer names of a stream, passing {@code null} as the second
     * argument of the private two-argument overload.
     *
     * @param str the stream name
     * @return the collected consumer names
     * @throws IOException if no reply arrives or a request is interrupted
     * @throws JetStreamApiException if a reply reports an API error
     */
    public List<String> getConsumerNames(String str) throws IOException, JetStreamApiException {
        final String noFilter = null;
        return getConsumerNames(str, noFilter);
    }

    /**
     * Collects consumer names by issuing requests for as long as the reader
     * reports that more data is available.
     *
     * @param str  the stream name, substituted into the consumer-names subject template
     * @param str2 value handed to the reader when it builds each request body; may be null
     * @return the names accumulated by the reader
     * @throws IOException if no reply arrives or a request is interrupted
     * @throws JetStreamApiException if processing a reply reports an API error
     */
    private List<String> getConsumerNames(String str, String str2) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_CONSUMER_NAMES, str);
        ConsumerNamesReader reader = new ConsumerNamesReader();
        while (reader.hasMore()) {
            byte[] pageRequest = reader.nextJson(str2);
            Message page = makeRequestResponseRequired(subject, pageRequest, this.jso.getRequestTimeout());
            reader.process(page);
        }
        return reader.getStrings();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Collects consumer information for a stream by issuing requests for as
     * long as the reader reports that more data is available.
     *
     * @param str the stream name, substituted into the consumer-list subject template
     * @return the consumers accumulated by the reader
     * @throws IOException if no reply arrives or a request is interrupted
     * @throws JetStreamApiException if processing a reply reports an API error
     */
    public List<ConsumerInfo> getConsumers(String str) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_CONSUMER_LIST, str);
        ConsumerListReader reader = new ConsumerListReader();
        while (reader.hasMore()) {
            byte[] pageRequest = reader.nextJson();
            Message page = makeRequestResponseRequired(subject, pageRequest, this.jso.getRequestTimeout());
            reader.process(page);
        }
        return reader.getConsumers();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Collects stream names by issuing {@code STREAM.NAMES} requests for as
     * long as the reader reports that more data is available.
     *
     * @return the names accumulated by the reader
     * @throws IOException if no reply arrives or a request is interrupted
     * @throws JetStreamApiException if processing a reply reports an API error
     */
    public List<String> getStreamNames() throws IOException, JetStreamApiException {
        StreamNamesReader reader = new StreamNamesReader();
        while (reader.hasMore()) {
            byte[] pageRequest = reader.nextJson();
            Message page = makeRequestResponseRequired("STREAM.NAMES", pageRequest, this.jso.getRequestTimeout());
            reader.process(page);
        }
        return reader.getStrings();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Collects stream information by issuing stream-list requests for as long
     * as the reader reports that more data is available.
     *
     * @return the streams accumulated by the reader
     * @throws IOException if no reply arrives or a request is interrupted
     * @throws JetStreamApiException if processing a reply reports an API error
     */
    public List<StreamInfo> getStreams() throws IOException, JetStreamApiException {
        StreamListReader reader = new StreamListReader();
        while (reader.hasMore()) {
            byte[] pageRequest = reader.nextJson();
            Message page = makeRequestResponseRequired(NatsJetStreamConstants.JSAPI_STREAM_LIST, pageRequest, this.jso.getRequestTimeout());
            reader.process(page);
        }
        return reader.getStreams();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Requests a single message from a stream by sequence number.
     *
     * @param str the stream name, substituted into the message-get subject template
     * @param j   the sequence number placed in the request body under {@code ApiConstants.SEQ}
     * @return the message information parsed from the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    public MessageInfo getMessage(String str, long j) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_MSG_GET, str);
        byte[] body = JsonUtils.simpleMessageBody(ApiConstants.SEQ, Long.valueOf(j));
        Message reply = makeRequestResponseRequired(subject, body, this.jso.getRequestTimeout());
        return new MessageInfo(reply).throwOnHasError();
    }

    @Override // io.nats.client.JetStreamManagement
    /**
     * Requests deletion of a single message from a stream by sequence number.
     *
     * @param str the stream name, substituted into the message-delete subject template
     * @param j   the sequence number placed in the request body under {@code ApiConstants.SEQ}
     * @return the success flag carried by the server reply
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException propagated from {@code throwOnHasError()}
     */
    public boolean deleteMessage(String str, long j) throws IOException, JetStreamApiException {
        String subject = String.format(NatsJetStreamConstants.JSAPI_MSG_DELETE, str);
        byte[] body = JsonUtils.simpleMessageBody(ApiConstants.SEQ, Long.valueOf(j));
        Message reply = makeRequestResponseRequired(subject, body, this.jso.getRequestTimeout());
        return new SuccessApiResponse(reply).throwOnHasError().getSuccess();
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a payload synchronously with no headers and no publish options.
     *
     * @param str  the subject to publish to
     * @param bArr the payload
     * @return the publish acknowledgement, or {@code null} when the JetStream options
     *         have publish-no-ack enabled
     * @throws IOException if no reply arrives, the request is interrupted, or the reply is rejected
     * @throws JetStreamApiException propagated from response processing
     */
    public PublishAck publish(String str, byte[] bArr) throws IOException, JetStreamApiException {
        final Headers noHeaders = null;
        final PublishOptions noOptions = null;
        return publishSyncInternal(str, noHeaders, bArr, false, noOptions);
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a payload synchronously with no headers, applying the given publish options.
     *
     * @param str            the subject to publish to
     * @param bArr           the payload
     * @param publishOptions the publish options; may be null
     * @return the publish acknowledgement, or {@code null} when the JetStream options
     *         have publish-no-ack enabled
     * @throws IOException if no reply arrives, the request is interrupted, or the reply is rejected
     * @throws JetStreamApiException propagated from response processing
     */
    public PublishAck publish(String str, byte[] bArr, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        final Headers noHeaders = null;
        return publishSyncInternal(str, noHeaders, bArr, publishOptions);
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a message synchronously using its own subject, headers, data
     * and UTF-8 mode flag, with no publish options.
     *
     * @param message the message to publish; validated as non-null
     * @return the publish acknowledgement, or {@code null} when the JetStream options
     *         have publish-no-ack enabled
     * @throws IOException if no reply arrives, the request is interrupted, or the reply is rejected
     * @throws JetStreamApiException propagated from response processing
     */
    public PublishAck publish(Message message) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        final PublishOptions noOptions = null;
        return publishSyncInternal(
                message.getSubject(),
                message.getHeaders(),
                message.getData(),
                message.isUtf8mode(),
                noOptions);
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a message synchronously using its own subject, headers, data
     * and UTF-8 mode flag, applying the given publish options.
     *
     * @param message        the message to publish; validated as non-null
     * @param publishOptions the publish options; may be null
     * @return the publish acknowledgement, or {@code null} when the JetStream options
     *         have publish-no-ack enabled
     * @throws IOException if no reply arrives, the request is interrupted, or the reply is rejected
     * @throws JetStreamApiException propagated from response processing
     */
    public PublishAck publish(Message message, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        return publishSyncInternal(
                message.getSubject(),
                message.getHeaders(),
                message.getData(),
                message.isUtf8mode(),
                publishOptions);
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a payload asynchronously with no headers, options or timeout.
     *
     * @param str  the subject to publish to
     * @param bArr the payload
     * @return a future for the publish acknowledgement, or {@code null} when the
     *         JetStream options have publish-no-ack enabled
     */
    public CompletableFuture<PublishAck> publishAsync(String str, byte[] bArr) {
        final Headers noHeaders = null;
        final PublishOptions noOptions = null;
        final Duration noTimeout = null;
        return publishAsyncInternal(str, noHeaders, bArr, noOptions, noTimeout);
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a payload asynchronously with no headers or timeout, applying
     * the given publish options.
     *
     * @param str            the subject to publish to
     * @param bArr           the payload
     * @param publishOptions the publish options; may be null
     * @return a future for the publish acknowledgement, or {@code null} when the
     *         JetStream options have publish-no-ack enabled
     */
    public CompletableFuture<PublishAck> publishAsync(String str, byte[] bArr, PublishOptions publishOptions) {
        final Headers noHeaders = null;
        final Duration noTimeout = null;
        return publishAsyncInternal(str, noHeaders, bArr, publishOptions, noTimeout);
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a message asynchronously using its own subject, headers, data
     * and UTF-8 mode flag, with no publish options or timeout.
     *
     * @param message the message to publish; validated as non-null
     * @return a future for the publish acknowledgement, or {@code null} when the
     *         JetStream options have publish-no-ack enabled
     */
    public CompletableFuture<PublishAck> publishAsync(Message message) {
        Validator.validateNotNull(message, "Message");
        final PublishOptions noOptions = null;
        final Duration noTimeout = null;
        return publishAsyncInternal(
                message.getSubject(),
                message.getHeaders(),
                message.getData(),
                message.isUtf8mode(),
                noOptions,
                noTimeout);
    }

    @Override // io.nats.client.JetStream
    /**
     * Publishes a message asynchronously using its own subject, headers, data
     * and UTF-8 mode flag, applying the given publish options and no timeout.
     *
     * @param message        the message to publish; validated as non-null
     * @param publishOptions the publish options; may be null
     * @return a future for the publish acknowledgement, or {@code null} when the
     *         JetStream options have publish-no-ack enabled
     */
    public CompletableFuture<PublishAck> publishAsync(Message message, PublishOptions publishOptions) {
        Validator.validateNotNull(message, "Message");
        final Duration noTimeout = null;
        return publishAsyncInternal(
                message.getSubject(),
                message.getHeaders(),
                message.getData(),
                message.isUtf8mode(),
                publishOptions,
                noTimeout);
    }

    /**
     * Convenience overload of the synchronous publish that passes {@code false}
     * for the UTF-8 mode flag.
     *
     * @param str            the subject to publish to
     * @param headers        the user headers; may be null
     * @param bArr           the payload
     * @param publishOptions the publish options; may be null
     * @return the publish acknowledgement, or {@code null} in publish-no-ack mode
     * @throws IOException if no reply arrives, the request is interrupted, or the reply is rejected
     * @throws JetStreamApiException propagated from response processing
     */
    private PublishAck publishSyncInternal(String str, Headers headers, byte[] bArr, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        final boolean utf8Mode = false;
        return publishSyncInternal(str, headers, bArr, utf8Mode, publishOptions);
    }

    /**
     * Synchronous publish implementation.
     *
     * <p>The user headers are merged with the publish options into a fresh
     * header set. In publish-no-ack mode the message is published without
     * waiting for a reply and {@code null} is returned. Otherwise a request is
     * made and the reply is turned into a {@link PublishAck}. The request timeout
     * is the options' stream timeout when options are given, else the JetStream
     * request timeout.
     *
     * @param str            the subject to publish to
     * @param headers        the user headers; may be null
     * @param bArr           the payload
     * @param z              the UTF-8 mode flag forwarded to the connection
     * @param publishOptions the publish options; may be null
     * @return the publish acknowledgement, or {@code null} in publish-no-ack mode
     * @throws IOException if no reply arrives, the request is interrupted, or the reply is rejected
     * @throws JetStreamApiException propagated from response processing
     */
    @Deprecated
    private PublishAck publishSyncInternal(String str, Headers headers, byte[] bArr, boolean z, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        Headers merged = mergePublishOptions(headers, publishOptions);
        if (this.jso.isPublishNoAck()) {
            // Fire and forget: nothing to wait for, so there is no ack to return.
            this.conn.publishInternal(str, null, merged, bArr, z);
            return null;
        }
        Duration timeout = publishOptions == null
                ? this.jso.getRequestTimeout()
                : publishOptions.getStreamTimeout();
        Message reply = makeRequestResponseRequired(str, merged, bArr, z, timeout, false);
        return processPublishResponse(reply, publishOptions);
    }

    /**
     * Convenience overload of the asynchronous publish that passes {@code false}
     * for the UTF-8 mode flag.
     *
     * @param str            the subject to publish to
     * @param headers        the user headers; may be null
     * @param bArr           the payload
     * @param publishOptions the publish options; may be null
     * @param duration       the timeout forwarded to the connection; may be null
     * @return a future for the publish acknowledgement, or {@code null} in publish-no-ack mode
     */
    private CompletableFuture<PublishAck> publishAsyncInternal(String str, Headers headers, byte[] bArr, PublishOptions publishOptions, Duration duration) {
        final boolean utf8Mode = false;
        return publishAsyncInternal(str, headers, bArr, utf8Mode, publishOptions, duration);
    }

    /**
     * Asynchronous publish implementation.
     *
     * <p>The user headers are merged with the publish options into a fresh
     * header set. In publish-no-ack mode the message is published without
     * waiting for a reply and {@code null} (not a future) is returned. Otherwise
     * a request future is obtained from the connection and composed with the
     * reply processing. Checked failures from that processing are rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param str            the subject to publish to
     * @param headers        the user headers; may be null
     * @param bArr           the payload
     * @param z              the UTF-8 mode flag forwarded to the connection
     * @param publishOptions the publish options; may be null
     * @param duration       the timeout forwarded to the connection; may be null
     * @return a future for the publish acknowledgement, or {@code null} in publish-no-ack mode
     */
    @Deprecated
    private CompletableFuture<PublishAck> publishAsyncInternal(String str, Headers headers, byte[] bArr, boolean z, PublishOptions publishOptions, Duration duration) {
        Headers merged = mergePublishOptions(headers, publishOptions);
        if (this.jso.isPublishNoAck()) {
            // Fire and forget: callers must be prepared for a null return in this mode.
            this.conn.publishInternal(str, null, merged, bArr, z);
            return null;
        }
        return this.conn.requestFutureInternal(str, merged, bArr, z, duration, false).thenCompose(reply -> {
            try {
                responseRequired(reply);
                PublishAck ack = processPublishResponse(reply, publishOptions);
                return CompletableFuture.completedFuture(ack);
            } catch (JetStreamApiException | IOException e) {
                // Lambdas cannot throw checked exceptions; surface them through the future.
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Converts the reply to a publish request into a {@link PublishAck}.
     *
     * <p>A status reply is always an error; code 503 gets a dedicated message.
     * When the publish options name a stream, the ack must come from that stream.
     *
     * @param message        the reply received for the publish request
     * @param publishOptions the publish options used for the request; may be null
     * @return the acknowledgement parsed from the reply
     * @throws IOException if the reply is a status message, or the ack's stream
     *                     differs from the stream named in the options
     * @throws JetStreamApiException propagated from constructing the acknowledgement
     */
    private PublishAck processPublishResponse(Message message, PublishOptions publishOptions) throws IOException, JetStreamApiException {
        if (message.isStatusMessage()) {
            if (message.getStatus().getCode() != 503) {
                throw new IOException("Error Publishing: " + message.getStatus().getMessage());
            }
            throw new IOException("Error Publishing: No stream available.");
        }
        PublishAck ack = new PublishAck(message);
        String actualStream = ack.getStream();
        String expectedStream = null;
        if (publishOptions != null) {
            expectedStream = publishOptions.getStream();
        }
        if (isStreamSpecified(expectedStream) && !expectedStream.equals(actualStream)) {
            throw new IOException("Expected ack from stream " + expectedStream + ", received from: " + actualStream);
        }
        return ack;
    }

    /**
     * Builds the headers to send for a publish, never modifying the caller's headers.
     *
     * <p>Without options the result is a copy of the user headers (or null when
     * there are none). With options, a new header set is created from the user
     * headers and the expected-last-sequence (only when positive),
     * expected-last-message-id, expected-stream and message-id values are added
     * when present.
     *
     * @param headers        the user headers; may be null
     * @param publishOptions the publish options; may be null
     * @return the merged headers, or null when both arguments are null
     */
    private Headers mergePublishOptions(Headers headers, PublishOptions publishOptions) {
        if (publishOptions == null) {
            return headers == null ? null : new Headers(headers);
        }

        // NOTE(review): headers may be null here; this relies on the Headers copy
        // constructor accepting null - confirm.
        Headers merged = new Headers(headers);

        long lastSeq = publishOptions.getExpectedLastSequence();
        if (lastSeq > 0) {
            merged.add(NatsJetStreamConstants.EXPECTED_LAST_SEQ_HDR, Long.toString(lastSeq));
        }

        String lastMsgId = publishOptions.getExpectedLastMsgId();
        if (lastMsgId != null) {
            merged.add(NatsJetStreamConstants.EXPECTED_LAST_MSG_ID_HDR, lastMsgId);
        }

        String stream = publishOptions.getExpectedStream();
        if (stream != null) {
            merged.add(NatsJetStreamConstants.EXPECTED_STREAM_HDR, stream);
        }

        String msgId = publishOptions.getMessageId();
        if (msgId != null) {
            merged.add(NatsJetStreamConstants.MSG_ID_HDR, msgId);
        }
        return merged;
    }

    /**
     * Tells whether a stream name was supplied.
     *
     * @param str the stream name to test; may be null
     * @return {@code true} when the name is not null (an empty string counts as specified)
     */
    private boolean isStreamSpecified(String str) {
        return !(str == null);
    }

    /**
     * Creates a JetStream subscription in push or pull mode.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Pick the mode: pull when {@code pullSubscribeOptions} is non-null, else push
     *       (using default push options when none are given). In pull mode the
     *       deliver subject of the consumer configuration is cleared.</li>
     *   <li>Resolve the stream, looking it up by subject when the options do not name one.</li>
     *   <li>If a durable name is configured and that consumer already exists, reuse it:
     *       its filter subject (if any) must equal the requested subject, and its
     *       deliver subject is adopted.</li>
     *   <li>Fall back to a fresh inbox when there is still no deliver subject.</li>
     *   <li>Subscribe on the deliver subject, directly on the connection or through the dispatcher.</li>
     *   <li>If no existing consumer was found, create one; on an API error the
     *       subscription is removed again before the error is rethrown.</li>
     * </ol>
     *
     * @param str                  the subject to subscribe to; also used as the consumer's filter subject
     * @param str2                 the queue name passed to the underlying subscription; may be null
     * @param natsDispatcher       the dispatcher to subscribe through, or null for a direct subscription
     * @param messageHandler       the handler used with the dispatcher; unused when the dispatcher is null
     * @param z                    when true, the handler is wrapped in an {@link AutoAckMessageHandler}
     * @param pushSubscribeOptions the push options; may be null (ignored in pull mode)
     * @param pullSubscribeOptions the pull options; non-null selects pull mode
     * @return the configured subscription
     * @throws IllegalArgumentException if an existing consumer's filter subject differs from {@code str}
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    NatsJetStreamSubscription createSubscription(String str, String str2, NatsDispatcher natsDispatcher, MessageHandler messageHandler, boolean z, PushSubscribeOptions pushSubscribeOptions, PullSubscribeOptions pullSubscribeOptions) throws IOException, JetStreamApiException {
        boolean pullMode = pullSubscribeOptions != null;

        // Only meaningful in push mode; stays null in pull mode.
        PushSubscribeOptions pushOptions = null;
        String stream;
        ConsumerConfiguration.Builder builder;
        if (pullMode) {
            stream = pullSubscribeOptions.getStream();
            builder = ConsumerConfiguration.builder(pullSubscribeOptions.getConsumerConfiguration());
            builder.deliverSubject(null);
        } else {
            pushOptions = pushSubscribeOptions == null ? PushSubscribeOptions.builder().build() : pushSubscribeOptions;
            stream = pushOptions.getStream();
            builder = ConsumerConfiguration.builder(pushOptions.getConsumerConfiguration());
        }

        String durable = builder.getDurable();
        String deliverSubject = builder.getDeliverSubject();
        if (stream == null) {
            stream = lookupStreamBySubject(str);
        }

        boolean createConsumer = true;
        if (durable != null) {
            ConsumerInfo existing = lookupConsumerInfo(stream, durable);
            if (existing != null) {
                createConsumer = false;
                ConsumerConfiguration existingConfig = existing.getConsumerConfiguration();
                String filterSubject = existingConfig.getFilterSubject();
                if (filterSubject != null && !filterSubject.equals(str)) {
                    throw new IllegalArgumentException(String.format("Subject %s mismatches consumer configuration %s.", str, filterSubject));
                }
                deliverSubject = existingConfig.getDeliverSubject();
            }
        }
        if (deliverSubject == null) {
            deliverSubject = this.conn.createInbox();
        }

        NatsJetStreamSubscription subscription;
        if (natsDispatcher == null) {
            subscription = (NatsJetStreamSubscription) this.conn.createSubscription(deliverSubject, str2, null, true);
        } else {
            MessageHandler effectiveHandler = z ? new AutoAckMessageHandler(messageHandler) : messageHandler;
            subscription = (NatsJetStreamSubscription) natsDispatcher.subscribeImpl(deliverSubject, str2, effectiveHandler, true);
        }

        if (createConsumer) {
            if (builder.getMaxAckPending() == 0 && builder.getAckPolicy() != AckPolicy.None) {
                builder.maxAckPending(subscription.getPendingMessageLimit());
            }
            if (!pullMode) {
                builder.deliverSubject(deliverSubject);
            }
            builder.filterSubject(str);
            try {
                ConsumerInfo created = addOrUpdateConsumerInternal(stream, builder.build());
                // The options object is chosen at the call site so that no common
                // supertype of the push/pull options has to be named in this file.
                subscription.setupJetStream(this, created.getName(), created.getStreamName(), deliverSubject,
                        pullMode ? pullSubscribeOptions : pushOptions);
            } catch (JetStreamApiException e) {
                // The consumer could not be created: do not leave the subscription behind.
                if (natsDispatcher == null) {
                    subscription.unsubscribe();
                } else {
                    natsDispatcher.unsubscribe(subscription);
                }
                throw e;
            }
        } else {
            subscription.setupJetStream(this, durable, stream, deliverSubject,
                    pullMode ? pullSubscribeOptions : pushOptions);
        }
        return subscription;
    }

    @Override // io.nats.client.JetStream
    /**
     * Creates a push subscription on a subject with default options, no queue
     * and no dispatcher.
     *
     * @param str the subject; validated before use
     * @return the subscription
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    public JetStreamSubscription subscribe(String str) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        final PushSubscribeOptions defaultPush = null;
        return createSubscription(str, null, null, null, false, defaultPush, null);
    }

    @Override // io.nats.client.JetStream
    /**
     * Creates a push subscription on a subject with the given options, no
     * queue and no dispatcher.
     *
     * @param str                  the subject; validated before use
     * @param pushSubscribeOptions the push options; may be null
     * @return the subscription
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    public JetStreamSubscription subscribe(String str, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        final String noQueue = null;
        return createSubscription(str, noQueue, null, null, false, pushSubscribeOptions, null);
    }

    @Override // io.nats.client.JetStream
    /**
     * Creates a push subscription on a subject with a queue name and the given
     * options, without a dispatcher.
     *
     * @param str                  the subject; validated before use
     * @param str2                 the queue name; passed through the queue-name validator
     * @param pushSubscribeOptions the push options; may be null
     * @return the subscription
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    public JetStreamSubscription subscribe(String str, String str2, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        String queue = Validator.validateQueueName(str2, false);
        return createSubscription(str, queue, null, null, false, pushSubscribeOptions, null);
    }

    @Override // io.nats.client.JetStream
    /**
     * Creates a dispatcher-driven push subscription with default options and no queue.
     *
     * @param str            the subject; validated before use
     * @param dispatcher     the dispatcher; validated as non-null and cast to {@code NatsDispatcher}
     * @param messageHandler the handler; validated as non-null
     * @param z              when true, messages are acknowledged after the handler returns
     * @return the subscription
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    public JetStreamSubscription subscribe(String str, Dispatcher dispatcher, MessageHandler messageHandler, boolean z) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(messageHandler, "Handler");
        NatsDispatcher natsDispatcher = (NatsDispatcher) dispatcher;
        return createSubscription(str, null, natsDispatcher, messageHandler, z, null, null);
    }

    @Override // io.nats.client.JetStream
    /**
     * Creates a dispatcher-driven push subscription with the given options and no queue.
     *
     * @param str                  the subject; validated before use
     * @param dispatcher           the dispatcher; validated as non-null and cast to {@code NatsDispatcher}
     * @param messageHandler       the handler; validated as non-null
     * @param z                    when true, messages are acknowledged after the handler returns
     * @param pushSubscribeOptions the push options; may be null
     * @return the subscription
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    public JetStreamSubscription subscribe(String str, Dispatcher dispatcher, MessageHandler messageHandler, boolean z, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(messageHandler, "Handler");
        NatsDispatcher natsDispatcher = (NatsDispatcher) dispatcher;
        return createSubscription(str, null, natsDispatcher, messageHandler, z, pushSubscribeOptions, null);
    }

    @Override // io.nats.client.JetStream
    /**
     * Creates a dispatcher-driven push subscription with a queue name and the given options.
     *
     * @param str                  the subject; validated before use
     * @param str2                 the queue name; passed through the queue-name validator
     * @param dispatcher           the dispatcher; validated as non-null and cast to {@code NatsDispatcher}
     * @param messageHandler       the handler; validated as non-null
     * @param z                    when true, messages are acknowledged after the handler returns
     * @param pushSubscribeOptions the push options; may be null
     * @return the subscription
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    public JetStreamSubscription subscribe(String str, String str2, Dispatcher dispatcher, MessageHandler messageHandler, boolean z, PushSubscribeOptions pushSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        String queue = Validator.validateQueueName(str2, false);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(messageHandler, "Handler");
        NatsDispatcher natsDispatcher = (NatsDispatcher) dispatcher;
        return createSubscription(str, queue, natsDispatcher, messageHandler, z, pushSubscribeOptions, null);
    }

    @Override // io.nats.client.JetStream
    /**
     * Creates a pull subscription on a subject.
     *
     * @param str                  the subject; validated before use
     * @param pullSubscribeOptions the pull options; validated as non-null, and its
     *                             durable name is validated as non-null too
     * @return the subscription
     * @throws IOException if a request gets no reply or is interrupted
     * @throws JetStreamApiException if a server reply reports an API error
     */
    public JetStreamSubscription subscribe(String str, PullSubscribeOptions pullSubscribeOptions) throws IOException, JetStreamApiException {
        Validator.validateSubject(str, true);
        Validator.validateNotNull(pullSubscribeOptions, "Options");
        String durableName = pullSubscribeOptions.getDurable();
        Validator.validateNotNull(durableName, "Durable");
        return createSubscription(str, null, null, null, false, null, pullSubscribeOptions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Fetches consumer information, mapping "consumer not found" to {@code null}.
     *
     * <p>An API error is treated as "not found" only when its error code is 404
     * and its description contains the text {@code consumer}; any other API
     * error is rethrown unchanged.
     *
     * @param str  the stream name
     * @param str2 the consumer name
     * @return the consumer information, or {@code null} when the consumer does not exist
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException for any API error other than consumer-not-found
     */
    public ConsumerInfo lookupConsumerInfo(String str, String str2) throws IOException, JetStreamApiException {
        try {
            return getConsumerInfo(str, str2);
        } catch (JetStreamApiException apiError) {
            boolean notFound = apiError.getErrorCode() == 404;
            if (!notFound || !apiError.getErrorDescription().contains("consumer")) {
                throw apiError;
            }
            return null;
        }
    }

    /**
     * Finds the single stream that matches a subject by sending one
     * {@code STREAM.NAMES} request filtered on that subject.
     *
     * @param str the subject to look up; inserted verbatim into the JSON request body
     * @return the name of the one matching stream
     * @throws IllegalStateException if the reply lists no stream, or more than one
     * @throws IOException if no reply arrives or the request is interrupted
     * @throws JetStreamApiException if processing the reply reports an API error
     */
    private String lookupStreamBySubject(String str) throws IOException, JetStreamApiException {
        // NOTE(review): the subject is not JSON-escaped here; this assumes callers
        // have validated it to contain no quotes or backslashes - confirm.
        String body = String.format("{\"subject\":\"%s\"}", str);
        StreamNamesReader reader = new StreamNamesReader();
        // Encode explicitly: the no-arg getBytes() depends on the platform default charset.
        reader.process(makeRequestResponseRequired("STREAM.NAMES", body.getBytes(StandardCharsets.UTF_8), this.jso.getRequestTimeout()));
        List<String> names = reader.getStrings();
        if (names.isEmpty()) {
            throw new IllegalStateException("No matching streams for subject: " + str);
        }
        if (names.size() > 1) {
            // Previously reported as "No matching streams", which hid the real cause.
            throw new IllegalStateException("Multiple matching streams for subject: " + str);
        }
        return names.get(0);
    }

    /**
     * Sends a request on the prefixed API subject and returns whatever the
     * connection returns, including {@code null}.
     *
     * @param str      the API subject without prefix
     * @param bArr     the request body; may be null
     * @param duration the request timeout
     * @return the reply from the connection; may be null
     * @throws InterruptedException if the connection's request call is interrupted
     */
    private Message makeRequest(String str, byte[] bArr, Duration duration) throws InterruptedException {
        String subject = prependPrefix(str);
        return this.conn.request(subject, bArr, duration);
    }

    /**
     * Sends a request on the prefixed API subject and insists on a reply.
     *
     * @param str      the API subject without prefix
     * @param bArr     the request body; may be null
     * @param duration the request timeout
     * @return the non-null reply
     * @throws IOException if the connection returns no reply, or the calling thread is
     *                     interrupted while waiting (the interrupt flag is restored first)
     */
    private Message makeRequestResponseRequired(String str, byte[] bArr, Duration duration) throws IOException {
        try {
            return responseRequired(this.conn.request(prependPrefix(str), bArr, duration));
        } catch (InterruptedException e) {
            // Wrapping would otherwise erase the interruption; keep the flag set for callers up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Sends a request with headers on the given subject (no prefix is added)
     * and insists on a reply.
     *
     * @param str      the subject to send to, used as-is
     * @param headers  the headers to send; may be null
     * @param bArr     the request body
     * @param z        the UTF-8 mode flag forwarded to the connection
     * @param duration the request timeout
     * @param z2       flag forwarded unchanged as the last argument of {@code requestInternal}
     * @return the non-null reply
     * @throws IOException if the connection returns no reply, or the calling thread is
     *                     interrupted while waiting (the interrupt flag is restored first)
     */
    private Message makeRequestResponseRequired(String str, Headers headers, byte[] bArr, boolean z, Duration duration, boolean z2) throws IOException {
        try {
            return responseRequired(this.conn.requestInternal(str, headers, bArr, z, duration, z2));
        } catch (InterruptedException e) {
            // Wrapping would otherwise erase the interruption; keep the flag set for callers up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Rejects a missing reply.
     *
     * @param message the reply to check; may be null
     * @return the same reply when it is not null
     * @throws IOException if the reply is null
     */
    private Message responseRequired(Message message) throws IOException {
        if (message != null) {
            return message;
        }
        throw new IOException("Timeout or no response waiting for NATS JetStream server");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a full API subject by putting the configured JetStream prefix in
     * front of the given subject.
     *
     * @param str the API subject without prefix
     * @return the prefix from the JetStream options followed by {@code str}
     */
    public String prependPrefix(String str) {
        String prefix = this.jso.getPrefix();
        return prefix + str;
    }

    /**
     * Exposes the request timeout held by this context's JetStream options.
     *
     * @return the configured request timeout
     */
    Duration getRequestTimeout() {
        Duration requestTimeout = this.jso.getRequestTimeout();
        return requestTimeout;
    }
}
