/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RawReader} implementation that delegates every operation to a
 * {@link RawConsumerImpl}, a {@link ConsumerImpl} subclass that hands out
 * received entries as {@link RawMessage} instances.
 *
 * <p>The underlying consumer is configured with an {@code Exclusive}
 * subscription, {@code readCompacted = true} and a receiver queue of
 * {@link #DEFAULT_RECEIVER_QUEUE_SIZE} entries.
 */
public class RawReaderImpl implements RawReader {
    private static final Logger log = LoggerFactory.getLogger(RawReaderImpl.class);

    /** Receiver queue size applied to the consumer configuration built by this reader. */
    static final int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;

    private final ConsumerConfigurationData<byte[]> consumerConfiguration = new ConsumerConfigurationData<>();
    private final RawConsumerImpl consumer;

    /**
     * Builds the consumer configuration for a single topic and creates the backing raw consumer.
     *
     * @param client         client used to create the underlying consumer
     * @param topic          the single topic to read from
     * @param subscription   subscription name to use
     * @param consumerFuture future handed to the {@link ConsumerImpl} constructor
     *                       (presumably completed once the subscription is established — confirm
     *                       against {@code ConsumerImpl})
     */
    public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, CompletableFuture<Consumer<byte[]>> consumerFuture) {
        consumerConfiguration.getTopicNames().add(topic);
        consumerConfiguration.setSubscriptionName(subscription);
        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
        consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
        consumerConfiguration.setReadCompacted(true);
        consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture);
    }

    /**
     * @return the first topic name in the configuration, or {@code null} if none is configured
     */
    @Override
    public String getTopic() {
        return consumerConfiguration.getTopicNames().stream().findFirst().orElse(null);
    }

    /**
     * Seeks the underlying consumer to {@code messageId}. The consumer discards its pending
     * receives and buffered messages before seeking (see {@link RawConsumerImpl#seekAsync}).
     */
    @Override
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        return consumer.seekAsync(messageId);
    }

    /**
     * @return a future completed with the next raw message once one is available
     */
    @Override
    public CompletableFuture<RawMessage> readNextAsync() {
        return consumer.receiveRawAsync();
    }

    /**
     * Sends a cumulative acknowledgement for {@code messageId}, passing {@code properties}
     * through to the consumer's {@code doAcknowledge}.
     */
    @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
        return consumer.doAcknowledge(messageId, PulsarApi.CommandAck.AckType.Cumulative, properties);
    }

    /**
     * Closes the underlying consumer; its pending receives are cancelled and buffered messages
     * are closed first (see {@link RawConsumerImpl#closeAsync}).
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        return consumer.closeAsync();
    }

    @Override
    public CompletableFuture<MessageId> getLastMessageIdAsync() {
        return consumer.getLastMessageIdAsync();
    }

    @Override
    public String toString() {
        return "RawReader(topic=" + getTopic() + ")";
    }

    /**
     * Consumer that pairs incoming raw messages with outstanding receive requests.
     *
     * <p>Two queues are kept: {@link #incomingRawMessages} holds messages that arrived but have
     * not been handed out yet, and {@link #pendingRawReceives} holds futures returned to callers
     * that have not been completed yet. {@link #tryCompletePending()} matches one of each.
     */
    static class RawConsumerImpl extends ConsumerImpl<byte[]> {
        final BlockingQueue<RawMessageAndCnx> incomingRawMessages = new GrowableArrayBlockingQueue<>();
        final Queue<CompletableFuture<RawMessage>> pendingRawReceives = new ConcurrentLinkedQueue<>();

        /**
         * Creates the consumer for the single topic in {@code conf}, as a durable subscription
         * starting from {@link MessageId#earliest} with the {@link Schema#BYTES} schema.
         */
        RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf, CompletableFuture<Consumer<byte[]>> consumerFuture) {
            // NOTE(review): the trailing null is presumably the interceptors argument —
            // confirm against the ConsumerImpl constructor.
            super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), TopicName.getPartitionIndex(conf.getSingleTopic()), consumerFuture, ConsumerImpl.SubscriptionMode.Durable, MessageId.earliest, Schema.BYTES, null);
        }

        /**
         * Hands one buffered message to one pending receive, if both are available.
         *
         * <p>The pair is dequeued while holding this object's monitor; the future is completed
         * after the monitor is released, so dependent callbacks do not run while it is held.
         */
        void tryCompletePending() {
            CompletableFuture<RawMessage> future = null;
            RawMessageAndCnx messageAndCnx = null;

            synchronized (this) {
                if (!pendingRawReceives.isEmpty() && !incomingRawMessages.isEmpty()) {
                    future = pendingRawReceives.remove();
                    messageAndCnx = incomingRawMessages.remove();
                }
            }

            if (future == null) {
                assert messageAndCnx == null;
                return;
            }

            // complete() returns false when the future was already completed (for example
            // cancelled by the caller): nobody will consume the message, so close it and
            // close this consumer.
            if (!future.complete(messageAndCnx.msg)) {
                messageAndCnx.msg.close();
                closeAsync();
            }

            // Only return a permit when the consumer is still on the connection the message
            // arrived on.
            ClientCnx currentCnx = cnx();
            if (currentCnx == messageAndCnx.cnx) {
                increaseAvailablePermits(currentCnx);
            }
        }

        /**
         * Registers a new pending receive and immediately tries to satisfy it from the buffer.
         *
         * @return a future completed with the next raw message, or cancelled by {@code reset()}
         */
        CompletableFuture<RawMessage> receiveRawAsync() {
            CompletableFuture<RawMessage> result = new CompletableFuture<>();
            pendingRawReceives.add(result);
            tryCompletePending();
            return result;
        }

        /**
         * Drops all state: removes every pending receive, closes every buffered message, and
         * then cancels the removed futures once the monitor has been released.
         */
        private void reset() {
            ArrayList<CompletableFuture<RawMessage>> toError = new ArrayList<>();
            synchronized (this) {
                CompletableFuture<RawMessage> pending;
                while ((pending = pendingRawReceives.poll()) != null) {
                    toError.add(pending);
                }

                RawMessageAndCnx stale;
                while ((stale = incomingRawMessages.poll()) != null) {
                    stale.msg.close();
                }
                // Kept from the original. This is a no-op unless a message was enqueued after
                // the final poll(): messageReceived() adds to the queue without holding this
                // monitor. NOTE(review): such a message would be dropped here without close().
                incomingRawMessages.clear();
            }
            toError.forEach(f -> f.cancel(false));
        }

        /** Resets the local queues, then delegates to {@code super.seekAsync(messageId)}. */
        public CompletableFuture<Void> seekAsync(MessageId messageId) {
            reset();
            return super.seekAsync(messageId);
        }

        /** Resets the local queues, then delegates to {@code super.closeAsync()}. */
        public CompletableFuture<Void> closeAsync() {
            reset();
            return super.closeAsync();
        }

        /**
         * Wraps the received entry in a {@link RawMessageImpl}, buffers it together with the
         * connection it arrived on, and tries to complete a pending receive.
         * {@code redeliveryCount} is not used here.
         */
        void messageReceived(PulsarApi.MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription, messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition());
            }
            incomingRawMessages.add(new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
            tryCompletePending();
        }
    }

    /** Immutable pair of a raw message and the connection it was received on. */
    private static class RawMessageAndCnx {
        final RawMessage msg;
        final ClientCnx cnx;

        RawMessageAndCnx(RawMessage msg, ClientCnx cnx) {
            this.msg = msg;
            this.cnx = cnx;
        }
    }
}

