/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RawReaderImpl
implements RawReader {
    static final int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
    private final ConsumerConfigurationData<byte[]> consumerConfiguration = new ConsumerConfigurationData();
    private RawConsumerImpl consumer;
    private static final Logger log = LoggerFactory.getLogger(RawReaderImpl.class);

    public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, CompletableFuture<Consumer<byte[]>> consumerFuture) {
        this.consumerConfiguration.getTopicNames().add(topic);
        this.consumerConfiguration.setSubscriptionName(subscription);
        this.consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
        this.consumerConfiguration.setReceiverQueueSize(1000);
        this.consumerConfiguration.setReadCompacted(true);
        this.consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        this.consumer = new RawConsumerImpl(client, this.consumerConfiguration, consumerFuture);
    }

    @Override
    public String getTopic() {
        return this.consumerConfiguration.getTopicNames().stream().findFirst().orElse(null);
    }

    @Override
    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
        return this.consumer.hasMessageAvailableAsync();
    }

    @Override
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        return this.consumer.seekAsync(messageId);
    }

    @Override
    public CompletableFuture<RawMessage> readNextAsync() {
        return this.consumer.receiveRawAsync();
    }

    @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
        return this.consumer.doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, properties, null);
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return this.consumer.closeAsync();
    }

    @Override
    public CompletableFuture<MessageId> getLastMessageIdAsync() {
        return this.consumer.getLastMessageIdAsync();
    }

    public String toString() {
        return "RawReader(topic=" + this.getTopic() + ")";
    }

    static class RawConsumerImpl
    extends ConsumerImpl<byte[]> {
        final BlockingQueue<RawMessageAndCnx> incomingRawMessages = new GrowableArrayBlockingQueue();
        final Queue<CompletableFuture<RawMessage>> pendingRawReceives = new ConcurrentLinkedQueue<CompletableFuture<RawMessage>>();

        RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf, CompletableFuture<Consumer<byte[]>> consumerFuture) {
            super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider(), TopicName.getPartitionIndex((String)conf.getSingleTopic()), false, false, consumerFuture, MessageId.earliest, 0L, Schema.BYTES, null, true);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void tryCompletePending() {
            CompletableFuture<RawMessage> future = null;
            RawMessageAndCnx messageAndCnx = null;
            RawConsumerImpl rawConsumerImpl = this;
            synchronized (rawConsumerImpl) {
                if (!this.pendingRawReceives.isEmpty() && !this.incomingRawMessages.isEmpty()) {
                    future = this.pendingRawReceives.remove();
                    messageAndCnx = (RawMessageAndCnx)this.incomingRawMessages.remove();
                }
            }
            if (future == null) {
                assert (messageAndCnx == null);
            } else {
                int numMsg;
                try {
                    MessageMetadata msgMetadata = Commands.parseMessageMetadata((ByteBuf)messageAndCnx.msg.getHeadersAndPayload());
                    numMsg = msgMetadata.getNumMessagesInBatch();
                }
                catch (Throwable t) {
                    numMsg = 1;
                }
                if (!future.complete(messageAndCnx.msg)) {
                    messageAndCnx.msg.close();
                    this.closeAsync();
                }
                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
                this.lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), messageId.getPartition(), numMsg - 1);
                ClientCnx currentCnx = this.cnx();
                if (currentCnx == messageAndCnx.cnx) {
                    this.increaseAvailablePermits(currentCnx, numMsg);
                }
            }
        }

        protected CompletableFuture<Void> failPendingReceive() {
            if (this.internalPinnedExecutor.isShutdown()) {
                this.failPendingRawReceives();
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            this.internalPinnedExecutor.execute(() -> {
                try {
                    this.failPendingRawReceives();
                }
                finally {
                    future.complete(null);
                }
            });
            return future;
        }

        private void failPendingRawReceives() {
            ArrayList<CompletableFuture<RawMessage>> toError = new ArrayList<CompletableFuture<RawMessage>>();
            while (!this.pendingRawReceives.isEmpty()) {
                toError.add(this.pendingRawReceives.remove());
            }
            toError.forEach(f -> f.cancel(false));
        }

        CompletableFuture<RawMessage> receiveRawAsync() {
            CompletableFuture<RawMessage> result = new CompletableFuture<RawMessage>();
            this.pendingRawReceives.add(result);
            this.tryCompletePending();
            return result;
        }

        private void reset() {
            this.failPendingRawReceives();
            this.clearIncomingRawMessages();
        }

        private void clearIncomingRawMessages() {
            RawMessageAndCnx m = (RawMessageAndCnx)this.incomingRawMessages.poll();
            while (m != null) {
                m.msg.close();
                m = (RawMessageAndCnx)this.incomingRawMessages.poll();
            }
        }

        protected void clearIncomingMessages() {
            super.clearIncomingMessages();
            this.clearIncomingRawMessages();
        }

        public CompletableFuture<Void> seekAsync(long timestamp) {
            this.reset();
            return super.seekAsync(timestamp);
        }

        public CompletableFuture<Void> seekAsync(MessageId messageId) {
            this.reset();
            return super.seekAsync(messageId);
        }

        public CompletableFuture<Void> closeAsync() {
            CompletableFuture closeFuture = super.closeAsync();
            this.reset();
            return closeFuture;
        }

        void messageReceived(CommandMessage commandMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
            HandlerState.State state = this.getState();
            if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
                return;
            }
            MessageIdData messageId = commandMessage.getMessageId();
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Received raw message: {}/{}/{}", new Object[]{this.topic, this.subscription, messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition()});
            }
            this.incomingRawMessages.add(new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
            this.internalPinnedExecutor.execute(this::tryCompletePending);
        }
    }

    private static class RawMessageAndCnx {
        final RawMessage msg;
        final ClientCnx cnx;

        RawMessageAndCnx(RawMessage msg, ClientCnx cnx) {
            this.msg = msg;
            this.cnx = cnx;
        }
    }
}

