/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BatchMessageContainer;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.HandlerBase;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerStats;
import org.apache.pulsar.client.impl.ProducerStatsDisabled;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Queues;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.apache.pulsar.shade.org.apache.pulsar.checksum.utils.Crc32cChecksum;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.DoubleByteBuf;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerImpl
extends ProducerBase
implements TimerTask {
    // Identifier obtained from client.newProducerId() in the constructor; sent with every producer command.
    private final long producerId;
    // Next sequence id handed to messages that do not already carry one.
    // Advanced atomically through msgIdGeneratorUpdater.
    private volatile long msgIdGenerator;
    // Send operations still awaiting a broker ack, in send order; bounded by conf.getMaxPendingMessages().
    private final BlockingQueue<OpSendMsg> pendingMessages;
    // Hand-off queue used by ackReceived(): the acked op is added while the producer lock is held
    // and polled back once the lock has been left, so the user callback runs outside the lock.
    private final BlockingQueue<OpSendMsg> pendingCallbacks;
    // Fair semaphore with conf.getMaxPendingMessages() permits; one permit is taken per sendAsync call
    // (see canEnqueueRequest) and permits are handed back when an op is acked or fails.
    private final Semaphore semaphore;
    // Handle of the send-timeout timer; only scheduled when conf.getSendTimeoutMs() > 0, cancelled in closeAsync().
    private volatile Timeout sendTimeout = null;
    // Wall-clock deadline (System.currentTimeMillis() based) after which producer creation stops being retried.
    private long createProducerTimeout;
    // Batch size limit from conf.getBatchingMaxMessages(); fixed to 1 when batching is disabled.
    private final int maxNumMessagesInBatch;
    // Accumulates messages for the current batch; null when batching is disabled.
    private final BatchMessageContainer batchMessageContainer;
    // Configured producer name; when null it is replaced by the name returned by the broker in connectionOpened().
    private String producerName;
    // String form of the channel of the current connection, set in connectionOpened().
    private String connectionId;
    // Timestamp string (DateFormatter.now()) recorded when the producer was last created on a connection.
    private String connectedSince;
    // Partition index passed to OpSendMsg.setMessageId() when an ack arrives.
    private final int partitionIndex;
    // Stats recorder; ProducerStats.PRODUCER_STATS_DISABLED when the stats interval is not positive.
    private final ProducerStats stats;
    // Codec matching conf.getCompressionType(); applied per message only when batching is disabled.
    private final CompressionCodec compressor;
    // Sequence id of the last message acknowledged by the broker; -1 until one is known.
    private volatile long lastSequenceIdPublished;
    // Encryption helper; stays null unless conf.isEncryptionEnabled().
    private MessageCrypto msgCrypto = null;
    // Atomic accessor for the volatile msgIdGenerator field.
    private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "msgIdGenerator");
    // Single-thread scheduler that re-runs msgCrypto.addPublicKeyCipher() every 4 hours; null without encryption.
    private ScheduledExecutorService keyGenExecutor = null;
    /**
     * Periodic flush of the batch container. Each run sends whatever has been batched so far
     * (under the producer lock) and then re-arms itself on the client timer with the configured
     * {@code batchingMaxPublishDelayMs}. A cancelled timeout ends the cycle without flushing.
     */
    TimerTask batchMessageAndSendTask = new TimerTask(){

        @Override
        public void run(Timeout timeout) throws Exception {
            if (timeout.isCancelled()) {
                // Cancelled timeouts must neither flush nor reschedule.
                return;
            }
            final ProducerImpl owner = ProducerImpl.this;
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Batching the messages from the batch container from timer thread", (Object)owner.topic, (Object)owner.producerName);
            }
            // Batch flushing shares the producer monitor with sendAsync.
            synchronized (owner) {
                owner.batchMessageAndSend();
            }
            owner.client.timer().newTimeout(this, owner.conf.getBatchingMaxPublishDelayMs(), TimeUnit.MILLISECONDS);
        }
    };
    // Class-wide SLF4J logger; messages are conventionally prefixed with "[topic] [producerName]".
    private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);

    public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, CompletableFuture<Producer> producerCreatedFuture, int partitionIndex) {
        super(client, topic, conf, producerCreatedFuture);
        this.producerId = client.newProducerId();
        this.producerName = conf.getProducerName();
        this.partitionIndex = partitionIndex;
        this.pendingMessages = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
        this.pendingCallbacks = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
        this.semaphore = new Semaphore(conf.getMaxPendingMessages(), true);
        this.compressor = CompressionCodecProvider.getCompressionCodec(this.convertCompressionType(conf.getCompressionType()));
        if (conf.getInitialSequenceId().isPresent()) {
            long initialSequenceId;
            this.lastSequenceIdPublished = initialSequenceId = conf.getInitialSequenceId().get().longValue();
            this.msgIdGenerator = initialSequenceId + 1L;
        } else {
            this.lastSequenceIdPublished = -1L;
            this.msgIdGenerator = 0L;
        }
        if (conf.isEncryptionEnabled()) {
            String logCtx = "[" + topic + "] [" + this.producerName + "] [" + this.producerId + "]";
            this.msgCrypto = new MessageCrypto(logCtx, true);
            this.keyGenExecutor = Executors.newSingleThreadScheduledExecutor();
            this.keyGenExecutor.scheduleWithFixedDelay(() -> {
                block2: {
                    try {
                        this.msgCrypto.addPublicKeyCipher(conf.getEncryptionKeys(), conf.getCryptoKeyReader());
                    }
                    catch (PulsarClientException.CryptoException e) {
                        if (producerCreatedFuture.isDone()) break block2;
                        log.warn("[{}] [{}] [{}] Failed to add public key cipher.", new Object[]{topic, this.producerName, this.producerId});
                        producerCreatedFuture.completeExceptionally(e);
                    }
                }
            }, 0L, 4L, TimeUnit.HOURS);
        }
        if (conf.getSendTimeoutMs() > 0L) {
            this.sendTimeout = client.timer().newTimeout(this, conf.getSendTimeoutMs(), TimeUnit.MILLISECONDS);
        }
        this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
        if (conf.getBatchingEnabled()) {
            this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
            this.batchMessageContainer = new BatchMessageContainer(this.maxNumMessagesInBatch, this.convertCompressionType(conf.getCompressionType()), topic, this.producerName);
        } else {
            this.maxNumMessagesInBatch = 1;
            this.batchMessageContainer = null;
        }
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ProducerStats(client, conf, this) : ProducerStats.PRODUCER_STATS_DISABLED;
        this.grabCnx();
    }

    /**
     * @return {@code true} when the producer configuration has batching switched on
     */
    private boolean isBatchMessagingEnabled() {
        final boolean batchingEnabled = this.conf.getBatchingEnabled();
        return batchingEnabled;
    }

    /**
     * @return the sequence id of the last message acknowledged by the broker, or the initial
     *         value set at construction / producer creation when nothing has been acked yet
     */
    @Override
    public long getLastSequenceId() {
        final long lastPublished = this.lastSequenceIdPublished;
        return lastPublished;
    }

    /**
     * Publishes {@code message} asynchronously.
     *
     * <p>Wraps the returned future in a {@link SendCallback} and delegates to
     * {@link #sendAsync(Message, SendCallback)}. Callbacks of other messages may be chained onto
     * this one through {@code addCallback}; they are all completed when this callback completes.
     *
     * @param message message to publish
     * @return future completed with the message id on success, or exceptionally on failure
     */
    @Override
    public CompletableFuture<MessageId> sendAsync(final Message message) {
        final CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        this.sendAsync(message, new SendCallback(){
            SendCallback nextCallback = null;
            long createdAt = System.nanoTime();

            @Override
            public CompletableFuture<MessageId> getFuture() {
                return future;
            }

            @Override
            public SendCallback getNextSendCallback() {
                return this.nextCallback;
            }

            @Override
            public void sendComplete(Exception e) {
                // Settle our own future first, then walk the chain of attached callbacks.
                this.settle(this, e);
                while (this.nextCallback != null) {
                    this.settle(this.nextCallback, e);
                    this.nextCallback = this.nextCallback.getNextSendCallback();
                }
            }

            @Override
            public void addCallback(SendCallback scb) {
                this.nextCallback = scb;
            }

            /** Completes the future behind {@code target} and records the outcome in the stats. */
            private void settle(SendCallback target, Exception e) {
                if (e == null) {
                    // NOTE(review): chained callbacks are completed with the id of THIS message,
                    // not the id of the message they were created for - confirm this is intended.
                    target.getFuture().complete(message.getMessageId());
                    ProducerImpl.this.stats.incrementNumAcksReceived(System.nanoTime() - this.createdAt);
                } else {
                    ProducerImpl.this.stats.incrementSendFailed();
                    target.getFuture().completeExceptionally(e);
                }
            }
        });
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Core asynchronous send path.
     *
     * <p>Steps: validate the producer state, reserve one permit from the pending-message
     * semaphore, (without batching) compress the payload, validate the message, then - under the
     * producer lock - assign a sequence id, fill in the metadata and either add the message to the
     * current batch or build the send command, enqueue it in {@code pendingMessages} and write it
     * on the connection's event loop when connected.
     *
     * <p>Permit accounting: once {@code canEnqueueRequest} has succeeded this call owns one
     * permit. Every path that rejects the message before it reaches the pending queue or the batch
     * container must give that permit back, otherwise each rejected message permanently shrinks
     * the producer's pending-message window. The two validation failures below release it
     * explicitly; the catch blocks already did.
     *
     * @param message  message to send; must be a {@code MessageImpl}
     * @param callback completed with {@code null} on success (by the ack path) or with the failure
     * @throws IllegalArgumentException if {@code message} is not a {@code MessageImpl}
     */
    public void sendAsync(Message message, SendCallback callback) {
        Preconditions.checkArgument(message instanceof MessageImpl);
        if (!this.isValidProducerState(callback)) {
            return;
        }
        if (!this.canEnqueueRequest(callback)) {
            return;
        }
        // From here on one semaphore permit is held on behalf of this message.
        MessageImpl msg = (MessageImpl)message;
        PulsarApi.MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
        ByteBuf payload = msg.getDataBuffer();
        int uncompressedSize = payload.readableBytes();
        ByteBuf compressedPayload = payload;
        // With batching the payload stays uncompressed here and only this uncompressed size is validated.
        if (!this.isBatchMessagingEnabled()) {
            compressedPayload = this.compressor.encode(payload);
            payload.release();
        }
        int compressedSize = compressedPayload.readableBytes();
        // 5232640 = 5 MiB - 10 KiB; presumably the broker's maximum message size - confirm against the protocol constant.
        if (compressedSize > 5232640) {
            compressedPayload.release();
            // The message never reaches the pending queue, so return the permit taken above.
            this.semaphore.release();
            String compressedStr = !this.isBatchMessagingEnabled() && this.conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
            callback.sendComplete(new PulsarClientException.InvalidMessageException(String.format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize, 5232640)));
            return;
        }
        // A non-replicated message that already carries a producer name has been sent before.
        if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
            // Same as above: rejected before enqueueing, so the permit must be handed back.
            this.semaphore.release();
            callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message"));
            compressedPayload.release();
            return;
        }
        try {
            synchronized (this) {
                long sequenceId;
                if (!msgMetadata.hasSequenceId()) {
                    sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
                    msgMetadata.setSequenceId(sequenceId);
                } else {
                    sequenceId = msgMetadata.getSequenceId();
                }
                if (!msgMetadata.hasPublishTime()) {
                    msgMetadata.setPublishTime(System.currentTimeMillis());
                    Preconditions.checkArgument(!msgMetadata.hasProducerName());
                    msgMetadata.setProducerName(this.producerName);
                    if (this.conf.getCompressionType() != CompressionType.NONE) {
                        msgMetadata.setCompression(this.convertCompressionType(this.conf.getCompressionType()));
                        msgMetadata.setUncompressedSize(uncompressedSize);
                    }
                }
                if (this.isBatchMessagingEnabled()) {
                    if (this.batchMessageContainer.hasSpaceInBatch(msg)) {
                        this.batchMessageContainer.add(msg, callback);
                        payload.release();
                        // Flush as soon as the batch hits the message-count limit or 128 KiB (131072 bytes).
                        if (this.batchMessageContainer.numMessagesInBatch == this.maxNumMessagesInBatch || this.batchMessageContainer.currentBatchSizeBytes >= 131072L) {
                            this.batchMessageAndSend();
                        }
                    } else {
                        this.doBatchSendAndAdd(msg, callback, payload);
                    }
                } else {
                    ByteBuf encryptedPayload = this.encryptMessage(msgMetadata, compressedPayload);
                    ByteBuf cmd = this.sendMessage(this.producerId, sequenceId, 1, msgMetadata.build(), encryptedPayload);
                    msgMetadata.recycle();
                    OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);
                    op.setNumMessagesInBatch(1);
                    op.setBatchSizeByte(encryptedPayload.readableBytes());
                    this.pendingMessages.put(op);
                    ClientCnx cnx = this.cnx();
                    if (this.isConnected()) {
                        // Extra reference for the write; the queued op keeps its own until acked or failed.
                        cmd.retain();
                        cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
                        this.stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
                    } else if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", new Object[]{this.topic, this.producerName, sequenceId});
                    }
                }
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            this.semaphore.release();
            callback.sendComplete(new PulsarClientException(ie));
        }
        catch (PulsarClientException e) {
            this.semaphore.release();
            callback.sendComplete(e);
        }
        catch (Throwable t) {
            this.semaphore.release();
            callback.sendComplete(new PulsarClientException(t));
        }
    }

    /**
     * Encrypts {@code compressedPayload} when encryption is configured and the crypto helper exists.
     *
     * @param msgMetadata       metadata builder passed through to the crypto helper
     * @param compressedPayload payload to encrypt
     * @return the encrypted payload; or {@code compressedPayload} itself when encryption is off, or
     *         when encryption failed and the configured failure action is {@code SEND}
     * @throws PulsarClientException if encryption fails and the failure action is not {@code SEND}
     */
    private ByteBuf encryptMessage(PulsarApi.MessageMetadata.Builder msgMetadata, ByteBuf compressedPayload) throws PulsarClientException {
        final boolean encryptionActive = this.conf.isEncryptionEnabled() && this.msgCrypto != null;
        if (!encryptionActive) {
            return compressedPayload;
        }
        try {
            return this.msgCrypto.encrypt(this.conf.getEncryptionKeys(), this.conf.getCryptoKeyReader(), msgMetadata, compressedPayload);
        }
        catch (PulsarClientException e) {
            if (this.conf.getCryptoFailureAction() != ProducerCryptoFailureAction.SEND) {
                throw e;
            }
            // Configured to publish in the clear rather than fail the send.
            log.warn("[{}] [{}] Failed to encrypt message {}. Proceeding with publishing unencrypted message", new Object[]{this.topic, this.producerName, e.getMessage()});
            return compressedPayload;
        }
    }

    /**
     * Builds the wire command for one send operation.
     *
     * <p>A CRC32C checksum is requested when there is no connection yet, or when the connected
     * broker's protocol version is at least {@code brokerChecksumSupportedVersion()}; otherwise
     * no checksum is added.
     *
     * @param producerId        id of this producer
     * @param sequenceId        sequence id of the (first) message
     * @param numMessages       number of messages carried by the command
     * @param msgMetadata       built message metadata
     * @param compressedPayload payload to send
     * @return the serialized send command
     * @throws IOException declared by the original signature
     */
    private ByteBuf sendMessage(long producerId, long sequenceId, int numMessages, PulsarApi.MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException {
        // Read the connection exactly once: it can be cleared concurrently (e.g. by terminated()),
        // so a second read after the null check could return null and throw a NullPointerException.
        final ClientCnx cnx = this.getClientCnx();
        final Commands.ChecksumType checksumType;
        if (cnx == null || cnx.getRemoteEndpointProtocolVersion() >= this.brokerChecksumSupportedVersion()) {
            checksumType = Commands.ChecksumType.Crc32c;
        } else {
            checksumType = Commands.ChecksumType.None;
        }
        return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload);
    }

    /**
     * Flushes the current batch because {@code msg} does not fit into it, then starts the next
     * batch with {@code msg} and releases the caller's reference on {@code payload}.
     *
     * @param msg      message that did not fit into the current batch
     * @param callback callback to register together with the message
     * @param payload  buffer released after the message has been added
     */
    private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) {
        if (log.isDebugEnabled()) {
            final int messageSize = msg.getDataBuffer().readableBytes();
            log.debug("[{}] [{}] Closing out batch to accomodate large message with size {}", new Object[]{this.topic, this.producerName, messageSize});
        }
        // Order matters: the old batch must go out before the new message is added.
        this.batchMessageAndSend();
        this.batchMessageContainer.add(msg, callback);
        payload.release();
    }

    /**
     * Checks whether the producer may accept a new message in its current state.
     *
     * <p>{@code Ready} and {@code Connecting} accept messages. Any other state fails
     * {@code callback} with a state-specific exception.
     *
     * @param callback completed exceptionally when the state does not allow sending
     * @return {@code true} if sending may proceed
     */
    private boolean isValidProducerState(SendCallback callback) {
        final Exception rejection;
        switch (this.getState()) {
            case Ready:
            case Connecting: {
                return true;
            }
            case Closing:
            case Closed: {
                rejection = new PulsarClientException.AlreadyClosedException("Producer already closed");
                break;
            }
            case Terminated: {
                rejection = new PulsarClientException.TopicTerminatedException("Topic was terminated");
                break;
            }
            default: {
                // Every remaining state is reported as "not connected".
                rejection = new PulsarClientException.NotConnectedException();
                break;
            }
        }
        callback.sendComplete(rejection);
        return false;
    }

    /**
     * Reserves one permit of the pending-message semaphore for a new send.
     *
     * <p>With {@code blockIfQueueFull} the call blocks until a permit is free; otherwise it fails
     * fast when the queue is full.
     *
     * @param callback completed exceptionally when no permit could be obtained
     * @return {@code true} if a permit was acquired
     */
    private boolean canEnqueueRequest(SendCallback callback) {
        if (!this.conf.getBlockIfQueueFull()) {
            if (this.semaphore.tryAcquire()) {
                return true;
            }
            callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
            return false;
        }
        try {
            this.semaphore.acquire();
            return true;
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            callback.sendComplete(new PulsarClientException(e));
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the producer asynchronously.
     *
     * <p>Moves the state to {@code Closing} (unless already {@code Closed}), cancels the send
     * timeout, shuts down the key-generation executor and stops the stats timer. When the producer
     * is not connected it is closed locally and all pending messages are failed with
     * {@code AlreadyClosedException}. Otherwise a close-producer request is sent to the broker and
     * the returned future completes when the broker answers.
     *
     * @return future completed when the producer is closed; already completed when the producer
     *         was closing/closed or not connected
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        final HandlerBase.State previousState = this.getAndUpdateState(state -> state == HandlerBase.State.Closed ? state : HandlerBase.State.Closing);
        if (previousState == HandlerBase.State.Closed || previousState == HandlerBase.State.Closing) {
            return CompletableFuture.completedFuture(null);
        }

        final Timeout activeSendTimeout = this.sendTimeout;
        if (activeSendTimeout != null) {
            activeSendTimeout.cancel();
            this.sendTimeout = null;
        }
        if (this.keyGenExecutor != null && !this.keyGenExecutor.isTerminated()) {
            this.keyGenExecutor.shutdown();
        }
        this.stats.cancelStatsTimeout();

        final ClientCnx cnx = this.cnx();
        if (cnx == null || previousState != HandlerBase.State.Ready) {
            // Nothing to tell the broker: close locally and fail whatever is still queued.
            log.info("[{}] [{}] Closed Producer (not connected)", (Object)this.topic, (Object)this.producerName);
            synchronized (this) {
                this.setState(HandlerBase.State.Closed);
                this.client.cleanupProducer(this);
                final PulsarClientException.AlreadyClosedException ex = new PulsarClientException.AlreadyClosedException("Producer was already closed");
                this.pendingMessages.forEach(pending -> {
                    pending.callback.sendComplete(ex);
                    pending.cmd.release();
                    pending.recycle();
                });
                this.pendingMessages.clear();
            }
            return CompletableFuture.completedFuture(null);
        }

        final long requestId = this.client.newRequestId();
        final ByteBuf closeCmd = Commands.newCloseProducer(this.producerId, requestId);
        final CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        cnx.sendRequestWithId(closeCmd, requestId).handle((v, exception) -> {
            cnx.removeProducer(this.producerId);
            if (exception != null && cnx.ctx().channel().isActive()) {
                // The broker rejected the close on a live connection: report the failure.
                closeFuture.completeExceptionally((Throwable)exception);
                return null;
            }
            // Either the broker confirmed, or the connection is gone anyway: treat as closed.
            synchronized (this) {
                log.info("[{}] [{}] Closed Producer", (Object)this.topic, (Object)this.producerName);
                this.setState(HandlerBase.State.Closed);
                this.pendingMessages.forEach(pending -> {
                    pending.cmd.release();
                    pending.recycle();
                });
                this.pendingMessages.clear();
            }
            closeFuture.complete(null);
            this.client.cleanupProducer(this);
            return null;
        });
        return closeFuture;
    }

    /**
     * @return {@code true} when a connection is set and the producer state is {@code Ready}
     */
    @Override
    public boolean isConnected() {
        if (this.getClientCnx() == null) {
            return false;
        }
        return this.getState() == HandlerBase.State.Ready;
    }

    /**
     * @return {@code true} when a connection is set and its channel reports itself writable
     */
    public boolean isWritable() {
        final ClientCnx current = this.getClientCnx();
        if (current == null) {
            return false;
        }
        return current.channel().isWritable();
    }

    /**
     * Handles the notification that the topic has been terminated.
     *
     * <p>Moves the state to {@code Terminated} unless the producer is already {@code Closed}. On
     * the first transition the connection reference is dropped and all pending messages are failed
     * with {@code TopicTerminatedException}.
     *
     * @param cnx connection passed on to {@code failPendingMessages}
     */
    public void terminated(ClientCnx cnx) {
        final HandlerBase.State stateBefore = this.getAndUpdateState(state -> {
            if (state == HandlerBase.State.Closed) {
                return HandlerBase.State.Closed;
            }
            return HandlerBase.State.Terminated;
        });
        if (stateBefore == HandlerBase.State.Terminated || stateBefore == HandlerBase.State.Closed) {
            // Already handled (or closed): nothing left to fail.
            return;
        }
        log.info("[{}] [{}] The topic has been terminated", (Object)this.topic, (Object)this.producerName);
        this.setClientCnx(null);
        this.failPendingMessages(cnx, new PulsarClientException.TopicTerminatedException("The topic has been terminated"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a broker acknowledgement for {@code sequenceId}.
     *
     * <p>Only an ack matching the head of {@code pendingMessages} completes a send. That op is
     * removed, its semaphore permits are released, and its callback is then invoked outside the
     * producer lock. An ack older than the head is only logged; an ack newer than the head closes
     * the channel.
     *
     * @param cnx        connection the ack arrived on
     * @param sequenceId sequence id acknowledged by the broker
     * @param ledgerId   ledger id of the persisted entry
     * @param entryId    entry id of the persisted entry
     */
    void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) {
        OpSendMsg op = null;
        boolean callback = false;
        ProducerImpl producerImpl = this;
        synchronized (producerImpl) {
            op = (OpSendMsg)this.pendingMessages.peek();
            if (op == null) {
                // Queue is empty: the matching op is no longer pending (the log text attributes this to a timeout).
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {}", new Object[]{this.topic, this.producerName, sequenceId});
                }
                return;
            }
            long expectedSequenceId = op.sequenceId;
            if (sequenceId > expectedSequenceId) {
                // Ack is ahead of the queue head: the channel is closed.
                // NOTE(review): presumably to force a reconnect and resend - confirm.
                log.warn("[{}] [{}] Got ack for msg. expecting: {} - got: {} - queue-size: {}", new Object[]{this.topic, this.producerName, expectedSequenceId, sequenceId, this.pendingMessages.size()});
                cnx.channel().close();
            } else if (sequenceId < expectedSequenceId) {
                // Ack for an op that is no longer at the head of the queue: ignore.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {} last-seq: {}", new Object[]{this.topic, this.producerName, sequenceId, expectedSequenceId});
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Received ack for msg {} ", new Object[]{this.topic, this.producerName, sequenceId});
                }
                // Matching ack: dequeue, free one permit per message in the op, and hand the op
                // over to pendingCallbacks so its callback can run after the lock is released.
                this.pendingMessages.remove();
                this.semaphore.release(op.numMessagesInBatch);
                callback = true;
                this.pendingCallbacks.add(op);
            }
        }
        // Outside the lock: complete the next op waiting in pendingCallbacks.
        if (callback && (op = (OpSendMsg)this.pendingCallbacks.poll()) != null) {
            // Last sequence id covered by this op (a batch spans numMessagesInBatch consecutive ids).
            this.lastSequenceIdPublished = op.sequenceId + (long)op.numMessagesInBatch - 1L;
            op.setMessageId(ledgerId, entryId, this.partitionIndex);
            try {
                op.callback.sendComplete(null);
            }
            catch (Throwable t) {
                // A failing user callback must not prevent the buffer release / recycling below.
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", new Object[]{this.topic, this.producerName, sequenceId, t});
            }
            ReferenceCountUtil.safeRelease(op.cmd);
            op.recycle();
        }
    }

    /**
     * Reacts to a checksum failure reported for {@code sequenceId}.
     *
     * <p>If the failure concerns the head of {@code pendingMessages} and the local buffer fails
     * verification, that op is dropped and failed with {@code ChecksumException} and nothing is
     * resent. In every other case (empty queue, a different head, or an intact local buffer) all
     * pending messages are resent on {@code cnx}.
     *
     * @param cnx        connection used for resending
     * @param sequenceId sequence id for which the failure was reported
     */
    protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) {
        final OpSendMsg head = this.pendingMessages.peek();
        if (head == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Got send failure for timed out msg {}", new Object[]{this.topic, this.producerName, sequenceId});
            }
        } else if (sequenceId != head.sequenceId) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Corrupt message is already timed out {}", new Object[]{this.topic, this.producerName, sequenceId});
            }
        } else if (this.verifyLocalBufferIsNotCorrupted(head)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Message is not corrupted, retry send-message with sequenceId {}", new Object[]{this.topic, this.producerName, sequenceId});
            }
        } else {
            // The local copy itself is corrupt: resending cannot help, so fail this op and stop.
            this.pendingMessages.remove();
            this.semaphore.release(head.numMessagesInBatch);
            try {
                head.callback.sendComplete(new PulsarClientException.ChecksumException("Checksum failded on corrupt message"));
            }
            catch (Throwable t) {
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", new Object[]{this.topic, this.producerName, sequenceId, t});
            }
            ReferenceCountUtil.safeRelease(head.cmd);
            head.recycle();
            return;
        }
        this.resendMessages(cnx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recomputes the CRC32C checksum of the locally held command buffer and compares it with the
     * checksum stored in that buffer.
     *
     * <p>Reader indexes of the buffers are marked before and restored after the check.
     *
     * @param op pending send operation whose {@code cmd} buffer is verified
     * @return {@code true} if the checksums match or no checksum is present; {@code false} if they
     *         differ or the command is not a {@code DoubleByteBuf}
     */
    protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
        final DoubleByteBuf frame = this.getDoubleByteBuf(op.cmd);
        if (frame == null) {
            log.warn("[{}] Failed while casting {} into DoubleByteBuf", (Object)this.producerName, (Object)op.cmd.getClass().getName());
            return false;
        }
        final ByteBuf header = frame.getFirst();
        frame.markReaderIndex();
        header.markReaderIndex();
        try {
            // Skip a 4-byte prefix (presumably the total frame size - confirm), then the command
            // whose length is given by the following unsigned int.
            header.skipBytes(4);
            final int cmdSize = (int)header.readUnsignedInt();
            header.skipBytes(cmdSize);
            if (!Commands.hasChecksum(header)) {
                log.warn("[{}] [{}] checksum is not present into message with id {}", new Object[]{this.topic, this.producerName, op.sequenceId});
                return true;
            }
            final int storedChecksum = Commands.readChecksum(header).intValue();
            // Checksum over the rest of the header, continued over the second buffer.
            final int metadataChecksum = Crc32cChecksum.computeChecksum(header);
            final long computedChecksum = (long)Crc32cChecksum.resumeChecksum(metadataChecksum, frame.getSecond());
            return (long)storedChecksum == computedChecksum;
        }
        finally {
            header.resetReaderIndex();
            frame.resetReaderIndex();
        }
    }

    /**
     * Called when a connection to the broker is available: registers this producer on the
     * connection and sends the create-producer request.
     *
     * <p>On success the producer adopts the broker-provided name and last sequence id where it has
     * none of its own, starts the batch timer on first creation, and resends pending messages.
     * On failure it either fails pending messages, schedules a reconnect, or moves to
     * {@code Terminated}/{@code Failed} and fails {@code producerCreatedFuture}.
     *
     * @param cnx the newly opened connection
     */
    @Override
    void connectionOpened(ClientCnx cnx) {
        this.setClientCnx(cnx);
        cnx.registerProducer(this.producerId, this);
        log.info("[{}] [{}] Creating producer on cnx {}", new Object[]{this.topic, this.producerName, cnx.ctx().channel()});
        long requestId = this.client.newRequestId();
        ((CompletableFuture)cnx.sendRequestWithId(Commands.newProducer(this.topic, this.producerId, requestId, this.producerName), requestId).thenAccept(pair -> {
            // The response carries the producer name (left) and the last sequence id (right).
            String producerName = (String)pair.getLeft();
            long lastSequenceId = (Long)pair.getRight();
            ProducerImpl producerImpl = this;
            synchronized (producerImpl) {
                // The producer was closed while the request was in flight: undo the registration.
                if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
                    cnx.removeProducer(this.producerId);
                    cnx.channel().close();
                    return;
                }
                this.resetBackoff();
                log.info("[{}] [{}] Created producer on cnx {}", new Object[]{this.topic, producerName, cnx.ctx().channel()});
                this.connectionId = cnx.ctx().channel().toString();
                this.connectedSince = DateFormatter.now();
                // Adopt the broker-provided name only when none was configured.
                if (this.producerName == null) {
                    this.producerName = producerName;
                }
                // Adopt the broker's last sequence id only when nothing has been published yet
                // and no initial sequence id was configured.
                if (this.lastSequenceIdPublished == -1L && !this.conf.getInitialSequenceId().isPresent()) {
                    this.lastSequenceIdPublished = lastSequenceId;
                    this.msgIdGenerator = lastSequenceId + 1L;
                }
                // First successful creation with batching on: start the periodic batch flush.
                if (!this.producerCreatedFuture.isDone() && this.isBatchMessagingEnabled()) {
                    this.client.timer().newTimeout(this.batchMessageAndSendTask, this.conf.getBatchingMaxPublishDelayMs(), TimeUnit.MILLISECONDS);
                }
                this.resendMessages(cnx);
            }
        })).exceptionally(e -> {
            // NOTE(review): assumes e always wraps a cause; a null cause would throw at cause.getMessage() below - confirm.
            Throwable cause = e.getCause();
            cnx.removeProducer(this.producerId);
            if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
                cnx.channel().close();
                return null;
            }
            log.error("[{}] [{}] Failed to create producer: {}", new Object[]{this.topic, this.producerName, cause.getMessage()});
            if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
                ProducerImpl producerImpl = this;
                synchronized (producerImpl) {
                    log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", (Object)this.topic, (Object)this.producerName);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Pending messages: {}", new Object[]{this.topic, this.producerName, this.pendingMessages.size()});
                    }
                    PulsarClientException.ProducerBlockedQuotaExceededException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException("Could not send pending messages as backlog exceeded");
                    this.failPendingMessages(this.cnx(), bqe);
                }
            } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
                // NOTE(review): arguments are (producerName, topic) here, the reverse of every other log line in this class.
                log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.", (Object)this.producerName, (Object)this.topic);
            }
            if (cause instanceof PulsarClientException.TopicTerminatedException) {
                // Terminated topic: fail everything pending and give up on this producer.
                this.setState(HandlerBase.State.Terminated);
                this.failPendingMessages(this.cnx(), (PulsarClientException)cause);
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
            } else if (this.producerCreatedFuture.isDone() || cause instanceof PulsarClientException && this.isRetriableError((PulsarClientException)cause) && System.currentTimeMillis() < this.createProducerTimeout) {
                // Either the producer had been created before (this is a reconnect), or the error
                // is retriable and the creation deadline has not passed yet: try again later.
                this.reconnectLater(cause);
            } else {
                this.setState(HandlerBase.State.Failed);
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
            }
            return null;
        });
    }

    /**
     * Called when connecting to the broker failed. Once the creation deadline has passed, the
     * first such failure completes {@code producerCreatedFuture} exceptionally and moves the
     * producer to {@code Failed}; earlier failures are ignored here.
     *
     * @param exception the connection failure
     */
    @Override
    void connectionFailed(PulsarClientException exception) {
        if (System.currentTimeMillis() <= this.createProducerTimeout) {
            return;
        }
        // completeExceptionally returns false when the future was already completed.
        if (!this.producerCreatedFuture.completeExceptionally(exception)) {
            return;
        }
        log.info("[{}] Producer creation failed for producer {}", (Object)this.topic, (Object)this.producerId);
        this.setState(HandlerBase.State.Failed);
        this.client.cleanupProducer(this);
    }

    /**
     * Schedules, on the connection's event loop, a re-send of every operation still held in
     * {@code pendingMessages}.
     *
     * <p>The scheduled task runs while holding this producer's monitor and:
     * <ul>
     *   <li>closes the channel and stops if the producer is {@code Closing} or {@code Closed};</li>
     *   <li>if nothing is pending, tries to switch to the ready state, completing
     *       {@code producerCreatedFuture} on success or closing the channel otherwise;</li>
     *   <li>otherwise writes each pending command (stripping the checksum first when the remote
     *       protocol version is below {@link #brokerChecksumSupportedVersion()}), flushes, and
     *       closes the channel if the switch to the ready state fails.</li>
     * </ul>
     *
     * @param cnx the connection to re-send the pending operations on
     */
    private void resendMessages(ClientCnx cnx) {
        cnx.ctx().channel().eventLoop().execute(() -> {
            synchronized (this) {
                if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
                    // Producer is going away: close the channel instead of re-sending.
                    cnx.channel().close();
                    return;
                }

                int pendingCount = this.pendingMessages.size();
                if (pendingCount == 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] No pending messages to resend {}", new Object[]{this.topic, this.producerName, pendingCount});
                    }
                    if (!this.changeToReadyState()) {
                        cnx.channel().close();
                        return;
                    }
                    this.producerCreatedFuture.complete(this);
                    return;
                }

                log.info("[{}] [{}] Re-Sending {} messages to server", new Object[]{this.topic, this.producerName, pendingCount});
                boolean checksumUnsupported = cnx.getRemoteEndpointProtocolVersion() < this.brokerChecksumSupportedVersion();
                for (OpSendMsg pendingOp : this.pendingMessages) {
                    if (checksumUnsupported) {
                        this.stripChecksum(pendingOp);
                    }
                    // The operation stays in the pending queue, so take an extra reference before writing.
                    pendingOp.cmd.retain();
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", new Object[]{this.topic, this.producerName, cnx.channel(), pendingOp.sequenceId});
                    }
                    cnx.ctx().write(pendingOp.cmd, cnx.ctx().voidPromise());
                    this.stats.updateNumMsgsSent(pendingOp.numMessagesInBatch, pendingOp.batchSizeByte);
                }
                cnx.ctx().flush();

                if (!this.changeToReadyState()) {
                    cnx.channel().close();
                }
            }
        });
    }

    /**
     * Removes the checksum section from the header frame of {@code op.cmd}, in place.
     *
     * <p>{@code resendMessages} calls this when the remote endpoint's protocol version is
     * below {@link #brokerChecksumSupportedVersion()}.
     *
     * <p>The header frame (first buffer of the {@code DoubleByteBuf}) is read as: 4 bytes that
     * are skipped (and later overwritten with the new total size), a 4-byte unsigned command
     * size, the command bytes, and then - when {@code Commands.hasChecksum} reports one - a
     * 6-byte checksum section followed by the remaining metadata bytes. The metadata bytes are
     * copied over the checksum section and the frame capacity is reduced by 6 bytes.
     *
     * <p>If no checksum is present the buffers are left unmodified. If {@code op.cmd} cannot be
     * resolved to a {@code DoubleByteBuf}, a warning is logged and no bytes are changed.
     *
     * @param op the pending send operation whose command buffer is rewritten
     */
    private void stripChecksum(OpSendMsg op) {
        // Remember the reader position so it can be restored in the finally block below.
        op.cmd.markReaderIndex();
        int totalMsgBufSize = op.cmd.readableBytes();
        DoubleByteBuf msg = this.getDoubleByteBuf(op.cmd);
        if (msg != null) {
            ByteBuf headerFrame = msg.getFirst();
            // NOTE(review): msg's reader index is marked here, but only op.cmd is reset in the
            // finally block; the two are the same object only when op.cmd is itself a
            // DoubleByteBuf (see getDoubleByteBuf) - confirm this is sufficient for wrapped buffers.
            msg.markReaderIndex();
            headerFrame.markReaderIndex();
            try {
                // Skip the leading 4 bytes, read the command size, then skip the command itself
                // so the reader sits where a checksum section would start.
                headerFrame.skipBytes(4);
                int cmdSize = (int)headerFrame.readUnsignedInt();
                headerFrame.skipBytes(cmdSize);
                if (!Commands.hasChecksum(headerFrame)) {
                    // Nothing to strip: restore the header frame's reader position and leave.
                    headerFrame.resetReaderIndex();
                    return;
                }
                // 8 = the 4 skipped bytes + the 4-byte command size field read above.
                int headerSize = 8 + cmdSize;
                // NOTE(review): presumably a 2-byte magic number plus a 4-byte checksum value -
                // confirm against Commands.
                int checksumSize = 6;
                // Offset of the first byte after the checksum section.
                int checksumMark = headerSize + checksumSize;
                int metaPayloadSize = totalMsgBufSize - checksumMark;
                // Value written into the first 4 bytes: command-size field + command + everything
                // that followed the checksum section.
                int newTotalFrameSizeLength = 4 + cmdSize + metaPayloadSize;
                headerFrame.resetReaderIndex();
                int headerFrameSize = headerFrame.readableBytes();
                headerFrame.setInt(0, newTotalFrameSizeLength);
                // View over the bytes that follow the checksum section inside the header frame.
                ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark);
                // Move the writer to where the checksum section began and copy the metadata
                // there, overwriting the checksum bytes.
                headerFrame.writerIndex(headerSize);
                metadata.readBytes(headerFrame, metadata.readableBytes());
                // Shrink the frame by the size of the removed checksum section.
                headerFrame.capacity(headerFrameSize - checksumSize);
                headerFrame.resetReaderIndex();
            }
            finally {
                // Always restore the command buffer's reader position, including on early return.
                op.cmd.resetReaderIndex();
            }
        } else {
            log.warn("[{}] Failed while casting {} into DoubleByteBuf", (Object)this.producerName, (Object)op.cmd.getClass().getName());
        }
    }

    /**
     * Returns the numeric value of protocol version {@code v6}.
     *
     * <p>{@code resendMessages} strips checksums from pending commands when the remote
     * endpoint's protocol version is lower than this value.
     *
     * @return the number of {@code PulsarApi.ProtocolVersion.v6}
     */
    public int brokerChecksumSupportedVersion() {
        PulsarApi.ProtocolVersion checksumVersion = PulsarApi.ProtocolVersion.v6;
        return checksumVersion.getNumber();
    }

    /**
     * Returns the name identifying this handler, which is the producer name.
     *
     * @return the producer name
     */
    @Override
    String getHandlerName() {
        return producerName;
    }

    /**
     * Send-timeout timer callback.
     *
     * <p>Does nothing if the timeout was cancelled. Otherwise, while holding this producer's
     * monitor, it inspects the oldest pending operation:
     * <ul>
     *   <li>no pending operation: re-arm the timer with the full configured send timeout;</li>
     *   <li>oldest operation already past its deadline: fail all pending messages with a
     *       {@code TimeoutException}, record the failures in the stats, and re-arm with the
     *       full configured send timeout;</li>
     *   <li>otherwise: re-arm with the time remaining until the oldest operation expires.</li>
     * </ul>
     *
     * <p>The number of failed operations is captured <em>before</em> calling
     * {@code failPendingMessages}: when there is no connection that method clears the pending
     * queue synchronously, so reading the queue size afterwards would record zero failures.
     *
     * @param timeout the timer handle that fired
     * @throws Exception declared by the timer-task contract
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled()) {
            return;
        }
        long timeToWaitMs;
        synchronized (this) {
            OpSendMsg firstMsg = (OpSendMsg)this.pendingMessages.peek();
            if (firstMsg == null) {
                timeToWaitMs = this.conf.getSendTimeoutMs();
            } else {
                long diff = firstMsg.createdAt + this.conf.getSendTimeoutMs() - System.currentTimeMillis();
                if (diff <= 0L) {
                    // Snapshot the count now; failPendingMessages may empty the queue before returning.
                    int timedOutCount = this.pendingMessages.size();
                    log.info("[{}] [{}] Message send timed out. Failing {} messages", new Object[]{this.topic, this.producerName, timedOutCount});
                    PulsarClientException.TimeoutException te = new PulsarClientException.TimeoutException("Could not send message to broker within given timeout");
                    this.failPendingMessages(this.cnx(), te);
                    this.stats.incrementSendFailed(timedOutCount);
                    timeToWaitMs = this.conf.getSendTimeoutMs();
                } else {
                    timeToWaitMs = diff;
                }
            }
        }
        this.sendTimeout = this.client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Fails every pending send operation with the given exception.
     *
     * <p>When {@code cnx} is non-null the work is re-dispatched to that connection's event loop,
     * where this method is invoked again with a {@code null} connection while holding this
     * producer's monitor. When {@code cnx} is {@code null} the work is done immediately on the
     * calling thread: each pending operation's callback is completed with {@code ex}, its command
     * buffer is released, the operation is recycled, the accumulated permits are returned to the
     * semaphore, both queues are cleared, and - if batching is enabled - the current batch is
     * failed as well.
     *
     * @param cnx connection whose event loop should run the failure, or {@code null} to run inline
     * @param ex  the exception passed to every callback
     */
    private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
        if (cnx != null) {
            // Hop onto the event loop and re-enter with a null connection.
            cnx.ctx().channel().eventLoop().execute(() -> {
                synchronized (this) {
                    this.failPendingMessages(null, ex);
                }
            });
            return;
        }

        AtomicInteger permitsToRelease = new AtomicInteger();
        this.pendingMessages.forEach(pendingOp -> {
            permitsToRelease.addAndGet(pendingOp.numMessagesInBatch);
            try {
                pendingOp.callback.sendComplete(ex);
            }
            catch (Throwable callbackFailure) {
                // A misbehaving callback must not stop the remaining operations from being failed.
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", new Object[]{this.topic, this.producerName, pendingOp.sequenceId, callbackFailure});
            }
            ReferenceCountUtil.safeRelease(pendingOp.cmd);
            pendingOp.recycle();
        });
        this.semaphore.release(permitsToRelease.get());
        this.pendingMessages.clear();
        this.pendingCallbacks.clear();
        if (this.isBatchMessagingEnabled()) {
            this.failPendingBatchMessages(ex);
        }
    }

    /**
     * Fails the messages currently accumulated in the batch container.
     *
     * <p>Does nothing when the container is empty. Otherwise it returns one semaphore permit per
     * batched message, completes the container's first callback with {@code ex} (logging, not
     * propagating, anything the callback throws), releases the batched payload buffer, and
     * clears the container.
     *
     * @param ex the exception passed to the batch callback
     */
    private void failPendingBatchMessages(PulsarClientException ex) {
        BatchMessageContainer batch = this.batchMessageContainer;
        if (batch.isEmpty()) {
            return;
        }
        int batchedCount = batch.numMessagesInBatch;
        this.semaphore.release(batchedCount);
        try {
            batch.firstCallback.sendComplete(ex);
        }
        catch (Throwable callbackFailure) {
            log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", new Object[]{this.topic, this.producerName, batch.sequenceId, callbackFailure});
        }
        ReferenceCountUtil.safeRelease(batch.getBatchedSingleMessageMetadataAndPayload());
        batch.clear();
    }

    /**
     * Closes out the current batch and sends it as a single command.
     *
     * <p>When the batch container is not empty, its payload is compressed and encrypted, a send
     * command is built, the container is cleared, and the resulting operation is appended to
     * {@code pendingMessages}. If the producer is connected the command is written on the
     * connection's event loop and the sent-message stats are updated; otherwise the operation
     * simply waits in the pending queue.
     *
     * <p>On any failure the semaphore permits for the batch are released and, if the operation
     * had already been created, its callback is completed with the error. The thread's interrupt
     * flag is restored only for {@link InterruptedException}; a {@code PulsarClientException}
     * is not an interruption and must not mark the calling thread as interrupted.
     *
     * <p>NOTE(review): when the failure happens before the operation is created ({@code op} is
     * still {@code null}) no callback is notified and the container is not cleared - confirm
     * whether callers rely on that.
     */
    private void batchMessageAndSend() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Batching the messages from the batch container with {} messages", new Object[]{this.topic, this.producerName, this.batchMessageContainer.numMessagesInBatch});
        }
        OpSendMsg op = null;
        int numMessagesInBatch = 0;
        try {
            if (!this.batchMessageContainer.isEmpty()) {
                numMessagesInBatch = this.batchMessageContainer.numMessagesInBatch;
                ByteBuf compressedPayload = this.batchMessageContainer.getCompressedBatchMetadataAndPayload();
                long sequenceId = this.batchMessageContainer.sequenceId;
                ByteBuf encryptedPayload = this.encryptMessage(this.batchMessageContainer.messageMetadata, compressedPayload);
                ByteBuf cmd = this.sendMessage(this.producerId, sequenceId, this.batchMessageContainer.numMessagesInBatch, this.batchMessageContainer.setBatchAndBuild(), encryptedPayload);
                op = OpSendMsg.create(this.batchMessageContainer.messages, cmd, sequenceId, this.batchMessageContainer.firstCallback);
                op.setNumMessagesInBatch(this.batchMessageContainer.numMessagesInBatch);
                op.setBatchSizeByte(this.batchMessageContainer.currentBatchSizeBytes);
                this.batchMessageContainer.clear();
                this.pendingMessages.put(op);
                if (this.isConnected()) {
                    // The operation stays in the pending queue, so take an extra reference for the write.
                    cmd.retain();
                    this.cnx().ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, this.cnx(), op));
                    this.stats.updateNumMsgsSent(numMessagesInBatch, op.batchSizeByte);
                } else if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", new Object[]{this.topic, this.producerName, sequenceId});
                }
            }
        }
        catch (InterruptedException ie) {
            // Preserve the interruption for code further up the stack.
            Thread.currentThread().interrupt();
            this.semaphore.release(numMessagesInBatch);
            if (op != null) {
                op.callback.sendComplete(new PulsarClientException(ie));
            }
        }
        catch (PulsarClientException e) {
            // Not an interruption: do not touch the thread's interrupt flag.
            this.semaphore.release(numMessagesInBatch);
            if (op != null) {
                op.callback.sendComplete(e);
            }
        }
        catch (Throwable t) {
            this.semaphore.release(numMessagesInBatch);
            log.warn("[{}] [{}] error while closing out batch -- {}", new Object[]{this.topic, this.producerName, t});
            if (op != null) {
                op.callback.sendComplete(new PulsarClientException(t));
            }
        }
    }

    /**
     * Resolves a command buffer to its {@code DoubleByteBuf}.
     *
     * <p>Returns {@code cmd} itself when it already is a {@code DoubleByteBuf}; otherwise casts
     * the result of {@code cmd.unwrap()}. Any exception raised by that attempt is logged and
     * {@code null} is returned.
     *
     * @param cmd the command buffer to resolve
     * @return the {@code DoubleByteBuf}, or {@code null} if it could not be obtained
     */
    private DoubleByteBuf getDoubleByteBuf(ByteBuf cmd) {
        if (cmd instanceof DoubleByteBuf) {
            return (DoubleByteBuf)cmd;
        }
        try {
            return (DoubleByteBuf)cmd.unwrap();
        }
        catch (Exception e) {
            log.error("[{}] Failed while casting {} into DoubleByteBuf", new Object[]{this.producerName, cmd.getClass().getName(), e});
            return null;
        }
    }

    /**
     * Returns how long the oldest pending operation has been waiting.
     *
     * @return milliseconds elapsed since the head of the pending queue was created, or
     *         {@code 0} when the queue is empty
     */
    public long getDelayInMillis() {
        OpSendMsg oldest = (OpSendMsg)this.pendingMessages.peek();
        return oldest == null ? 0L : System.currentTimeMillis() - oldest.createdAt;
    }

    /**
     * Returns the stored connection id while a connection is present.
     *
     * @return {@code connectionId}, or {@code null} when {@code cnx()} is {@code null}
     */
    public String getConnectionId() {
        if (this.cnx() == null) {
            return null;
        }
        return this.connectionId;
    }

    /**
     * Returns the stored connected-since value while a connection is present.
     *
     * @return {@code connectedSince}, or {@code null} when {@code cnx()} is {@code null}
     */
    public String getConnectedSince() {
        if (this.cnx() == null) {
            return null;
        }
        return this.connectedSince;
    }

    /**
     * Returns the number of send operations currently held in the pending queue.
     *
     * @return the size of {@code pendingMessages}
     */
    public int getPendingQueueSize() {
        return pendingMessages.size();
    }

    /**
     * Maps the client API compression type onto its {@code PulsarApi} counterpart.
     *
     * @param compressionType the client-side compression type
     * @return the matching {@code PulsarApi.CompressionType}
     * @throws RuntimeException if the value is not one of {@code NONE}, {@code LZ4}, {@code ZLIB}
     */
    private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) {
        switch (compressionType) {
            case NONE:
                return PulsarApi.CompressionType.NONE;
            case LZ4:
                return PulsarApi.CompressionType.LZ4;
            case ZLIB:
                return PulsarApi.CompressionType.ZLIB;
            default:
                throw new RuntimeException("Invalid compression type");
        }
    }

    /**
     * Returns this producer's statistics.
     *
     * @return the stats object, or {@code null} when it is a {@code ProducerStatsDisabled}
     *         instance
     */
    @Override
    public ProducerStats getStats() {
        return this.stats instanceof ProducerStatsDisabled ? null : this.stats;
    }

    /**
     * Returns the name of this producer.
     *
     * @return the producer name
     */
    @Override
    public String getProducerName() {
        return producerName;
    }

    /**
     * A pooled record describing one in-flight send: either a single message ({@code msg}) or a
     * batch ({@code msgs}), together with the serialized command, the completion callback, the
     * sequence id and the creation timestamp.
     *
     * <p>Instances are obtained from a {@link Recycler} through the {@code create} factories and
     * handed back with {@link #recycle()}. {@code recycle()} restores every field to its
     * initial value - including {@code batchSizeByte} and {@code numMessagesInBatch} - so a
     * reused instance never inherits batch counters from its previous life.
     */
    protected static final class OpSendMsg {
        MessageImpl msg;
        List<MessageImpl> msgs;
        ByteBuf cmd;
        SendCallback callback;
        long sequenceId;
        long createdAt;
        long batchSizeByte = 0L;
        int numMessagesInBatch = 1;
        private final Recycler.Handle<OpSendMsg> recyclerHandle;
        private static final Recycler<OpSendMsg> RECYCLER = new Recycler<OpSendMsg>(){

            @Override
            protected OpSendMsg newObject(Recycler.Handle<OpSendMsg> handle) {
                return new OpSendMsg(handle);
            }
        };

        /**
         * Obtains a pooled operation for a single message.
         *
         * @param msg        the message being sent
         * @param cmd        the serialized send command
         * @param sequenceId the sequence id of the message
         * @param callback   the callback to complete when the send finishes
         * @return an operation stamped with the current time
         */
        static OpSendMsg create(MessageImpl msg, ByteBuf cmd, long sequenceId, SendCallback callback) {
            OpSendMsg op = RECYCLER.get();
            op.msg = msg;
            op.cmd = cmd;
            op.callback = callback;
            op.sequenceId = sequenceId;
            op.createdAt = System.currentTimeMillis();
            return op;
        }

        /**
         * Obtains a pooled operation for a batch of messages.
         *
         * @param msgs       the messages contained in the batch
         * @param cmd        the serialized send command
         * @param sequenceId the sequence id of the batch
         * @param callback   the callback to complete when the send finishes
         * @return an operation stamped with the current time
         */
        static OpSendMsg create(List<MessageImpl> msgs, ByteBuf cmd, long sequenceId, SendCallback callback) {
            OpSendMsg op = RECYCLER.get();
            op.msgs = msgs;
            op.cmd = cmd;
            op.callback = callback;
            op.sequenceId = sequenceId;
            op.createdAt = System.currentTimeMillis();
            return op;
        }

        /**
         * Resets all state and returns this instance to the pool. The instance must not be
         * used after this call.
         */
        void recycle() {
            this.msg = null;
            this.msgs = null;
            this.cmd = null;
            this.callback = null;
            this.sequenceId = -1L;
            this.createdAt = -1L;
            // Restore the field defaults so the next user of this pooled instance does not see
            // the previous batch's size or message count.
            this.batchSizeByte = 0L;
            this.numMessagesInBatch = 1;
            this.recyclerHandle.recycle(this);
        }

        /** Sets how many messages this operation carries. */
        void setNumMessagesInBatch(int numMessagesInBatch) {
            this.numMessagesInBatch = numMessagesInBatch;
        }

        /** Sets the payload size, in bytes, carried by this operation. */
        void setBatchSizeByte(long batchSizeByte) {
            this.batchSizeByte = batchSizeByte;
        }

        /**
         * Assigns the message id to the carried message(s): a {@code MessageIdImpl} for a single
         * message, or one {@code BatchMessageIdImpl} per batched message using its position in
         * the list as the batch index.
         *
         * @param ledgerId       the ledger id
         * @param entryId        the entry id
         * @param partitionIndex the partition index
         */
        void setMessageId(long ledgerId, long entryId, int partitionIndex) {
            if (this.msg != null) {
                this.msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex));
            } else {
                for (int batchIndex = 0; batchIndex < this.msgs.size(); ++batchIndex) {
                    this.msgs.get(batchIndex).setMessageId(new BatchMessageIdImpl(ledgerId, entryId, partitionIndex, batchIndex));
                }
            }
        }

        private OpSendMsg(Recycler.Handle<OpSendMsg> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }

    /**
     * Pooled {@link Runnable} that writes and flushes one pending operation's command on a
     * connection, then hands itself back to its {@link Recycler}.
     */
    private static final class WriteInEventLoopCallback
    implements Runnable {
        private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>(){

            @Override
            protected WriteInEventLoopCallback newObject(Recycler.Handle<WriteInEventLoopCallback> handle) {
                return new WriteInEventLoopCallback(handle);
            }
        };

        private final Recycler.Handle<WriteInEventLoopCallback> recyclerHandle;
        private ProducerImpl producer;
        private ClientCnx cnx;
        private OpSendMsg op;

        private WriteInEventLoopCallback(Recycler.Handle<WriteInEventLoopCallback> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        /**
         * Obtains a pooled callback bound to the given producer, connection and operation.
         *
         * @param producer the producer that owns the operation (used for logging)
         * @param cnx      the connection to write on
         * @param op       the operation whose command is written
         * @return a callback ready to be executed once
         */
        static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) {
            WriteInEventLoopCallback task = RECYCLER.get();
            task.producer = producer;
            task.cnx = cnx;
            task.op = op;
            return task;
        }

        /**
         * Writes and flushes the operation's command, then recycles this callback whether or not
         * the write call throws.
         */
        @Override
        public void run() {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", new Object[]{this.producer.topic, this.producer.producerName, this.cnx, this.op.sequenceId});
            }
            try {
                this.cnx.ctx().writeAndFlush(this.op.cmd, this.cnx.ctx().voidPromise());
            }
            finally {
                this.recycle();
            }
        }

        /** Clears all references and returns this instance to the pool. */
        private void recycle() {
            this.producer = null;
            this.cnx = null;
            this.op = null;
            this.recyclerHandle.recycle(this);
        }
    }
}

