/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.AbstractWebSocketHandler;
import org.apache.pulsar.websocket.WebSocketError;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderHandler
extends AbstractWebSocketHandler {
    private String subscription = "";
    private final ReaderConfiguration conf;
    private Reader reader;
    private final int maxPendingMessages;
    private final AtomicInteger pendingMessages = new AtomicInteger();
    private final LongAdder numMsgsDelivered;
    private final LongAdder numBytesDelivered;
    private volatile long msgDeliveredCounter = 0L;
    private static final AtomicLongFieldUpdater<ReaderHandler> MSG_DELIVERED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ReaderHandler.class, "msgDeliveredCounter");
    private static final Logger log = LoggerFactory.getLogger(ReaderHandler.class);

    public ReaderHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
        super(service, request, response);
        this.conf = this.getReaderConfiguration();
        this.maxPendingMessages = this.conf.getReceiverQueueSize() == 0 ? 1 : this.conf.getReceiverQueueSize();
        this.numMsgsDelivered = new LongAdder();
        this.numBytesDelivered = new LongAdder();
        try {
            this.reader = service.getPulsarClient().createReader(this.topic, this.getMessageId(), this.conf);
            this.subscription = ((ReaderImpl)this.reader).getConsumer().getSubscription();
            if (!this.service.addReader(this)) {
                log.warn("[{}:{}] Failed to add reader handler for topic {}", new Object[]{request.getRemoteAddr(), request.getRemotePort(), this.topic});
            }
        }
        catch (Exception e) {
            log.warn("[{}:{}] Failed in creating reader {} on topic {}", new Object[]{request.getRemoteAddr(), request.getRemotePort(), this.subscription, this.topic, e});
            try {
                response.sendError(500, "Failed to create reader");
            }
            catch (IOException e1) {
                log.warn("[{}:{}] Failed to send error: {}", new Object[]{request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1});
            }
        }
    }

    private void receiveMessage() {
        if (log.isDebugEnabled()) {
            log.debug("[{}:{}] [{}] [{}] Receive next message", new Object[]{this.request.getRemoteAddr(), this.request.getRemotePort(), this.topic, this.subscription});
        }
        ((CompletableFuture)this.reader.readNextAsync().thenAccept(msg -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Got message {}", new Object[]{this.getSession().getRemoteAddress(), this.topic, this.subscription, msg.getMessageId()});
            }
            ConsumerMessage dm = new ConsumerMessage();
            dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
            dm.payload = Base64.getEncoder().encodeToString(msg.getData());
            dm.properties = msg.getProperties();
            dm.publishTime = DateFormatter.format((long)msg.getPublishTime());
            if (msg.getEventTime() != 0L) {
                dm.eventTime = DateFormatter.format((long)msg.getEventTime());
            }
            if (msg.hasKey()) {
                dm.key = msg.getKey();
            }
            final long msgSize = msg.getData().length;
            try {
                this.getSession().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)dm), new WriteCallback(){

                    public void writeFailed(Throwable th) {
                        log.warn("[{}/{}] Failed to deliver msg to {} {}", new Object[]{ReaderHandler.this.reader.getTopic(), ReaderHandler.this.subscription, ReaderHandler.this.getRemote().getInetSocketAddress().toString(), th.getMessage()});
                        ReaderHandler.this.pendingMessages.decrementAndGet();
                        ReaderHandler.this.service.getExecutor().execute(() -> ReaderHandler.this.receiveMessage());
                    }

                    public void writeSuccess() {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}/{}] message is delivered successfully to {} ", new Object[]{ReaderHandler.this.reader.getTopic(), ReaderHandler.this.subscription, ReaderHandler.this.getRemote().getInetSocketAddress().toString()});
                        }
                        ReaderHandler.this.updateDeliverMsgStat(msgSize);
                    }
                });
            }
            catch (JsonProcessingException e) {
                this.close(WebSocketError.FailedToSerializeToJSON);
            }
            int pending = this.pendingMessages.incrementAndGet();
            if (pending < this.maxPendingMessages) {
                this.service.getExecutor().execute(() -> this.receiveMessage());
            }
        })).exceptionally(exception -> {
            log.warn("[{}/{}] Failed to deliver msg to {} {}", new Object[]{this.reader.getTopic(), this.subscription, this.getRemote().getInetSocketAddress().toString(), exception});
            return null;
        });
    }

    @Override
    public void onWebSocketConnect(Session session) {
        super.onWebSocketConnect(session);
        this.receiveMessage();
    }

    public void onWebSocketText(String message) {
        super.onWebSocketText(message);
        int pending = this.pendingMessages.getAndDecrement();
        if (pending >= this.maxPendingMessages) {
            this.receiveMessage();
        }
    }

    @Override
    public void close() throws IOException {
        if (this.reader != null) {
            if (!this.service.removeReader(this)) {
                log.warn("[{}] Failed to remove reader handler", (Object)this.reader.getTopic());
            }
            ((CompletableFuture)this.reader.closeAsync().thenAccept(x -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed reader asynchronously", (Object)this.reader.getTopic());
                }
            })).exceptionally(exception -> {
                log.warn("[{}] Failed to close reader", (Object)this.reader.getTopic(), exception);
                return null;
            });
        }
    }

    public Consumer getConsumer() {
        return ((ReaderImpl)this.reader).getConsumer();
    }

    public String getSubscription() {
        return this.subscription;
    }

    public SubscriptionType getSubscriptionType() {
        return SubscriptionType.Exclusive;
    }

    public long getAndResetNumMsgsDelivered() {
        return this.numMsgsDelivered.sumThenReset();
    }

    public long getAndResetNumBytesDelivered() {
        return this.numBytesDelivered.sumThenReset();
    }

    public long getMsgDeliveredCounter() {
        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
    }

    protected void updateDeliverMsgStat(long msgSize) {
        this.numMsgsDelivered.increment();
        MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this);
        this.numBytesDelivered.add(msgSize);
    }

    private ReaderConfiguration getReaderConfiguration() {
        ReaderConfiguration conf = new ReaderConfiguration();
        if (this.queryParams.containsKey("readerName")) {
            conf.setReaderName((String)this.queryParams.get("readerName"));
        }
        if (this.queryParams.containsKey("receiverQueueSize")) {
            conf.setReceiverQueueSize(Math.min(Integer.parseInt((String)this.queryParams.get("receiverQueueSize")), 1000));
        }
        return conf;
    }

    @Override
    protected Boolean isAuthorized(String authRole) throws Exception {
        return this.service.getAuthorizationManager().canConsume(DestinationName.get((String)this.topic), authRole);
    }

    private MessageId getMessageId() throws IOException {
        MessageId messageId = MessageId.latest;
        if (StringUtils.isNotBlank((CharSequence)((CharSequence)this.queryParams.get("messageId")))) {
            if (((String)this.queryParams.get("messageId")).equals("earliest")) {
                messageId = MessageId.earliest;
            } else if (!((String)this.queryParams.get("messageId")).equals("latest")) {
                messageId = MessageIdImpl.fromByteArray((byte[])Base64.getDecoder().decode((String)this.queryParams.get("messageId")));
            }
        }
        return messageId;
    }
}

