/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.ProducerStatsDisabled;
import org.apache.pulsar.client.impl.ProducerStatsRecorder;
import org.apache.pulsar.client.impl.ProducerStatsRecorderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Queues;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFuture;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerImpl<T>
extends ProducerBase<T>
implements TimerTask,
ConnectionHandler.Connection {
    protected final long producerId;
    private volatile long msgIdGenerator;
    private final BlockingQueue<OpSendMsg> pendingMessages;
    private final BlockingQueue<OpSendMsg> pendingCallbacks;
    private final Semaphore semaphore;
    private volatile Timeout sendTimeout = null;
    private volatile Timeout batchMessageAndSendTimeout = null;
    private long createProducerTimeout;
    private final int maxNumMessagesInBatch;
    private final BatchMessageContainerBase batchMessageContainer;
    private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
    private String producerName;
    private String connectionId;
    private String connectedSince;
    private final int partitionIndex;
    private final ProducerStatsRecorder stats;
    private final CompressionCodec compressor;
    private volatile long lastSequenceIdPublished;
    private MessageCrypto msgCrypto = null;
    private ScheduledFuture<?> keyGeneratorTask = null;
    private final Map<String, String> metadata;
    private Optional<byte[]> schemaVersion = Optional.empty();
    private final ConnectionHandler connectionHandler;
    private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "msgIdGenerator");
    TimerTask batchMessageAndSendTask = new TimerTask(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run(Timeout timeout) throws Exception {
            if (timeout.isCancelled()) {
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Batching the messages from the batch container from timer thread", (Object)ProducerImpl.this.topic, (Object)ProducerImpl.this.producerName);
            }
            ProducerImpl producerImpl = ProducerImpl.this;
            synchronized (producerImpl) {
                if (ProducerImpl.this.getState() == HandlerState.State.Closing || ProducerImpl.this.getState() == HandlerState.State.Closed) {
                    return;
                }
                ProducerImpl.this.batchMessageAndSend();
                ProducerImpl.this.batchMessageAndSendTimeout = ProducerImpl.this.client.timer().newTimeout(this, ProducerImpl.this.conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
            }
        }
    };
    private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);

    public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema, ProducerInterceptors<T> interceptors) {
        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
        this.producerId = client.newProducerId();
        this.producerName = conf.getProducerName();
        this.partitionIndex = partitionIndex;
        this.pendingMessages = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
        this.pendingCallbacks = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
        this.semaphore = new Semaphore(conf.getMaxPendingMessages(), true);
        this.compressor = CompressionCodecProvider.getCompressionCodec(conf.getCompressionType());
        if (conf.getInitialSequenceId() != null) {
            long initialSequenceId;
            this.lastSequenceIdPublished = initialSequenceId = conf.getInitialSequenceId().longValue();
            this.msgIdGenerator = initialSequenceId + 1L;
        } else {
            this.lastSequenceIdPublished = -1L;
            this.msgIdGenerator = 0L;
        }
        if (conf.isEncryptionEnabled()) {
            String logCtx = "[" + topic + "] [" + this.producerName + "] [" + this.producerId + "]";
            this.msgCrypto = new MessageCrypto(logCtx, true);
            this.keyGeneratorTask = client.eventLoopGroup().scheduleWithFixedDelay(() -> {
                block2: {
                    try {
                        this.msgCrypto.addPublicKeyCipher(conf.getEncryptionKeys(), conf.getCryptoKeyReader());
                    }
                    catch (PulsarClientException.CryptoException e) {
                        if (producerCreatedFuture.isDone()) break block2;
                        log.warn("[{}] [{}] [{}] Failed to add public key cipher.", new Object[]{topic, this.producerName, this.producerId});
                        producerCreatedFuture.completeExceptionally(e);
                    }
                }
            }, 0L, 4L, TimeUnit.HOURS);
        }
        if (conf.getSendTimeoutMs() > 0L) {
            this.sendTimeout = client.timer().newTimeout(this, conf.getSendTimeoutMs(), TimeUnit.MILLISECONDS);
        }
        this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
        if (conf.isBatchingEnabled()) {
            this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
            BatcherBuilder containerBuilder = conf.getBatcherBuilder();
            if (containerBuilder == null) {
                containerBuilder = BatcherBuilder.DEFAULT;
            }
            this.batchMessageContainer = (BatchMessageContainerBase)containerBuilder.build();
            this.batchMessageContainer.setProducer(this);
        } else {
            this.maxNumMessagesInBatch = 1;
            this.batchMessageContainer = null;
        }
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ProducerStatsRecorderImpl(client, conf, this) : ProducerStatsDisabled.INSTANCE;
        this.metadata = conf.getProperties().isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<String, String>(conf.getProperties()));
        this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder().setInitialTime(100L, TimeUnit.MILLISECONDS).setMax(60L, TimeUnit.SECONDS).setMandatoryStop(Math.max(100L, conf.getSendTimeoutMs() - 100L), TimeUnit.MILLISECONDS).useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos()).create(), this);
        this.grabCnx();
    }

    public ConnectionHandler getConnectionHandler() {
        return this.connectionHandler;
    }

    private boolean isBatchMessagingEnabled() {
        return this.conf.isBatchingEnabled();
    }

    public long getLastSequenceId() {
        return this.lastSequenceIdPublished;
    }

    @Override
    CompletableFuture<MessageId> internalSendAsync(Message<T> message) {
        final CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        final MessageImpl interceptorMessage = (MessageImpl)this.beforeSend(message);
        interceptorMessage.getDataBuffer().retain();
        if (this.interceptors != null) {
            interceptorMessage.getProperties();
        }
        this.sendAsync(interceptorMessage, new SendCallback(){
            SendCallback nextCallback = null;
            MessageImpl<?> nextMsg = null;
            long createdAt = System.nanoTime();

            @Override
            public CompletableFuture<MessageId> getFuture() {
                return future;
            }

            @Override
            public SendCallback getNextSendCallback() {
                return this.nextCallback;
            }

            @Override
            public MessageImpl<?> getNextMessage() {
                return this.nextMsg;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void sendComplete(Exception e) {
                try {
                    if (e != null) {
                        ProducerImpl.this.stats.incrementSendFailed();
                        ProducerImpl.this.onSendAcknowledgement(interceptorMessage, null, e);
                        future.completeExceptionally(e);
                    } else {
                        ProducerImpl.this.onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
                        future.complete(interceptorMessage.getMessageId());
                        ProducerImpl.this.stats.incrementNumAcksReceived(System.nanoTime() - this.createdAt);
                    }
                }
                finally {
                    interceptorMessage.getDataBuffer().release();
                }
                while (this.nextCallback != null) {
                    SendCallback sendCallback = this.nextCallback;
                    MessageImpl<?> msg = this.nextMsg;
                    try {
                        msg.getDataBuffer().retain();
                        if (e != null) {
                            ProducerImpl.this.stats.incrementSendFailed();
                            ProducerImpl.this.onSendAcknowledgement(msg, null, e);
                            sendCallback.getFuture().completeExceptionally(e);
                        } else {
                            ProducerImpl.this.onSendAcknowledgement(msg, msg.getMessageId(), null);
                            sendCallback.getFuture().complete(msg.getMessageId());
                            ProducerImpl.this.stats.incrementNumAcksReceived(System.nanoTime() - this.createdAt);
                        }
                        this.nextMsg = this.nextCallback.getNextMessage();
                        this.nextCallback = this.nextCallback.getNextSendCallback();
                    }
                    finally {
                        msg.getDataBuffer().release();
                    }
                }
            }

            @Override
            public void addCallback(MessageImpl<?> msg, SendCallback scb) {
                this.nextMsg = msg;
                this.nextCallback = scb;
            }
        });
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendAsync(Message<T> message, SendCallback callback) {
        Preconditions.checkArgument(message instanceof MessageImpl);
        if (!this.isValidProducerState(callback)) {
            return;
        }
        if (!this.canEnqueueRequest(callback)) {
            return;
        }
        MessageImpl msg = (MessageImpl)message;
        PulsarApi.MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder();
        ByteBuf payload = msg.getDataBuffer();
        int uncompressedSize = payload.readableBytes();
        ByteBuf compressedPayload = payload;
        if (!this.isBatchMessagingEnabled() || msgMetadataBuilder.hasDeliverAtTime()) {
            compressedPayload = this.compressor.encode(payload);
            payload.release();
            int compressedSize = compressedPayload.readableBytes();
            if (compressedSize > ClientCnx.getMaxMessageSize()) {
                compressedPayload.release();
                String compressedStr = !this.isBatchMessagingEnabled() && this.conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
                PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException(String.format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize, ClientCnx.getMaxMessageSize()));
                callback.sendComplete((Exception)invalidMessageException);
                return;
            }
        }
        if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
            PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException("Cannot re-use the same message");
            callback.sendComplete((Exception)invalidMessageException);
            compressedPayload.release();
            return;
        }
        if (this.schemaVersion.isPresent()) {
            msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom((byte[])this.schemaVersion.get()));
        }
        try {
            ProducerImpl invalidMessageException = this;
            synchronized (invalidMessageException) {
                long sequenceId;
                if (!msgMetadataBuilder.hasSequenceId()) {
                    sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
                    msgMetadataBuilder.setSequenceId(sequenceId);
                } else {
                    sequenceId = msgMetadataBuilder.getSequenceId();
                }
                if (!msgMetadataBuilder.hasPublishTime()) {
                    msgMetadataBuilder.setPublishTime(this.client.getClientClock().millis());
                    Preconditions.checkArgument(!msgMetadataBuilder.hasProducerName());
                    msgMetadataBuilder.setProducerName(this.producerName);
                    if (this.conf.getCompressionType() != CompressionType.NONE) {
                        msgMetadataBuilder.setCompression(CompressionCodecProvider.convertToWireProtocol(this.conf.getCompressionType()));
                    }
                    msgMetadataBuilder.setUncompressedSize(uncompressedSize);
                }
                if (this.isBatchMessagingEnabled() && !msgMetadataBuilder.hasDeliverAtTime()) {
                    if (this.batchMessageContainer.haveEnoughSpace(msg)) {
                        this.batchMessageContainer.add(msg, callback);
                        this.lastSendFuture = callback.getFuture();
                        payload.release();
                        if (this.batchMessageContainer.getNumMessagesInBatch() == this.maxNumMessagesInBatch || this.batchMessageContainer.getCurrentBatchSize() >= 131072L) {
                            this.batchMessageAndSend();
                        }
                    } else {
                        this.doBatchSendAndAdd(msg, callback, payload);
                    }
                } else {
                    ByteBuf encryptedPayload = this.encryptMessage(msgMetadataBuilder, compressedPayload);
                    PulsarApi.MessageMetadata msgMetadata = msgMetadataBuilder.build();
                    int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch() ? msg.getMessageBuilder().getNumMessagesInBatch() : 1;
                    ByteBufPair cmd = this.sendMessage(this.producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
                    msgMetadataBuilder.recycle();
                    msgMetadata.recycle();
                    OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);
                    op.setNumMessagesInBatch(numMessages);
                    op.setBatchSizeByte(encryptedPayload.readableBytes());
                    this.pendingMessages.put(op);
                    this.lastSendFuture = callback.getFuture();
                    ClientCnx cnx = this.cnx();
                    if (this.isConnected()) {
                        cmd.retain();
                        cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
                        this.stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
                    } else if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", new Object[]{this.topic, this.producerName, sequenceId});
                    }
                }
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            this.semaphore.release();
            callback.sendComplete((Exception)((Object)new PulsarClientException((Throwable)ie)));
        }
        catch (PulsarClientException e) {
            this.semaphore.release();
            callback.sendComplete((Exception)((Object)e));
        }
        catch (Throwable t) {
            this.semaphore.release();
            callback.sendComplete((Exception)((Object)new PulsarClientException(t)));
        }
    }

    protected ByteBuf encryptMessage(PulsarApi.MessageMetadata.Builder msgMetadata, ByteBuf compressedPayload) throws PulsarClientException {
        ByteBuf encryptedPayload = compressedPayload;
        if (!this.conf.isEncryptionEnabled() || this.msgCrypto == null) {
            return encryptedPayload;
        }
        try {
            encryptedPayload = this.msgCrypto.encrypt(this.conf.getEncryptionKeys(), this.conf.getCryptoKeyReader(), msgMetadata, compressedPayload);
        }
        catch (PulsarClientException e) {
            if (this.conf.getCryptoFailureAction() == ProducerCryptoFailureAction.SEND) {
                log.warn("[{}] [{}] Failed to encrypt message {}. Proceeding with publishing unencrypted message", new Object[]{this.topic, this.producerName, e.getMessage()});
                return compressedPayload;
            }
            throw e;
        }
        return encryptedPayload;
    }

    protected ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, PulsarApi.MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException {
        Commands.ChecksumType checksumType = this.connectionHandler.getClientCnx() == null || this.connectionHandler.getClientCnx().getRemoteEndpointProtocolVersion() >= this.brokerChecksumSupportedVersion() ? Commands.ChecksumType.Crc32c : Commands.ChecksumType.None;
        return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload);
    }

    private void doBatchSendAndAdd(MessageImpl<T> msg, SendCallback callback, ByteBuf payload) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Closing out batch to accommodate large message with size {}", new Object[]{this.topic, this.producerName, msg.getDataBuffer().readableBytes()});
        }
        this.batchMessageAndSend();
        this.batchMessageContainer.add(msg, callback);
        this.lastSendFuture = callback.getFuture();
        payload.release();
    }

    private boolean isValidProducerState(SendCallback callback) {
        switch (this.getState()) {
            case Ready: 
            case Connecting: {
                return true;
            }
            case Closing: 
            case Closed: {
                callback.sendComplete((Exception)new PulsarClientException.AlreadyClosedException("Producer already closed"));
                return false;
            }
            case Terminated: {
                callback.sendComplete((Exception)new PulsarClientException.TopicTerminatedException("Topic was terminated"));
                return false;
            }
        }
        callback.sendComplete((Exception)new PulsarClientException.NotConnectedException());
        return false;
    }

    private boolean canEnqueueRequest(SendCallback callback) {
        try {
            if (this.conf.isBlockIfQueueFull()) {
                this.semaphore.acquire();
            } else if (!this.semaphore.tryAcquire()) {
                callback.sendComplete((Exception)new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
                return false;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            callback.sendComplete((Exception)((Object)new PulsarClientException((Throwable)e)));
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        Timeout batchTimeout;
        HandlerState.State currentState = this.getAndUpdateState(state -> {
            if (state == HandlerState.State.Closed) {
                return state;
            }
            return HandlerState.State.Closing;
        });
        if (currentState == HandlerState.State.Closed || currentState == HandlerState.State.Closing) {
            return CompletableFuture.completedFuture(null);
        }
        Timeout timeout = this.sendTimeout;
        if (timeout != null) {
            timeout.cancel();
            this.sendTimeout = null;
        }
        if ((batchTimeout = this.batchMessageAndSendTimeout) != null) {
            batchTimeout.cancel();
            this.batchMessageAndSendTimeout = null;
        }
        if (this.keyGeneratorTask != null && !this.keyGeneratorTask.isCancelled()) {
            this.keyGeneratorTask.cancel(false);
        }
        this.stats.cancelStatsTimeout();
        ClientCnx cnx = this.cnx();
        if (cnx == null || currentState != HandlerState.State.Ready) {
            log.info("[{}] [{}] Closed Producer (not connected)", (Object)this.topic, (Object)this.producerName);
            ProducerImpl producerImpl = this;
            synchronized (producerImpl) {
                this.setState(HandlerState.State.Closed);
                this.client.cleanupProducer(this);
                PulsarClientException.AlreadyClosedException ex = new PulsarClientException.AlreadyClosedException("Producer was already closed");
                this.pendingMessages.forEach(arg_0 -> ProducerImpl.lambda$closeAsync$2((PulsarClientException)ex, arg_0));
                this.pendingMessages.clear();
            }
            return CompletableFuture.completedFuture(null);
        }
        long requestId = this.client.newRequestId();
        ByteBuf cmd = Commands.newCloseProducer(this.producerId, requestId);
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
            cnx.removeProducer(this.producerId);
            if (exception == null || !cnx.ctx().channel().isActive()) {
                ProducerImpl producerImpl = this;
                synchronized (producerImpl) {
                    log.info("[{}] [{}] Closed Producer", (Object)this.topic, (Object)this.producerName);
                    this.setState(HandlerState.State.Closed);
                    this.pendingMessages.forEach(msg -> {
                        msg.cmd.release();
                        msg.recycle();
                    });
                    this.pendingMessages.clear();
                }
                closeFuture.complete(null);
                this.client.cleanupProducer(this);
            } else {
                closeFuture.completeExceptionally((Throwable)exception);
            }
            return null;
        });
        return closeFuture;
    }

    public boolean isConnected() {
        return this.connectionHandler.getClientCnx() != null && this.getState() == HandlerState.State.Ready;
    }

    public boolean isWritable() {
        ClientCnx cnx = this.connectionHandler.getClientCnx();
        return cnx != null && cnx.channel().isWritable();
    }

    public void terminated(ClientCnx cnx) {
        HandlerState.State previousState = this.getAndUpdateState(state -> state == HandlerState.State.Closed ? HandlerState.State.Closed : HandlerState.State.Terminated);
        if (previousState != HandlerState.State.Terminated && previousState != HandlerState.State.Closed) {
            log.info("[{}] [{}] The topic has been terminated", (Object)this.topic, (Object)this.producerName);
            this.setClientCnx(null);
            this.failPendingMessages(cnx, (PulsarClientException)new PulsarClientException.TopicTerminatedException("The topic has been terminated"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) {
        OpSendMsg op = null;
        boolean callback = false;
        ProducerImpl producerImpl = this;
        synchronized (producerImpl) {
            op = (OpSendMsg)this.pendingMessages.peek();
            if (op == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {}", new Object[]{this.topic, this.producerName, sequenceId});
                }
                return;
            }
            long expectedSequenceId = op.sequenceId;
            if (sequenceId > expectedSequenceId) {
                log.warn("[{}] [{}] Got ack for msg. expecting: {} - got: {} - queue-size: {}", new Object[]{this.topic, this.producerName, expectedSequenceId, sequenceId, this.pendingMessages.size()});
                cnx.channel().close();
            } else if (sequenceId < expectedSequenceId) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {} last-seq: {}", new Object[]{this.topic, this.producerName, sequenceId, expectedSequenceId});
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Received ack for msg {} ", new Object[]{this.topic, this.producerName, sequenceId});
                }
                this.pendingMessages.remove();
                this.semaphore.release(op.numMessagesInBatch);
                callback = true;
                this.pendingCallbacks.add(op);
            }
        }
        if (callback && (op = (OpSendMsg)this.pendingCallbacks.poll()) != null) {
            this.lastSequenceIdPublished = op.sequenceId + (long)op.numMessagesInBatch - 1L;
            op.setMessageId(ledgerId, entryId, this.partitionIndex);
            try {
                op.callback.sendComplete(null);
            }
            catch (Throwable t) {
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", new Object[]{this.topic, this.producerName, sequenceId, t});
            }
            ReferenceCountUtil.safeRelease(op.cmd);
            op.recycle();
        }
    }

    protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) {
        OpSendMsg op = (OpSendMsg)this.pendingMessages.peek();
        if (op == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Got send failure for timed out msg {}", new Object[]{this.topic, this.producerName, sequenceId});
            }
        } else {
            long expectedSequenceId = op.sequenceId;
            if (sequenceId == expectedSequenceId) {
                boolean corrupted;
                boolean bl = corrupted = !this.verifyLocalBufferIsNotCorrupted(op);
                if (corrupted) {
                    this.pendingMessages.remove();
                    this.semaphore.release(op.numMessagesInBatch);
                    try {
                        op.callback.sendComplete((Exception)new PulsarClientException.ChecksumException("Checksum failed on corrupt message"));
                    }
                    catch (Throwable t) {
                        log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", new Object[]{this.topic, this.producerName, sequenceId, t});
                    }
                    ReferenceCountUtil.safeRelease(op.cmd);
                    op.recycle();
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Message is not corrupted, retry send-message with sequenceId {}", new Object[]{this.topic, this.producerName, sequenceId});
                }
            } else if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Corrupt message is already timed out {}", new Object[]{this.topic, this.producerName, sequenceId});
            }
        }
        this.resendMessages(cnx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
        ByteBufPair msg = op.cmd;
        if (msg != null) {
            ByteBuf headerFrame = msg.getFirst();
            headerFrame.markReaderIndex();
            try {
                headerFrame.skipBytes(4);
                int cmdSize = (int)headerFrame.readUnsignedInt();
                headerFrame.skipBytes(cmdSize);
                if (Commands.hasChecksum(headerFrame)) {
                    int metadataChecksum;
                    long computedChecksum;
                    int checksum = Commands.readChecksum(headerFrame);
                    boolean bl = (long)checksum == (computedChecksum = (long)Crc32cIntChecksum.resumeChecksum(metadataChecksum = Crc32cIntChecksum.computeChecksum(headerFrame), msg.getSecond()));
                    return bl;
                }
                log.warn("[{}] [{}] checksum is not present into message with id {}", new Object[]{this.topic, this.producerName, op.sequenceId});
            }
            finally {
                headerFrame.resetReaderIndex();
            }
            return true;
        }
        log.warn("[{}] Failed while casting {} into ByteBufPair", (Object)this.producerName, (Object)op.cmd.getClass().getName());
        return false;
    }

    @Override
    public void connectionOpened(ClientCnx cnx) {
        this.connectionHandler.setClientCnx(cnx);
        cnx.registerProducer(this.producerId, this);
        log.info("[{}] [{}] Creating producer on cnx {}", new Object[]{this.topic, this.producerName, cnx.ctx().channel()});
        long requestId = this.client.newRequestId();
        Object schemaInfo = null;
        if (this.schema != null && this.schema.getSchemaInfo() != null) {
            if (this.schema.getSchemaInfo().getType() == SchemaType.JSON) {
                if (Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion())) {
                    schemaInfo = this.schema.getSchemaInfo();
                } else if (this.schema instanceof JSONSchema) {
                    JSONSchema jsonSchema = (JSONSchema)this.schema;
                    schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
                } else {
                    schemaInfo = this.schema.getSchemaInfo();
                }
            } else {
                schemaInfo = this.schema.getSchemaInfo().getType() == SchemaType.BYTES || this.schema.getSchemaInfo().getType() == SchemaType.NONE ? null : this.schema.getSchemaInfo();
            }
        }
        ((CompletableFuture)cnx.sendRequestWithId(Commands.newProducer(this.topic, this.producerId, requestId, this.producerName, this.conf.isEncryptionEnabled(), this.metadata, schemaInfo), requestId).thenAccept(response -> {
            String producerName = response.getProducerName();
            long lastSequenceId = response.getLastSequenceId();
            this.schemaVersion = Optional.ofNullable(response.getSchemaVersion());
            ProducerImpl producerImpl = this;
            synchronized (producerImpl) {
                if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                    cnx.removeProducer(this.producerId);
                    cnx.channel().close();
                    return;
                }
                this.resetBackoff();
                log.info("[{}] [{}] Created producer on cnx {}", new Object[]{this.topic, producerName, cnx.ctx().channel()});
                this.connectionId = cnx.ctx().channel().toString();
                this.connectedSince = DateFormatter.now();
                if (this.producerName == null) {
                    this.producerName = producerName;
                }
                if (this.msgIdGenerator == 0L && this.conf.getInitialSequenceId() == null) {
                    this.lastSequenceIdPublished = lastSequenceId;
                    this.msgIdGenerator = lastSequenceId + 1L;
                }
                if (!this.producerCreatedFuture.isDone() && this.isBatchMessagingEnabled()) {
                    this.client.timer().newTimeout(this.batchMessageAndSendTask, this.conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
                }
                this.resendMessages(cnx);
            }
        })).exceptionally(e -> {
            Throwable cause = e.getCause();
            cnx.removeProducer(this.producerId);
            if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                cnx.channel().close();
                return null;
            }
            log.error("[{}] [{}] Failed to create producer: {}", new Object[]{this.topic, this.producerName, cause.getMessage()});
            if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
                ProducerImpl producerImpl = this;
                synchronized (producerImpl) {
                    log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", (Object)this.topic, (Object)this.producerName);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Pending messages: {}", new Object[]{this.topic, this.producerName, this.pendingMessages.size()});
                    }
                    PulsarClientException.ProducerBlockedQuotaExceededException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException("Could not send pending messages as backlog exceeded");
                    this.failPendingMessages(this.cnx(), (PulsarClientException)bqe);
                }
            } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
                log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.", (Object)this.producerName, (Object)this.topic);
            }
            if (cause instanceof PulsarClientException.TopicTerminatedException) {
                this.setState(HandlerState.State.Terminated);
                this.failPendingMessages(this.cnx(), (PulsarClientException)cause);
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
            } else if (this.producerCreatedFuture.isDone() || cause instanceof PulsarClientException && this.connectionHandler.isRetriableError((PulsarClientException)cause) && System.currentTimeMillis() < this.createProducerTimeout) {
                this.reconnectLater(cause);
            } else {
                this.setState(HandlerState.State.Failed);
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
            }
            return null;
        });
    }

    @Override
    public void connectionFailed(PulsarClientException exception) {
        if (System.currentTimeMillis() > this.createProducerTimeout && this.producerCreatedFuture.completeExceptionally(exception)) {
            log.info("[{}] Producer creation failed for producer {}", (Object)this.topic, (Object)this.producerId);
            this.setState(HandlerState.State.Failed);
            this.client.cleanupProducer(this);
        }
    }

    private void resendMessages(ClientCnx cnx) {
        cnx.ctx().channel().eventLoop().execute(() -> {
            ProducerImpl producerImpl = this;
            synchronized (producerImpl) {
                if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                    cnx.channel().close();
                    return;
                }
                int messagesToResend = this.pendingMessages.size();
                if (messagesToResend == 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] No pending messages to resend {}", new Object[]{this.topic, this.producerName, messagesToResend});
                    }
                    if (this.changeToReadyState()) {
                        this.producerCreatedFuture.complete(this);
                        return;
                    }
                    cnx.channel().close();
                    return;
                }
                log.info("[{}] [{}] Re-Sending {} messages to server", new Object[]{this.topic, this.producerName, messagesToResend});
                boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < this.brokerChecksumSupportedVersion();
                for (OpSendMsg op : this.pendingMessages) {
                    if (stripChecksum) {
                        this.stripChecksum(op);
                    }
                    op.cmd.retain();
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", new Object[]{this.topic, this.producerName, cnx.channel(), op.sequenceId});
                    }
                    cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
                    this.stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
                }
                cnx.ctx().flush();
                if (!this.changeToReadyState()) {
                    cnx.channel().close();
                    return;
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stripChecksum(OpSendMsg op) {
        int totalMsgBufSize = op.cmd.readableBytes();
        ByteBufPair msg = op.cmd;
        if (msg != null) {
            ByteBuf headerFrame = msg.getFirst();
            headerFrame.markReaderIndex();
            try {
                headerFrame.skipBytes(4);
                int cmdSize = (int)headerFrame.readUnsignedInt();
                headerFrame.skipBytes(cmdSize);
                if (!Commands.hasChecksum(headerFrame)) {
                    return;
                }
                int headerSize = 8 + cmdSize;
                int checksumSize = 6;
                int checksumMark = headerSize + checksumSize;
                int metaPayloadSize = totalMsgBufSize - checksumMark;
                int newTotalFrameSizeLength = 4 + cmdSize + metaPayloadSize;
                headerFrame.resetReaderIndex();
                int headerFrameSize = headerFrame.readableBytes();
                headerFrame.setInt(0, newTotalFrameSizeLength);
                ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark);
                headerFrame.writerIndex(headerSize);
                metadata.readBytes(headerFrame, metadata.readableBytes());
                headerFrame.capacity(headerFrameSize - checksumSize);
            }
            finally {
                headerFrame.resetReaderIndex();
            }
        } else {
            log.warn("[{}] Failed while casting {} into ByteBufPair", (Object)this.producerName, (Object)op.cmd.getClass().getName());
        }
    }

    public int brokerChecksumSupportedVersion() {
        return PulsarApi.ProtocolVersion.v6.getNumber();
    }

    @Override
    String getHandlerName() {
        return this.producerName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled()) {
            return;
        }
        ProducerImpl producerImpl = this;
        synchronized (producerImpl) {
            long timeToWaitMs;
            if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                return;
            }
            OpSendMsg firstMsg = (OpSendMsg)this.pendingMessages.peek();
            if (firstMsg == null) {
                timeToWaitMs = this.conf.getSendTimeoutMs();
            } else {
                long diff = firstMsg.createdAt + this.conf.getSendTimeoutMs() - System.currentTimeMillis();
                if (diff <= 0L) {
                    log.info("[{}] [{}] Message send timed out. Failing {} messages", new Object[]{this.topic, this.producerName, this.pendingMessages.size()});
                    PulsarClientException.TimeoutException te = new PulsarClientException.TimeoutException("Could not send message to broker within given timeout");
                    this.failPendingMessages(this.cnx(), (PulsarClientException)te);
                    this.stats.incrementSendFailed(this.pendingMessages.size());
                    timeToWaitMs = this.conf.getSendTimeoutMs();
                } else {
                    timeToWaitMs = diff;
                }
            }
            this.sendTimeout = this.client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
        }
    }

    private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
        if (cnx == null) {
            AtomicInteger releaseCount = new AtomicInteger();
            this.pendingMessages.forEach(op -> {
                releaseCount.addAndGet(op.numMessagesInBatch);
                try {
                    op.callback.sendComplete((Exception)((Object)ex));
                }
                catch (Throwable t) {
                    log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", new Object[]{this.topic, this.producerName, op.sequenceId, t});
                }
                ReferenceCountUtil.safeRelease(op.cmd);
                op.recycle();
            });
            this.semaphore.release(releaseCount.get());
            this.pendingMessages.clear();
            this.pendingCallbacks.clear();
            if (this.isBatchMessagingEnabled()) {
                this.failPendingBatchMessages(ex);
            }
        } else {
            cnx.ctx().channel().eventLoop().execute(() -> {
                ProducerImpl producerImpl = this;
                synchronized (producerImpl) {
                    this.failPendingMessages(null, ex);
                }
            });
        }
    }

    private void failPendingBatchMessages(PulsarClientException ex) {
        if (this.batchMessageContainer.isEmpty()) {
            return;
        }
        int numMessagesInBatch = this.batchMessageContainer.getNumMessagesInBatch();
        this.semaphore.release(numMessagesInBatch);
        this.batchMessageContainer.discard((Exception)((Object)ex));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> flushAsync() {
        CompletableFuture<MessageId> lastSendFuture;
        ProducerImpl producerImpl = this;
        synchronized (producerImpl) {
            if (this.isBatchMessagingEnabled()) {
                this.batchMessageAndSend();
            }
            lastSendFuture = this.lastSendFuture;
        }
        return lastSendFuture.thenApply(ignored -> null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void triggerFlush() {
        if (this.isBatchMessagingEnabled()) {
            ProducerImpl producerImpl = this;
            synchronized (producerImpl) {
                this.batchMessageAndSend();
            }
        }
    }

    private void batchMessageAndSend() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Batching the messages from the batch container with {} messages", new Object[]{this.topic, this.producerName, this.batchMessageContainer.getNumMessagesInBatch()});
        }
        if (!this.batchMessageContainer.isEmpty()) {
            try {
                if (this.batchMessageContainer.isMultiBatches()) {
                    List<OpSendMsg> opSendMsgs = this.batchMessageContainer.createOpSendMsgs();
                    for (OpSendMsg opSendMsg : opSendMsgs) {
                        this.processOpSendMsg(opSendMsg);
                    }
                } else {
                    OpSendMsg opSendMsg = this.batchMessageContainer.createOpSendMsg();
                    if (opSendMsg != null) {
                        this.processOpSendMsg(opSendMsg);
                    }
                }
            }
            catch (PulsarClientException e) {
                Thread.currentThread().interrupt();
                this.semaphore.release(this.batchMessageContainer.getNumMessagesInBatch());
            }
            catch (Throwable t) {
                this.semaphore.release(this.batchMessageContainer.getNumMessagesInBatch());
                log.warn("[{}] [{}] error while create opSendMsg by batch message container -- {}", new Object[]{this.topic, this.producerName, t});
            }
        }
    }

    private void processOpSendMsg(OpSendMsg op) {
        block7: {
            try {
                this.batchMessageContainer.clear();
                this.pendingMessages.put(op);
                ClientCnx cnx = this.cnx();
                if (this.isConnected()) {
                    op.cmd.retain();
                    cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
                    this.stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
                } else if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", new Object[]{this.topic, this.producerName, op.sequenceId});
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                this.semaphore.release(op.numMessagesInBatch);
                if (op != null) {
                    op.callback.sendComplete((Exception)((Object)new PulsarClientException((Throwable)ie)));
                }
            }
            catch (Throwable t) {
                this.semaphore.release(op.numMessagesInBatch);
                log.warn("[{}] [{}] error while closing out batch -- {}", new Object[]{this.topic, this.producerName, t});
                if (op == null) break block7;
                op.callback.sendComplete((Exception)((Object)new PulsarClientException(t)));
            }
        }
    }

    public long getDelayInMillis() {
        OpSendMsg firstMsg = (OpSendMsg)this.pendingMessages.peek();
        if (firstMsg != null) {
            return System.currentTimeMillis() - firstMsg.createdAt;
        }
        return 0L;
    }

    public String getConnectionId() {
        return this.cnx() != null ? this.connectionId : null;
    }

    public String getConnectedSince() {
        return this.cnx() != null ? this.connectedSince : null;
    }

    public int getPendingQueueSize() {
        return this.pendingMessages.size();
    }

    public ProducerStatsRecorder getStats() {
        return this.stats;
    }

    public String getProducerName() {
        return this.producerName;
    }

    ClientCnx cnx() {
        return this.connectionHandler.cnx();
    }

    void resetBackoff() {
        this.connectionHandler.resetBackoff();
    }

    void connectionClosed(ClientCnx cnx) {
        this.connectionHandler.connectionClosed(cnx);
    }

    ClientCnx getClientCnx() {
        return this.connectionHandler.getClientCnx();
    }

    void setClientCnx(ClientCnx clientCnx) {
        this.connectionHandler.setClientCnx(clientCnx);
    }

    void reconnectLater(Throwable exception) {
        this.connectionHandler.reconnectLater(exception);
    }

    void grabCnx() {
        this.connectionHandler.grabCnx();
    }

    private static /* synthetic */ void lambda$closeAsync$2(PulsarClientException ex, OpSendMsg msg) {
        msg.callback.sendComplete((Exception)((Object)ex));
        msg.cmd.release();
        msg.recycle();
    }

    protected static final class OpSendMsg {
        MessageImpl<?> msg;
        List<MessageImpl<?>> msgs;
        ByteBufPair cmd;
        SendCallback callback;
        long sequenceId;
        long createdAt;
        long batchSizeByte = 0L;
        int numMessagesInBatch = 1;
        private final Recycler.Handle<OpSendMsg> recyclerHandle;
        private static final Recycler<OpSendMsg> RECYCLER = new Recycler<OpSendMsg>(){

            @Override
            protected OpSendMsg newObject(Recycler.Handle<OpSendMsg> handle) {
                return new OpSendMsg(handle);
            }
        };

        static OpSendMsg create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
            OpSendMsg op = RECYCLER.get();
            op.msg = msg;
            op.cmd = cmd;
            op.callback = callback;
            op.sequenceId = sequenceId;
            op.createdAt = System.currentTimeMillis();
            return op;
        }

        static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) {
            OpSendMsg op = RECYCLER.get();
            op.msgs = msgs;
            op.cmd = cmd;
            op.callback = callback;
            op.sequenceId = sequenceId;
            op.createdAt = System.currentTimeMillis();
            return op;
        }

        void recycle() {
            this.msg = null;
            this.msgs = null;
            this.cmd = null;
            this.callback = null;
            this.sequenceId = -1L;
            this.createdAt = -1L;
            this.recyclerHandle.recycle(this);
        }

        void setNumMessagesInBatch(int numMessagesInBatch) {
            this.numMessagesInBatch = numMessagesInBatch;
        }

        void setBatchSizeByte(long batchSizeByte) {
            this.batchSizeByte = batchSizeByte;
        }

        void setMessageId(long ledgerId, long entryId, int partitionIndex) {
            if (this.msg != null) {
                this.msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex));
            } else {
                for (int batchIndex = 0; batchIndex < this.msgs.size(); ++batchIndex) {
                    this.msgs.get(batchIndex).setMessageId(new BatchMessageIdImpl(ledgerId, entryId, partitionIndex, batchIndex));
                }
            }
        }

        private OpSendMsg(Recycler.Handle<OpSendMsg> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }

    private static final class WriteInEventLoopCallback
    implements Runnable {
        private ProducerImpl<?> producer;
        private ByteBufPair cmd;
        private long sequenceId;
        private ClientCnx cnx;
        private final Recycler.Handle<WriteInEventLoopCallback> recyclerHandle;
        private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>(){

            @Override
            protected WriteInEventLoopCallback newObject(Recycler.Handle<WriteInEventLoopCallback> handle) {
                return new WriteInEventLoopCallback(handle);
            }
        };

        static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
            WriteInEventLoopCallback c = RECYCLER.get();
            c.producer = producer;
            c.cnx = cnx;
            c.sequenceId = op.sequenceId;
            c.cmd = op.cmd;
            return c;
        }

        @Override
        public void run() {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", new Object[]{this.producer.topic, ((ProducerImpl)this.producer).producerName, this.cnx, this.sequenceId});
            }
            try {
                this.cnx.ctx().writeAndFlush(this.cmd, this.cnx.ctx().voidPromise());
            }
            finally {
                this.recycle();
            }
        }

        private void recycle() {
            this.producer = null;
            this.cnx = null;
            this.cmd = null;
            this.sequenceId = -1L;
            this.recyclerHandle.recycle(this);
        }

        private WriteInEventLoopCallback(Recycler.Handle<WriteInEventLoopCallback> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }
}

