package org.apache.pulsar.websocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/websocket/ProducerHandler.class */
public class ProducerHandler extends AbstractWebSocketHandler {
    private Producer producer;
    private final LongAdder numMsgsSent;
    private final LongAdder numMsgsFailed;
    private final LongAdder numBytesSent;
    private final StatsBuckets publishLatencyStatsUSec;
    private volatile long msgPublishedCounter;
    private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");
    public static final long[] ENTRY_LATENCY_BUCKETS_USEC = {500, 1000, 5000, 10000, 20000, 50000, 100000, 200000, 1000000};
    private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

    public ProducerHandler(WebSocketService webSocketService, HttpServletRequest httpServletRequest, ServletUpgradeResponse servletUpgradeResponse) {
        super(webSocketService, httpServletRequest, servletUpgradeResponse);
        this.msgPublishedCounter = 0L;
        this.numMsgsSent = new LongAdder();
        this.numBytesSent = new LongAdder();
        this.numMsgsFailed = new LongAdder();
        this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
        try {
            this.producer = webSocketService.getPulsarClient().createProducer(this.topic, getProducerConfiguration());
            if (!this.service.addProducer(this)) {
                log.warn("[{}:{}] Failed to add producer handler for topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic});
            }
        } catch (Exception e) {
            log.warn("[{}:{}] Failed in creating producer on topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic, e});
            boolean z = e instanceof IllegalArgumentException;
            try {
                servletUpgradeResponse.sendError(z ? 400 : 500, z ? "Invalid query-param " + e.getMessage() : "Failed to create producer");
            } catch (IOException e2) {
                log.warn("[{}:{}] Failed to send error: {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), e2.getMessage(), e2});
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.producer != null) {
            if (!this.service.removeProducer(this)) {
                log.warn("[{}] Failed to remove producer handler", this.producer.getTopic());
            }
            this.producer.closeAsync().thenAccept(r5 -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed producer asynchronously", this.producer.getTopic());
                }
            }).exceptionally(th -> {
                log.warn("[{}] Failed to close producer", this.producer.getTopic(), th);
                return null;
            });
        }
    }

    public void onWebSocketText(String str) {
        String str2 = null;
        try {
            ProducerMessage producerMessage = (ProducerMessage) ObjectMapperFactory.getThreadLocal().readValue(str, ProducerMessage.class);
            str2 = producerMessage.context;
            byte[] decode = Base64.getDecoder().decode(producerMessage.payload);
            long length = decode.length;
            MessageBuilder content = MessageBuilder.create().setContent(decode);
            if (producerMessage.properties != null) {
                content.setProperties(producerMessage.properties);
            }
            if (producerMessage.key != null) {
                content.setKey(producerMessage.key);
            }
            if (producerMessage.replicationClusters != null) {
                content.setReplicationClusters(producerMessage.replicationClusters);
            }
            Message build = content.build();
            long nanoTime = System.nanoTime();
            this.producer.sendAsync(build).thenAccept(messageId -> {
                updateSentMsgStats(length, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - nanoTime));
                if (isConnected()) {
                    sendAckResponse(new ProducerAck(Base64.getEncoder().encodeToString(messageId.toByteArray()), producerMessage.context));
                }
            }).exceptionally(th -> {
                this.numMsgsFailed.increment();
                sendAckResponse(new ProducerAck(WebSocketError.UnknownError, th.getMessage(), null, producerMessage.context));
                return null;
            });
        } catch (IOException e) {
            sendAckResponse(new ProducerAck(WebSocketError.FailedToDeserializeFromJSON, e.getMessage(), null, null));
        } catch (IllegalArgumentException e2) {
            sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, String.format("Invalid Base64 message-payload error=%s", e2.getMessage()), null, str2));
        }
    }

    public Producer getProducer() {
        return this.producer;
    }

    public long getAndResetNumMsgsSent() {
        return this.numMsgsSent.sumThenReset();
    }

    public long getAndResetNumBytesSent() {
        return this.numBytesSent.sumThenReset();
    }

    public long getAndResetNumMsgsFailed() {
        return this.numMsgsFailed.sumThenReset();
    }

    public long[] getAndResetPublishLatencyStatsUSec() {
        this.publishLatencyStatsUSec.refresh();
        return this.publishLatencyStatsUSec.getBuckets();
    }

    public StatsBuckets getPublishLatencyStatsUSec() {
        return this.publishLatencyStatsUSec;
    }

    public long getMsgPublishedCounter() {
        return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
    }

    @Override // org.apache.pulsar.websocket.AbstractWebSocketHandler
    protected Boolean isAuthorized(String str) throws Exception {
        return Boolean.valueOf(this.service.getAuthorizationManager().canProduce(DestinationName.get(this.topic), str));
    }

    private void sendAckResponse(ProducerAck producerAck) {
        try {
            getSession().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(producerAck));
        } catch (Exception e) {
            log.warn("[{}] Failed to send ack {}", new Object[]{this.producer.getTopic(), e.getMessage(), e});
        } catch (JsonProcessingException e2) {
            log.warn("[{}] Failed to generate ack json-response {}", new Object[]{this.producer.getTopic(), e2.getMessage(), e2});
        }
    }

    private void updateSentMsgStats(long j, long j2) {
        this.publishLatencyStatsUSec.addValue(j2);
        this.numBytesSent.add(j);
        this.numMsgsSent.increment();
        MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
    }

    private ProducerConfiguration getProducerConfiguration() {
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setBlockIfQueueFull(false);
        if (this.queryParams.containsKey("sendTimeoutMillis")) {
            producerConfiguration.setSendTimeout(Integer.parseInt(this.queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("batchingEnabled")) {
            producerConfiguration.setBatchingEnabled(Boolean.parseBoolean(this.queryParams.get("batchingEnabled")));
        }
        if (this.queryParams.containsKey("batchingMaxMessages")) {
            producerConfiguration.setBatchingMaxMessages(Integer.parseInt(this.queryParams.get("batchingMaxMessages")));
        }
        if (this.queryParams.containsKey("maxPendingMessages")) {
            producerConfiguration.setMaxPendingMessages(Integer.parseInt(this.queryParams.get("maxPendingMessages")));
        }
        if (this.queryParams.containsKey("batchingMaxPublishDelay")) {
            producerConfiguration.setBatchingMaxPublishDelay(Integer.parseInt(this.queryParams.get("batchingMaxPublishDelay")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("messageRoutingMode")) {
            Preconditions.checkArgument(Enums.getIfPresent(ProducerConfiguration.MessageRoutingMode.class, this.queryParams.get("messageRoutingMode")).isPresent(), "Invalid messageRoutingMode %s", this.queryParams.get("messageRoutingMode"));
            ProducerConfiguration.MessageRoutingMode valueOf = ProducerConfiguration.MessageRoutingMode.valueOf(this.queryParams.get("messageRoutingMode"));
            if (!ProducerConfiguration.MessageRoutingMode.CustomPartition.equals(valueOf)) {
                producerConfiguration.setMessageRoutingMode(valueOf);
            }
        }
        if (this.queryParams.containsKey("compressionType")) {
            Preconditions.checkArgument(Enums.getIfPresent(CompressionType.class, this.queryParams.get("compressionType")).isPresent(), "Invalid compressionType %s", this.queryParams.get("compressionType"));
            producerConfiguration.setCompressionType(CompressionType.valueOf(this.queryParams.get("compressionType")));
        }
        return producerConfiguration;
    }
}
