/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.AbstractWebSocketHandler;
import org.apache.pulsar.websocket.WebSocketError;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerHandler
extends AbstractWebSocketHandler {
    private Producer producer;
    private final LongAdder numMsgsSent = new LongAdder();
    private final LongAdder numMsgsFailed;
    private final LongAdder numBytesSent = new LongAdder();
    private final StatsBuckets publishLatencyStatsUSec;
    private volatile long msgPublishedCounter = 0L;
    private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");
    public static final long[] ENTRY_LATENCY_BUCKETS_USEC = new long[]{500L, 1000L, 5000L, 10000L, 20000L, 50000L, 100000L, 200000L, 1000000L};
    private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

    public ProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
        super(service, request, response);
        this.numMsgsFailed = new LongAdder();
        this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
        try {
            ProducerConfiguration conf = this.getProducerConfiguration();
            this.producer = service.getPulsarClient().createProducer(this.topic, conf);
            if (!this.service.addProducer(this)) {
                log.warn("[{}:{}] Failed to add producer handler for topic {}", new Object[]{request.getRemoteAddr(), request.getRemotePort(), this.topic});
            }
        }
        catch (Exception e) {
            log.warn("[{}:{}] Failed in creating producer on topic {}", new Object[]{request.getRemoteAddr(), request.getRemotePort(), this.topic, e});
            try {
                response.sendError(500, "Failed to create producer");
            }
            catch (IOException e1) {
                log.warn("[{}:{}] Failed to send error: {}", new Object[]{request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1});
            }
        }
    }

    @Override
    public void close() throws IOException {
        if (this.producer != null) {
            if (!this.service.removeProducer(this)) {
                log.warn("[{}] Failed to remove producer handler", (Object)this.producer.getTopic());
            }
            ((CompletableFuture)this.producer.closeAsync().thenAccept(x -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed producer asynchronously", (Object)this.producer.getTopic());
                }
            })).exceptionally(exception -> {
                log.warn("[{}] Failed to close producer", (Object)this.producer.getTopic(), exception);
                return null;
            });
        }
    }

    public void onWebSocketText(String message) {
        ProducerMessage sendRequest;
        byte[] rawPayload = null;
        String requestContext = null;
        try {
            sendRequest = (ProducerMessage)ObjectMapperFactory.getThreadLocal().readValue(message, ProducerMessage.class);
            requestContext = sendRequest.context;
            rawPayload = Base64.getDecoder().decode(sendRequest.payload);
        }
        catch (IOException e) {
            this.sendAckResponse(new ProducerAck(WebSocketError.FailedToDeserializeFromJSON, e.getMessage(), null, null));
            return;
        }
        catch (IllegalArgumentException e) {
            String msg = String.format("Invalid Base64 message-payload error=%s", e.getMessage());
            this.sendAckResponse(new ProducerAck(WebSocketError.PayloadEncodingError, msg, null, requestContext));
            return;
        }
        long msgSize = rawPayload.length;
        MessageBuilder builder = MessageBuilder.create().setContent(rawPayload);
        if (sendRequest.properties != null) {
            builder.setProperties(sendRequest.properties);
        }
        if (sendRequest.key != null) {
            builder.setKey(sendRequest.key);
        }
        if (sendRequest.replicationClusters != null) {
            builder.setReplicationClusters(sendRequest.replicationClusters);
        }
        Message msg = builder.build();
        long now = System.nanoTime();
        ((CompletableFuture)this.producer.sendAsync(msg).thenAccept(msgId -> {
            this.updateSentMsgStats(msgSize, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - now));
            if (this.isConnected()) {
                String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray());
                this.sendAckResponse(new ProducerAck(messageId, sendRequest.context));
            }
        })).exceptionally(exception -> {
            this.numMsgsFailed.increment();
            this.sendAckResponse(new ProducerAck(WebSocketError.UnknownError, exception.getMessage(), null, sendRequest.context));
            return null;
        });
    }

    public Producer getProducer() {
        return this.producer;
    }

    public long getAndResetNumMsgsSent() {
        return this.numMsgsSent.sumThenReset();
    }

    public long getAndResetNumBytesSent() {
        return this.numBytesSent.sumThenReset();
    }

    public long getAndResetNumMsgsFailed() {
        return this.numMsgsFailed.sumThenReset();
    }

    public long[] getAndResetPublishLatencyStatsUSec() {
        this.publishLatencyStatsUSec.refresh();
        return this.publishLatencyStatsUSec.getBuckets();
    }

    public StatsBuckets getPublishLatencyStatsUSec() {
        return this.publishLatencyStatsUSec;
    }

    public long getMsgPublishedCounter() {
        return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
    }

    @Override
    protected Boolean isAuthorized(String authRole) throws Exception {
        return this.service.getAuthorizationManager().canProduce(DestinationName.get((String)this.topic), authRole);
    }

    private void sendAckResponse(ProducerAck response) {
        try {
            String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)response);
            this.getSession().getRemote().sendString(msg);
        }
        catch (JsonProcessingException e) {
            log.warn("[{}] Failed to generate ack json-response {}", new Object[]{this.producer.getTopic(), e.getMessage(), e});
        }
        catch (Exception e) {
            log.warn("[{}] Failed to send ack {}", new Object[]{this.producer.getTopic(), e.getMessage(), e});
        }
    }

    private void updateSentMsgStats(long msgSize, long latencyUsec) {
        this.publishLatencyStatsUSec.addValue(latencyUsec);
        this.numBytesSent.add(msgSize);
        this.numMsgsSent.increment();
        MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
    }

    private ProducerConfiguration getProducerConfiguration() {
        ProducerConfiguration conf = new ProducerConfiguration();
        conf.setBlockIfQueueFull(false);
        if (this.queryParams.containsKey("sendTimeoutMillis")) {
            conf.setSendTimeout(Integer.parseInt((String)this.queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("batchingEnabled")) {
            conf.setBatchingEnabled(Boolean.parseBoolean((String)this.queryParams.get("batchingEnabled")));
        }
        if (this.queryParams.containsKey("batchingMaxMessages")) {
            conf.setBatchingMaxMessages(Integer.parseInt((String)this.queryParams.get("batchingMaxMessages")));
        }
        if (this.queryParams.containsKey("maxPendingMessages")) {
            conf.setMaxPendingMessages(Integer.parseInt((String)this.queryParams.get("maxPendingMessages")));
        }
        if (this.queryParams.containsKey("batchingMaxPublishDelay")) {
            conf.setBatchingMaxPublishDelay((long)Integer.parseInt((String)this.queryParams.get("batchingMaxPublishDelay")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("messageRoutingMode")) {
            conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf((String)((String)this.queryParams.get("messageRoutingMode"))));
        }
        if (this.queryParams.containsKey("compressionType")) {
            conf.setCompressionType(CompressionType.valueOf((String)((String)this.queryParams.get("compressionType"))));
        }
        return conf;
    }
}

