/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.AbstractWebSocketHandler;
import org.apache.pulsar.websocket.WebSocketError;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WebSocket handler that publishes messages received over a WebSocket session to a Pulsar topic.
 *
 * <p>The constructor builds a {@link Producer} from the request's query parameters. Every text
 * frame is expected to be a JSON-encoded {@link ProducerMessage} carrying a Base64 payload; the
 * outcome of each publish (or of a failed parse) is reported back to the client as a JSON-encoded
 * {@link ProducerAck}. Per-handler publish statistics are accumulated and exposed through the
 * {@code getAndReset*} accessors.
 */
public class ProducerHandler
extends AbstractWebSocketHandler {
    /** Upper bounds, in microseconds, of the publish-latency histogram buckets. */
    public static final long[] ENTRY_LATENCY_BUCKETS_USEC = new long[]{500L, 1000L, 5000L, 10000L, 20000L, 50000L, 100000L, 200000L, 1000000L};
    private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");
    private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

    /** Remains {@code null} when authorization or producer creation fails in the constructor. */
    private Producer<byte[]> producer;
    private final LongAdder numMsgsSent = new LongAdder();
    private final LongAdder numMsgsFailed = new LongAdder();
    private final LongAdder numBytesSent = new LongAdder();
    private final StatsBuckets publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
    /** Total number of successfully published messages; updated via {@link #MSG_PUBLISHED_COUNTER_UPDATER}. */
    private volatile long msgPublishedCounter = 0L;

    /**
     * Creates the handler and, if {@code checkAuth} accepts the request, the underlying producer.
     *
     * <p>When producer creation fails, an HTTP error derived from the exception is sent on
     * {@code response} and {@link #getProducer()} keeps returning {@code null}.
     *
     * @param service  WebSocket service providing the Pulsar client and the producer registry
     * @param request  the upgrade request; its remote address/port are used in log messages
     * @param response the upgrade response used to report failures
     */
    public ProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
        super(service, request, response);
        if (!this.checkAuth(response)) {
            return;
        }
        try {
            this.producer = this.getProducerBuilder(service.getPulsarClient()).topic(this.topic.toString()).create();
            if (!this.service.addProducer(this)) {
                log.warn("[{}:{}] Failed to add producer handler for topic {}", request.getRemoteAddr(), request.getRemotePort(), this.topic);
            }
        }
        catch (Exception e) {
            log.warn("[{}:{}] Failed in creating producer on topic {}: {}", request.getRemoteAddr(), request.getRemotePort(), this.topic, e.getMessage());
            try {
                response.sendError(ProducerHandler.getErrorCode(e), ProducerHandler.getErrorMessage(e));
            }
            catch (IOException e1) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1);
            }
        }
    }

    /**
     * Maps a producer-creation failure to the HTTP status code reported to the client.
     *
     * @param e the failure raised while building or creating the producer
     * @return 400 for invalid arguments, 409 for a busy producer, 503 when the producer is
     *         blocked by quota, otherwise 500
     */
    private static int getErrorCode(Exception e) {
        if (e instanceof IllegalArgumentException) {
            return 400;
        }
        if (e instanceof PulsarClientException.ProducerBusyException) {
            return 409;
        }
        if (e instanceof PulsarClientException.ProducerBlockedQuotaExceededError || e instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
            return 503;
        }
        return 500;
    }

    /**
     * Builds the human-readable error text that accompanies {@link #getErrorCode(Exception)}.
     *
     * @param e the failure raised while building or creating the producer
     * @return a message prefixed according to whether the failure was caused by bad query params
     */
    private static String getErrorMessage(Exception e) {
        String prefix = e instanceof IllegalArgumentException ? "Invalid query params: " : "Failed to create producer: ";
        return prefix + e.getMessage();
    }

    /**
     * Unregisters this handler from the service and closes the producer asynchronously.
     * Does nothing when no producer was created.
     */
    @Override
    public void close() throws IOException {
        if (this.producer == null) {
            return;
        }
        if (!this.service.removeProducer(this)) {
            log.warn("[{}] Failed to remove producer handler", this.producer.getTopic());
        }
        this.producer.closeAsync().thenAccept(x -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Closed producer asynchronously", this.producer.getTopic());
            }
        }).exceptionally(exception -> {
            log.warn("[{}] Failed to close producer", this.producer.getTopic(), exception);
            return null;
        });
    }

    /**
     * Handles one text frame: parses it as a {@link ProducerMessage}, publishes the decoded
     * payload asynchronously and acknowledges the result to the client.
     *
     * <p>Malformed JSON, a JSON {@code null} document, a missing payload or an invalid Base64
     * payload are answered with an error {@link ProducerAck} and nothing is published.
     *
     * @param message the raw JSON text received from the client
     */
    public void onWebSocketText(String message) {
        ProducerMessage sendRequest;
        byte[] rawPayload = null;
        String requestContext = null;
        try {
            sendRequest = ObjectMapperFactory.getThreadLocal().readValue(message, ProducerMessage.class);
            if (sendRequest == null) {
                // Jackson maps the JSON literal `null` to a null object; there is nothing to publish.
                this.sendAckResponse(new ProducerAck(WebSocketError.FailedToDeserializeFromJSON, "Empty producer message", null, null));
                return;
            }
            requestContext = sendRequest.context;
            if (sendRequest.payload == null) {
                // Base64.Decoder.decode(null) would throw an uncaught NullPointerException.
                this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, "Missing message payload", null, requestContext));
                return;
            }
            rawPayload = Base64.getDecoder().decode(sendRequest.payload);
        }
        catch (IOException e) {
            this.sendAckResponse(new ProducerAck(WebSocketError.FailedToDeserializeFromJSON, e.getMessage(), null, null));
            return;
        }
        catch (IllegalArgumentException e) {
            String msg = String.format("Invalid Base64 message-payload error=%s", e.getMessage());
            this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, msg, null, requestContext));
            return;
        }
        long msgSize = rawPayload.length;
        TypedMessageBuilder<byte[]> builder = this.producer.newMessage();
        try {
            builder.value(rawPayload);
        }
        catch (SchemaSerializationException e) {
            this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, requestContext));
            return;
        }
        if (sendRequest.properties != null) {
            builder.properties(sendRequest.properties);
        }
        if (sendRequest.key != null) {
            builder.key(sendRequest.key);
        }
        if (sendRequest.replicationClusters != null) {
            builder.replicationClusters(sendRequest.replicationClusters);
        }
        // Latency is measured from just before the send until the broker acknowledgement.
        long now = System.nanoTime();
        builder.sendAsync().thenAccept(msgId -> {
            this.updateSentMsgStats(msgSize, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - now));
            if (this.isConnected()) {
                String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray());
                this.sendAckResponse(new ProducerAck(messageId, sendRequest.context));
            }
        }).exceptionally(exception -> {
            log.warn("[{}] Error occurred while producer handler was sending msg from {}: {}", this.producer.getTopic(), this.getRemote().getInetSocketAddress().toString(), exception.getMessage());
            this.numMsgsFailed.increment();
            this.sendAckResponse(new ProducerAck(WebSocketError.UnknownError, exception.getMessage(), null, sendRequest.context));
            return null;
        });
    }

    /** @return the underlying producer, or {@code null} if it was never created */
    public Producer<byte[]> getProducer() {
        return this.producer;
    }

    /** @return the number of messages published since the previous call, resetting the counter */
    public long getAndResetNumMsgsSent() {
        return this.numMsgsSent.sumThenReset();
    }

    /** @return the number of payload bytes published since the previous call, resetting the counter */
    public long getAndResetNumBytesSent() {
        return this.numBytesSent.sumThenReset();
    }

    /** @return the number of failed publishes since the previous call, resetting the counter */
    public long getAndResetNumMsgsFailed() {
        return this.numMsgsFailed.sumThenReset();
    }

    /**
     * Refreshes the publish-latency histogram and returns its buckets.
     *
     * @return the bucket counts reported by {@link StatsBuckets#getBuckets()} after a refresh
     */
    public long[] getAndResetPublishLatencyStatsUSec() {
        this.publishLatencyStatsUSec.refresh();
        return this.publishLatencyStatsUSec.getBuckets();
    }

    /** @return the live publish-latency histogram (microseconds) */
    public StatsBuckets getPublishLatencyStatsUSec() {
        return this.publishLatencyStatsUSec;
    }

    /** @return the total number of messages successfully published by this handler */
    public long getMsgPublishedCounter() {
        return this.msgPublishedCounter;
    }

    /**
     * Delegates to the authorization service to decide whether the role may produce to the topic.
     *
     * @param authRole           the authenticated role of the client
     * @param authenticationData authentication data associated with the request
     * @return the result of {@code canProduce} for this handler's topic
     */
    @Override
    protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
        return this.service.getAuthorizationService().canProduce(this.topic, authRole, authenticationData);
    }

    /**
     * Serializes {@code response} to JSON and sends it to the client. Failures are logged and
     * otherwise ignored so that an ack problem never propagates to the caller.
     *
     * @param response the acknowledgement to deliver
     */
    private void sendAckResponse(ProducerAck response) {
        try {
            String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
            this.getSession().getRemote().sendString(msg);
        }
        catch (JsonProcessingException e) {
            log.warn("[{}] Failed to generate ack json-response {}", this.producer.getTopic(), e.getMessage(), e);
        }
        catch (Exception e) {
            log.warn("[{}] Failed to send ack {}", this.producer.getTopic(), e.getMessage(), e);
        }
    }

    /**
     * Records one successful publish in the statistics.
     *
     * @param msgSize     payload size in bytes
     * @param latencyUsec publish latency in microseconds
     */
    private void updateSentMsgStats(long msgSize, long latencyUsec) {
        this.publishLatencyStatsUSec.addValue(latencyUsec);
        this.numBytesSent.add(msgSize);
        this.numMsgsSent.increment();
        MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
    }

    /**
     * Builds a producer builder from the request's query parameters.
     *
     * <p>Defaults set here: batching disabled, {@code SinglePartition} routing and non-blocking
     * behaviour when the pending queue is full. Each recognised query parameter overrides the
     * corresponding setting.
     *
     * @param client the Pulsar client used to create the builder
     * @return the configured builder (topic not yet set)
     * @throws IllegalArgumentException if a numeric or enum parameter cannot be parsed
     */
    private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
        ProducerBuilder<byte[]> builder = client.newProducer().enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition);
        builder.blockIfQueueFull(false);
        if (this.queryParams.containsKey("producerName")) {
            builder.producerName((String)this.queryParams.get("producerName"));
        }
        if (this.queryParams.containsKey("initialSequenceId")) {
            // Parse the parameter's value, not its name (the literal name is never a valid long).
            builder.initialSequenceId(Long.parseLong((String)this.queryParams.get("initialSequenceId")));
        }
        if (this.queryParams.containsKey("hashingScheme")) {
            builder.hashingScheme(HashingScheme.valueOf((String)this.queryParams.get("hashingScheme")));
        }
        if (this.queryParams.containsKey("sendTimeoutMillis")) {
            builder.sendTimeout(Integer.parseInt((String)this.queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("batchingEnabled")) {
            builder.enableBatching(Boolean.parseBoolean((String)this.queryParams.get("batchingEnabled")));
        }
        if (this.queryParams.containsKey("batchingMaxMessages")) {
            builder.batchingMaxMessages(Integer.parseInt((String)this.queryParams.get("batchingMaxMessages")));
        }
        if (this.queryParams.containsKey("maxPendingMessages")) {
            builder.maxPendingMessages(Integer.parseInt((String)this.queryParams.get("maxPendingMessages")));
        }
        if (this.queryParams.containsKey("batchingMaxPublishDelay")) {
            builder.batchingMaxPublishDelay((long)Integer.parseInt((String)this.queryParams.get("batchingMaxPublishDelay")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("messageRoutingMode")) {
            String routingModeParam = (String)this.queryParams.get("messageRoutingMode");
            Preconditions.checkArgument(Enums.getIfPresent(MessageRoutingMode.class, routingModeParam).isPresent(), "Invalid messageRoutingMode %s", routingModeParam);
            MessageRoutingMode routingMode = MessageRoutingMode.valueOf(routingModeParam);
            // CustomPartition is accepted but ignored: the default routing mode stays in effect.
            if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
                builder.messageRoutingMode(routingMode);
            }
        }
        if (this.queryParams.containsKey("compressionType")) {
            String compressionParam = (String)this.queryParams.get("compressionType");
            Preconditions.checkArgument(Enums.getIfPresent(CompressionType.class, compressionParam).isPresent(), "Invalid compressionType %s", compressionParam);
            builder.compressionType(CompressionType.valueOf(compressionParam));
        }
        return builder;
    }
}

