/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.AbstractWebSocketHandler;
import org.apache.pulsar.websocket.WebSocketError;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.data.ConsumerAck;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerHandler
extends AbstractWebSocketHandler {
    private final String subscription;
    private final ConsumerConfiguration conf;
    private Consumer consumer;
    private final int maxPendingMessages;
    private final AtomicInteger pendingMessages = new AtomicInteger();
    private final LongAdder numMsgsDelivered;
    private final LongAdder numBytesDelivered;
    private final LongAdder numMsgsAcked;
    private volatile long msgDeliveredCounter = 0L;
    private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter");
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withZone(ZoneId.systemDefault());
    private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

    public ConsumerHandler(WebSocketService service, HttpServletRequest request) {
        super(service, request);
        this.subscription = ConsumerHandler.extractSubscription(request);
        this.conf = this.getConsumerConfiguration();
        this.maxPendingMessages = this.conf.getReceiverQueueSize() == 0 ? 1 : this.conf.getReceiverQueueSize();
        this.numMsgsDelivered = new LongAdder();
        this.numBytesDelivered = new LongAdder();
        this.numMsgsAcked = new LongAdder();
    }

    @Override
    protected void createClient(Session session) {
        try {
            this.consumer = this.service.getPulsarClient().subscribe(this.topic, this.subscription, this.conf);
            this.service.addConsumer(this);
            this.receiveMessage();
        }
        catch (Exception e) {
            log.warn("[{}] Failed in creating subscription {} on topic {}", new Object[]{session.getRemoteAddress(), this.subscription, this.topic, e});
            this.close(WebSocketError.FailedToSubscribe, e.getMessage());
        }
    }

    private void receiveMessage() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Receive next message", new Object[]{this.getSession().getRemoteAddress(), this.topic, this.subscription});
        }
        ((CompletableFuture)this.consumer.receiveAsync().thenAccept(msg -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Got message {}", new Object[]{this.getSession().getRemoteAddress(), this.topic, this.subscription, msg.getMessageId()});
            }
            ConsumerMessage dm = new ConsumerMessage();
            dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
            dm.payload = Base64.getEncoder().encodeToString(msg.getData());
            dm.properties = msg.getProperties();
            dm.publishTime = DATE_FORMAT.format(Instant.ofEpochMilli(msg.getPublishTime()));
            if (msg.hasKey()) {
                dm.key = msg.getKey();
            }
            final long msgSize = msg.getData().length;
            try {
                this.getSession().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)dm), new WriteCallback(){

                    public void writeFailed(Throwable th) {
                        log.warn("[{}/{}] Failed to deliver msg to {} {}", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString(), th.getMessage()});
                        ConsumerHandler.this.pendingMessages.decrementAndGet();
                        ConsumerHandler.this.service.getExecutor().execute(() -> ConsumerHandler.this.receiveMessage());
                    }

                    public void writeSuccess() {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}/{}] message is delivered successfully to {} ", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString()});
                        }
                        ConsumerHandler.this.updateDeliverMsgStat(msgSize);
                    }
                });
            }
            catch (JsonProcessingException e) {
                this.close(WebSocketError.FailedToSerializeToJSON);
            }
            int pending = this.pendingMessages.incrementAndGet();
            if (pending < this.maxPendingMessages) {
                this.service.getExecutor().execute(() -> this.receiveMessage());
            }
        })).exceptionally(exception -> null);
    }

    public void onWebSocketText(String message) {
        MessageId msgId;
        super.onWebSocketText(message);
        try {
            ConsumerAck ack = (ConsumerAck)ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
            msgId = MessageId.fromByteArray((byte[])Base64.getDecoder().decode(ack.messageId));
        }
        catch (IOException e) {
            log.warn("Failed to deserialize message id: {}", (Object)message, (Object)e);
            this.close(WebSocketError.FailedToDeserializeFromJSON);
            return;
        }
        this.consumer.acknowledgeAsync(msgId).thenAccept(consumer -> this.numMsgsAcked.increment());
        int pending = this.pendingMessages.getAndDecrement();
        if (pending >= this.maxPendingMessages) {
            this.receiveMessage();
        }
    }

    @Override
    public void close() throws IOException {
        if (this.consumer != null) {
            this.service.removeConsumer(this);
            ((CompletableFuture)this.consumer.closeAsync().thenAccept(x -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed consumer asynchronously", (Object)this.consumer.getTopic());
                }
            })).exceptionally(exception -> {
                log.warn("[{}] Failed to close consumer", (Object)this.consumer.getTopic(), exception);
                return null;
            });
        }
    }

    public Consumer getConsumer() {
        return this.consumer;
    }

    public String getSubscription() {
        return this.subscription;
    }

    public SubscriptionType getSubscriptionType() {
        return this.conf.getSubscriptionType();
    }

    public long getAndResetNumMsgsDelivered() {
        return this.numMsgsDelivered.sumThenReset();
    }

    public long getAndResetNumBytesDelivered() {
        return this.numBytesDelivered.sumThenReset();
    }

    public long getAndResetNumMsgsAcked() {
        return this.numMsgsAcked.sumThenReset();
    }

    public long getMsgDeliveredCounter() {
        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
    }

    protected void updateDeliverMsgStat(long msgSize) {
        this.numMsgsDelivered.increment();
        MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this);
        this.numBytesDelivered.add(msgSize);
    }

    private ConsumerConfiguration getConsumerConfiguration() {
        ConsumerConfiguration conf = new ConsumerConfiguration();
        if (this.queryParams.containsKey("ackTimeoutMillis")) {
            conf.setAckTimeout((long)Integer.parseInt((String)this.queryParams.get("ackTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("subscriptionType")) {
            conf.setSubscriptionType(SubscriptionType.valueOf((String)((String)this.queryParams.get("subscriptionType"))));
        }
        if (this.queryParams.containsKey("receiverQueueSize")) {
            conf.setReceiverQueueSize(Math.min(Integer.parseInt((String)this.queryParams.get("receiverQueueSize")), 1000));
        }
        if (this.queryParams.containsKey("consumerName")) {
            conf.setConsumerName((String)this.queryParams.get("consumerName"));
        }
        return conf;
    }

    @Override
    protected Boolean isAuthorized(String authRole) throws Exception {
        return this.service.getAuthorizationManager().canConsume(DestinationName.get((String)this.topic), authRole);
    }

    private static String extractSubscription(HttpServletRequest request) {
        String uri = request.getRequestURI();
        List parts = Splitter.on((String)"/").splitToList((CharSequence)uri);
        Preconditions.checkArgument((parts.size() == 9 ? 1 : 0) != 0, (Object)"Invalid topic name format");
        Preconditions.checkArgument((boolean)((String)parts.get(1)).equals("ws"));
        Preconditions.checkArgument((boolean)((String)parts.get(3)).equals("persistent"));
        Preconditions.checkArgument((((String)parts.get(8)).length() > 0 ? 1 : 0) != 0, (Object)"Empty subscription name");
        return (String)parts.get(8);
    }
}

