package org.apache.pulsar.websocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.base.Enums;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.DummyCryptoKeyReaderImpl;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.service.WSSDummyMessageCryptoImpl;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/websocket/ProducerHandler.class */
public class ProducerHandler extends AbstractWebSocketHandler {
    private WebSocketService service;
    private Producer<byte[]> producer;
    private final LongAdder numMsgsSent;
    private final LongAdder numMsgsFailed;
    private final LongAdder numBytesSent;
    private final StatsBuckets publishLatencyStatsUSec;
    private volatile long msgPublishedCounter;
    private boolean clientSideEncrypt;
    private final ObjectReader producerMessageReader;
    private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");
    public static final List<Long> ENTRY_LATENCY_BUCKETS_USEC = Collections.unmodifiableList(Arrays.asList(500L, 1000L, 5000L, 10000L, 20000L, 50000L, 100000L, 200000L, 1000000L));
    private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

    public ProducerHandler(WebSocketService webSocketService, HttpServletRequest httpServletRequest, ServletUpgradeResponse servletUpgradeResponse) {
        super(webSocketService, httpServletRequest, servletUpgradeResponse);
        this.msgPublishedCounter = 0L;
        this.producerMessageReader = ObjectMapperFactory.getMapper().reader().forType(ProducerMessage.class);
        this.numMsgsSent = new LongAdder();
        this.numBytesSent = new LongAdder();
        this.numMsgsFailed = new LongAdder();
        this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
        this.service = webSocketService;
        if (checkAuth(servletUpgradeResponse)) {
            try {
                this.producer = getProducerBuilder(webSocketService.getPulsarClient()).topic(this.topic.toString()).create();
                if (this.clientSideEncrypt) {
                    log.info("[{}] [{}] The producer session is created with param encryptionKeyValues, which means that message encryption will be done on the client side, then the server will skip batch message processing, message compression processing, and message encryption processing", this.producer.getTopic(), this.producer.getProducerName());
                }
                if (!this.service.addProducer(this)) {
                    log.warn("[{}:{}] Failed to add producer handler for topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic});
                }
            } catch (Exception e) {
                log.warn("[{}:{}] Failed in creating producer on topic {}: {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic, e.getMessage()});
                try {
                    servletUpgradeResponse.sendError(getErrorCode(e), getErrorMessage(e));
                } catch (IOException e2) {
                    log.warn("[{}:{}] Failed to send error: {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), e2.getMessage(), e2});
                }
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.producer != null) {
            if (!this.service.removeProducer(this)) {
                log.warn("[{}] Failed to remove producer handler", this.producer.getTopic());
            }
            this.producer.closeAsync().thenAccept(r5 -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed producer asynchronously", this.producer.getTopic());
                }
            }).exceptionally(th -> {
                log.warn("[{}] Failed to close producer", this.producer.getTopic(), th);
                return null;
            });
        }
    }

    public void onWebSocketText(String str) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received new message from producer {} ", this.producer.getTopic(), getRemote().getInetSocketAddress().toString());
        }
        String str2 = null;
        try {
            ProducerMessage producerMessage = (ProducerMessage) this.producerMessageReader.readValue(str);
            str2 = producerMessage.context;
            byte[] decode = Base64.getDecoder().decode(producerMessage.payload);
            long length = decode.length;
            TypedMessageBuilderImpl newMessage = this.producer.newMessage();
            try {
                newMessage.value(decode);
                if (producerMessage.properties != null) {
                    newMessage.properties(producerMessage.properties);
                }
                if (producerMessage.key != null) {
                    newMessage.key(producerMessage.key);
                }
                if (producerMessage.replicationClusters != null) {
                    newMessage.replicationClusters(producerMessage.replicationClusters);
                }
                if (producerMessage.eventTime != null) {
                    try {
                        newMessage.eventTime(DateFormatter.parse(producerMessage.eventTime));
                    } catch (DateTimeParseException e) {
                        sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e.getMessage(), null, str2));
                        return;
                    }
                }
                if (producerMessage.deliverAt > 0) {
                    newMessage.deliverAt(producerMessage.deliverAt);
                }
                if (producerMessage.deliverAfterMs > 0) {
                    newMessage.deliverAfter(producerMessage.deliverAfterMs, TimeUnit.MILLISECONDS);
                }
                if (this.clientSideEncrypt) {
                    try {
                        if (!StringUtils.isBlank(producerMessage.encryptionParam)) {
                            newMessage.getMetadataBuilder().setEncryptionParam(Base64.getDecoder().decode(producerMessage.encryptionParam));
                        }
                        if (producerMessage.compressionType != null && producerMessage.uncompressedMessageSize != null) {
                            newMessage.getMetadataBuilder().setCompression(producerMessage.compressionType);
                            newMessage.getMetadataBuilder().setUncompressedSize(producerMessage.uncompressedMessageSize.intValue());
                        } else if ((!CompressionType.NONE.equals(producerMessage.compressionType) && producerMessage.compressionType != null) || producerMessage.uncompressedMessageSize != null) {
                            sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, "the params compressionType and uncompressedMessageSize should both empty or both non-empty", null, str2));
                            return;
                        }
                    } catch (Exception e2) {
                        sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, String.format("Invalid Base64 encryptionParam error=%s", e2.getMessage()), null, str2));
                        return;
                    }
                }
                long nanoTime = System.nanoTime();
                newMessage.sendAsync().thenAccept(messageId -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Success fully write the message to broker with returned message ID {} from producer {}", new Object[]{this.producer.getTopic(), messageId, getRemote().getInetSocketAddress().toString()});
                    }
                    updateSentMsgStats(length, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - nanoTime));
                    if (isConnected()) {
                        sendAckResponse(new ProducerAck(Base64.getEncoder().encodeToString(messageId.toByteArray()), producerMessage.context));
                    }
                }).exceptionally(th -> {
                    log.warn("[{}] Error occurred while producer handler was sending msg from {}", new Object[]{this.producer.getTopic(), getRemote().getInetSocketAddress().toString(), th});
                    this.numMsgsFailed.increment();
                    sendAckResponse(new ProducerAck(WebSocketError.UnknownError, th.getMessage(), null, producerMessage.context));
                    return null;
                });
            } catch (SchemaSerializationException e3) {
                sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e3.getMessage(), null, str2));
            }
        } catch (IOException e4) {
            sendAckResponse(new ProducerAck(WebSocketError.FailedToDeserializeFromJSON, e4.getMessage(), null, null));
        } catch (IllegalArgumentException e5) {
            sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, String.format("Invalid Base64 message-payload error=%s", e5.getMessage()), null, str2));
        } catch (NullPointerException e6) {
            sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, e6.getMessage(), null, str2));
        }
    }

    public Producer<byte[]> getProducer() {
        return this.producer;
    }

    public long getAndResetNumMsgsSent() {
        return this.numMsgsSent.sumThenReset();
    }

    public long getAndResetNumBytesSent() {
        return this.numBytesSent.sumThenReset();
    }

    public long getAndResetNumMsgsFailed() {
        return this.numMsgsFailed.sumThenReset();
    }

    public long[] getAndResetPublishLatencyStatsUSec() {
        this.publishLatencyStatsUSec.refresh();
        return this.publishLatencyStatsUSec.getBuckets();
    }

    public StatsBuckets getPublishLatencyStatsUSec() {
        return this.publishLatencyStatsUSec;
    }

    public long getMsgPublishedCounter() {
        return this.msgPublishedCounter;
    }

    @Override // org.apache.pulsar.websocket.AbstractWebSocketHandler
    protected Boolean isAuthorized(String str, AuthenticationDataSource authenticationDataSource) throws Exception {
        try {
            return (Boolean) this.service.getAuthorizationService().allowTopicOperationAsync(this.topic, TopicOperation.PRODUCE, str, authenticationDataSource).get(this.service.getConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            log.warn("Time-out {} sec while checking authorization on {} ", Integer.valueOf(this.service.getConfig().getMetadataStoreOperationTimeoutSeconds()), this.topic);
            throw e;
        } catch (Exception e2) {
            log.warn("Producer-client  with Role - {} failed to get permissions for topic - {}. {}", new Object[]{str, this.topic, e2.getMessage()});
            throw e2;
        }
    }

    private void sendAckResponse(ProducerAck producerAck) {
        try {
            getSession().getRemote().sendString(objectWriter().writeValueAsString(producerAck), new WriteCallback() { // from class: org.apache.pulsar.websocket.ProducerHandler.1
                public void writeFailed(Throwable th) {
                    ProducerHandler.log.warn("[{}] Failed to send ack: {}", ProducerHandler.this.producer.getTopic(), th.getMessage());
                }

                public void writeSuccess() {
                    if (ProducerHandler.log.isDebugEnabled()) {
                        ProducerHandler.log.debug("[{}] Ack was sent successfully to {}", ProducerHandler.this.producer.getTopic(), ProducerHandler.this.getRemote().getInetSocketAddress().toString());
                    }
                }
            });
        } catch (Exception e) {
            log.warn("[{}] Failed to send ack: {}", this.producer.getTopic(), e.getMessage());
        } catch (JsonProcessingException e2) {
            log.warn("[{}] Failed to generate ack json-response: {}", this.producer.getTopic(), e2.getMessage());
        }
    }

    private void updateSentMsgStats(long j, long j2) {
        this.publishLatencyStatsUSec.addValue(j2);
        this.numBytesSent.add(j);
        this.numMsgsSent.increment();
        MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
    }

    protected ProducerBuilder<byte[]> getProducerBuilder(PulsarClient pulsarClient) {
        ProducerBuilder<byte[]> messageRoutingMode = pulsarClient.newProducer().enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition);
        messageRoutingMode.blockIfQueueFull(false);
        if (this.queryParams.containsKey("producerName")) {
            messageRoutingMode.producerName(this.queryParams.get("producerName"));
        }
        if (this.queryParams.containsKey("initialSequenceId")) {
            messageRoutingMode.initialSequenceId(Long.parseLong(this.queryParams.get("initialSequenceId")));
        }
        if (this.queryParams.containsKey("hashingScheme")) {
            messageRoutingMode.hashingScheme(HashingScheme.valueOf(this.queryParams.get("hashingScheme")));
        }
        if (this.queryParams.containsKey("sendTimeoutMillis")) {
            messageRoutingMode.sendTimeout(Integer.parseInt(this.queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("messageRoutingMode")) {
            Preconditions.checkArgument(Enums.getIfPresent(MessageRoutingMode.class, this.queryParams.get("messageRoutingMode")).isPresent(), "Invalid messageRoutingMode %s", this.queryParams.get("messageRoutingMode"));
            MessageRoutingMode valueOf = MessageRoutingMode.valueOf(this.queryParams.get("messageRoutingMode"));
            if (!MessageRoutingMode.CustomPartition.equals(valueOf)) {
                messageRoutingMode.messageRoutingMode(valueOf);
            }
        }
        Map<String, EncryptionContext.EncryptionKey> tryToExtractJsonEncryptionKeys = tryToExtractJsonEncryptionKeys();
        if (tryToExtractJsonEncryptionKeys != null) {
            popularProducerBuilderForClientSideEncrypt(messageRoutingMode, tryToExtractJsonEncryptionKeys);
        } else {
            popularProducerBuilderForServerSideEncrypt(messageRoutingMode);
        }
        return messageRoutingMode;
    }

    private Map<String, EncryptionContext.EncryptionKey> tryToExtractJsonEncryptionKeys() {
        if (!this.queryParams.containsKey("encryptionKeys")) {
            return null;
        }
        try {
            try {
                Map<String, EncryptionContext.EncryptionKey> map = (Map) ObjectMapperFactory.getMapper().getObjectMapper().readValue(Base64.getDecoder().decode(StringUtils.trim(this.queryParams.get("encryptionKeys"))), new TypeReference<Map<String, EncryptionContext.EncryptionKey>>() { // from class: org.apache.pulsar.websocket.ProducerHandler.2
                });
                if (map.isEmpty()) {
                    return null;
                }
                if (map.values().iterator().next().getKeyValue() == null) {
                    return null;
                }
                return map;
            } catch (IOException e) {
                return null;
            }
        } catch (Exception e2) {
            return null;
        }
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [byte[], byte[][]] */
    private void popularProducerBuilderForClientSideEncrypt(ProducerBuilder<byte[]> producerBuilder, Map<String, EncryptionContext.EncryptionKey> map) {
        this.clientSideEncrypt = true;
        int size = map.size();
        String[] strArr = new String[size];
        ?? r0 = new byte[size];
        List[] listArr = new List[size];
        for (Map.Entry<String, EncryptionContext.EncryptionKey> entry : map.entrySet()) {
            Preconditions.checkArgument(StringUtils.isNotBlank(entry.getKey()), "Empty param encryptionKeys.key");
            Preconditions.checkArgument(entry.getValue() != null, "Empty param encryptionKeys.value");
            Preconditions.checkArgument(entry.getValue().getKeyValue() != null, "Empty param encryptionKeys.value.keyValue");
            strArr[0] = StringUtils.trim(entry.getKey());
            r0[0] = entry.getValue().getKeyValue();
            if (entry.getValue().getMetadata() == null) {
                listArr[0] = Collections.emptyList();
            } else {
                listArr[0] = (List) entry.getValue().getMetadata().entrySet().stream().map(entry2 -> {
                    return new KeyValue().setKey((String) entry2.getKey()).setValue((String) entry2.getValue());
                }).collect(Collectors.toList());
            }
            producerBuilder.addEncryptionKey(strArr[0]);
        }
        producerBuilder.enableBatching(false);
        producerBuilder.compressionType(org.apache.pulsar.client.api.CompressionType.NONE);
        producerBuilder.cryptoKeyReader(DummyCryptoKeyReaderImpl.INSTANCE);
        producerBuilder.enableChunking(false);
        producerBuilder.messageCrypto(new WSSDummyMessageCryptoImpl(messageMetadata -> {
            for (int i = 0; i < strArr.length; i++) {
                messageMetadata.addEncryptionKey().setKey(strArr[i]).setValue(r0[i]).addAllMetadatas(listArr[i]);
            }
        }));
        printLogIfSettingDiscardedBatchedParams();
        printLogIfSettingDiscardedCompressionParams();
    }

    private void popularProducerBuilderForServerSideEncrypt(ProducerBuilder<byte[]> producerBuilder) {
        this.clientSideEncrypt = false;
        if (this.queryParams.containsKey("batchingEnabled")) {
            if (Boolean.parseBoolean(this.queryParams.get("batchingEnabled"))) {
                producerBuilder.enableBatching(true);
                if (this.queryParams.containsKey("batchingMaxMessages")) {
                    producerBuilder.batchingMaxMessages(Integer.parseInt(this.queryParams.get("batchingMaxMessages")));
                }
                if (this.queryParams.containsKey("maxPendingMessages")) {
                    producerBuilder.maxPendingMessages(Integer.parseInt(this.queryParams.get("maxPendingMessages")));
                }
                if (this.queryParams.containsKey("batchingMaxPublishDelay")) {
                    producerBuilder.batchingMaxPublishDelay(Integer.parseInt(this.queryParams.get("batchingMaxPublishDelay")), TimeUnit.MILLISECONDS);
                }
            } else {
                producerBuilder.enableBatching(false);
                printLogIfSettingDiscardedBatchedParams();
            }
        }
        if (this.queryParams.containsKey("compressionType")) {
            Preconditions.checkArgument(Enums.getIfPresent(org.apache.pulsar.client.api.CompressionType.class, this.queryParams.get("compressionType")).isPresent(), "Invalid compressionType %s", this.queryParams.get("compressionType"));
            producerBuilder.compressionType(org.apache.pulsar.client.api.CompressionType.valueOf(this.queryParams.get("compressionType")));
        }
        if (this.queryParams.containsKey("encryptionKeys")) {
            producerBuilder.cryptoKeyReader(this.service.getCryptoKeyReader().orElseThrow(() -> {
                return new IllegalStateException("Can't add encryption key without configuring cryptoKeyReaderFactoryClassName");
            }));
            for (String str : this.queryParams.get("encryptionKeys").split(",")) {
                producerBuilder.addEncryptionKey(str);
            }
        }
    }

    private void printLogIfSettingDiscardedBatchedParams() {
        if (this.clientSideEncrypt && this.queryParams.containsKey("batchingEnabled")) {
            log.info("Since clientSideEncrypt is true, the param batchingEnabled of producer will be ignored");
        }
        if (this.queryParams.containsKey("batchingMaxMessages")) {
            log.info("Since batchingEnabled is false, the param batchingMaxMessages of producer will be ignored");
        }
        if (this.queryParams.containsKey("maxPendingMessages")) {
            log.info("Since batchingEnabled is false, the param maxPendingMessages of producer will be ignored");
        }
        if (this.queryParams.containsKey("batchingMaxPublishDelay")) {
            log.info("Since batchingEnabled is false, the param batchingMaxPublishDelay of producer will be ignored");
        }
    }

    private void printLogIfSettingDiscardedCompressionParams() {
        if (this.clientSideEncrypt && this.queryParams.containsKey("compressionType")) {
            log.info("Since clientSideEncrypt is true, the param compressionType of producer will be ignored");
        }
    }
}
