/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer {
    // Topic this producer publishes to.
    private final Topic topic;
    // Connection the producer is attached to; send receipts and errors are written through it.
    private final ServerCnx cnx;
    // Never null (checked in the constructor); used for equality and hashing.
    private final String producerName;
    private final long epoch;
    private final boolean userProvidedProducerName;
    private final long producerId;
    // Identity passed to the authorization service in checkPermissions().
    private final String appId;
    // Inbound rate; handed to each publish context and recorded once a message is persisted.
    private Rate msgIn;
    // Drop rate; non-null only when the topic is a NonPersistentTopic.
    private final Rate msgDrop;
    // Copied from the connection at construction time.
    private AuthenticationDataSource authenticationData;
    // Number of publishes started but not yet completed; written through pendingPublishAcksUpdater.
    private volatile long pendingPublishAcks = 0L;
    private static final AtomicLongFieldUpdater<Producer> pendingPublishAcksUpdater = AtomicLongFieldUpdater.newUpdater(Producer.class, "pendingPublishAcks");
    // Set by close(); once true, new publishes are rejected.
    private boolean isClosed = false;
    // Completed by closeNow() when the producer has been removed.
    private final CompletableFuture<Void> closeFuture;
    // NonPersistentPublisherStats for non-persistent topics, PublisherStats otherwise.
    private final PublisherStats stats;
    // True when producerName starts with the broker's configured replicator prefix.
    private final boolean isRemote;
    // Cluster name parsed out of producerName when isRemote is true, otherwise null.
    private final String remoteCluster;
    private final boolean isNonPersistentTopic;
    private final boolean isEncrypted;
    // Never null; an empty map replaces a null constructor argument.
    private final Map<String, String> metadata;
    private final SchemaVersion schemaVersion;
    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch, boolean userProvidedProducerName) {
        this.topic = topic;
        this.cnx = cnx;
        this.producerId = producerId;
        this.producerName = (String)Preconditions.checkNotNull((Object)producerName);
        this.userProvidedProducerName = userProvidedProducerName;
        this.epoch = epoch;
        this.closeFuture = new CompletableFuture();
        this.appId = appId;
        this.authenticationData = cnx.authenticationData;
        this.msgIn = new Rate();
        this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
        this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
        this.metadata = metadata != null ? metadata : Collections.emptyMap();
        this.stats = this.isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
        this.stats.setAddress(cnx.clientAddress().toString());
        this.stats.setConnectedSince(DateFormatter.now());
        this.stats.setClientVersion(cnx.getClientVersion());
        this.stats.setProducerName(producerName);
        this.stats.producerId = producerId;
        this.stats.metadata = this.metadata;
        this.isRemote = producerName.startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
        this.remoteCluster = this.isRemote ? producerName.split("\\.")[2].split("-->")[0] : null;
        this.isEncrypted = isEncrypted;
        this.schemaVersion = schemaVersion;
    }

    /**
     * Hashes on the producer name only. This stays consistent with {@link #equals(Object)},
     * which compares the name and the topic.
     */
    @Override
    public int hashCode() {
        return Objects.hash(producerName);
    }

    /**
     * Two producers are equal when they have the same producer name and the same topic.
     *
     * @param obj object to compare against
     * @return {@code true} if {@code obj} is a {@code Producer} with an equal name and topic
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Producer)) {
            return false;
        }
        Producer that = (Producer) obj;
        if (!Objects.equals(producerName, that.producerName)) {
            return false;
        }
        return Objects.equals(topic, that.topic);
    }

    /**
     * Validates and publishes a message identified by a single sequence id.
     *
     * <p>The message is handed to the topic only if the pre-publish checks pass. When a
     * check fails, a send error has already been scheduled for the client and the message
     * must not be published.
     *
     * @param producerId producer id echoed back in any error sent to the client
     * @param sequenceId sequence id of the message
     * @param headersAndPayload serialized message headers and payload
     * @param batchSize number of messages contained in the payload
     */
    public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
        if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) {
            publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
        }
    }

    /**
     * Validates and publishes a message covering a range of sequence ids.
     *
     * <p>A range with {@code lowestSequenceId > highestSequenceId} is rejected with a
     * {@code MetadataError}. Otherwise the same pre-publish checks as the single-id variant
     * run (using {@code highestSequenceId} for error reporting) and the message is handed to
     * the topic only if they pass.
     *
     * @param producerId producer id echoed back in any error sent to the client
     * @param lowestSequenceId lowest sequence id covered by the message
     * @param highestSequenceId highest sequence id covered by the message
     * @param headersAndPayload serialized message headers and payload
     * @param batchSize number of messages contained in the payload
     */
    public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, ByteBuf headersAndPayload, long batchSize) {
        if (lowestSequenceId > highestSequenceId) {
            sendPublishError(producerId, highestSequenceId, headersAndPayload, PulsarApi.ServerError.MetadataError, "Invalid lowest or highest sequence id");
            return;
        }
        if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize);
        }
    }

    /**
     * Runs the pre-publish checks and, if they pass, starts the publish operation.
     *
     * <p>Kept with its original {@code void} signature for existing callers. It cannot report
     * whether the message was rejected, so code that must not publish a rejected message has
     * to go through {@link #publishMessage} instead.
     *
     * @param producerId producer id echoed back in any error sent to the client
     * @param sequenceId sequence id reported in any error sent to the client
     * @param headersAndPayload serialized message headers and payload
     * @param batchSize number of messages contained in the payload
     */
    public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
        checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
    }

    /**
     * Checks, in order, that the producer is not closed, that the payload checksum is valid,
     * and (when the topic requires encryption) that the message carries at least one
     * encryption key. On the first failure a send error is scheduled and {@code false} is
     * returned; on success the pending-publish counter and topic publish count are updated.
     *
     * @return {@code true} if the message may be published, {@code false} if it was rejected
     */
    private boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
        if (isClosed) {
            sendPublishError(producerId, sequenceId, headersAndPayload, PulsarApi.ServerError.PersistenceError, "Producer is closed");
            return false;
        }
        if (!verifyChecksum(headersAndPayload)) {
            sendPublishError(producerId, sequenceId, headersAndPayload, PulsarApi.ServerError.ChecksumError, "Checksum failed on the broker");
            return false;
        }
        if (topic.isEncryptionRequired()) {
            // Parse the metadata without consuming it: the reader index is restored afterwards.
            headersAndPayload.markReaderIndex();
            PulsarApi.MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
            headersAndPayload.resetReaderIndex();
            if (msgMetadata.getEncryptionKeysCount() < 1) {
                log.warn("[{}] Messages must be encrypted", getTopic().getName());
                sendPublishError(producerId, sequenceId, headersAndPayload, PulsarApi.ServerError.MetadataError, "Messages must be encrypted");
                return false;
            }
        }
        startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
        return true;
    }

    /**
     * Schedules, on the connection's event loop, a send error for the client followed by the
     * connection's completed-send accounting for the rejected payload. The readable byte
     * count is read when the task runs, not when it is scheduled.
     */
    private void sendPublishError(long producerId, long sequenceId, ByteBuf headersAndPayload, PulsarApi.ServerError error, String message) {
        cnx.ctx().channel().eventLoop().execute(() -> {
            cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, error, message));
            cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
        });
    }

    /**
     * Hands a single-sequence-id message to the topic, with a publish context that captures
     * the payload size and the current {@code System.nanoTime()} as the start time.
     */
    private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize) {
        int payloadSize = headersAndPayload.readableBytes();
        long startedAtNs = System.nanoTime();
        MessagePublishContext context = MessagePublishContext.get(this, sequenceId, msgIn, payloadSize, batchSize, startedAtNs);
        topic.publishMessage(headersAndPayload, context);
    }

    /**
     * Hands a sequence-id-range message to the topic, with a publish context that captures
     * the payload size and the current {@code System.nanoTime()} as the start time.
     */
    private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize) {
        int payloadSize = headersAndPayload.readableBytes();
        long startedAtNs = System.nanoTime();
        MessagePublishContext context = MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, payloadSize, batchSize, startedAtNs);
        topic.publishMessage(headersAndPayload, context);
    }

    /**
     * Verifies the checksum carried in the buffer, if there is one.
     *
     * <p>The reader index is restored to its initial position after a checksum comparison,
     * whatever the outcome.
     *
     * @param headersAndPayload serialized message headers and payload
     * @return {@code true} if the buffer has no checksum or the stored checksum matches the
     *         computed one; {@code false} on a mismatch
     */
    private boolean verifyChecksum(ByteBuf headersAndPayload) {
        if (!Commands.hasChecksum(headersAndPayload)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName);
            }
            return true;
        }

        int savedReaderIndex = headersAndPayload.readerIndex();
        try {
            int storedChecksum = Commands.readChecksum(headersAndPayload);
            long computedChecksum = Crc32cIntChecksum.computeChecksum(headersAndPayload);
            boolean matches = storedChecksum == computedChecksum;
            if (!matches) {
                log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
            }
            return matches;
        } finally {
            headersAndPayload.readerIndex(savedReaderIndex);
        }
    }

    /**
     * Records the start of a publish: bumps the pending-ack counter and the topic's publish
     * count.
     *
     * <p>The counter update is a plain read followed by {@code lazySet}, not an atomic
     * increment. NOTE(review): presumably relies on all updates happening on one thread —
     * confirm against callers.
     */
    private void startPublishOperation(int batchSize, long msgSize) {
        long incremented = pendingPublishAcks + 1L;
        pendingPublishAcksUpdater.lazySet(this, incremented);
        getTopic().incrementPublishCount(batchSize, msgSize);
    }

    /**
     * Records the completion of a publish by decrementing the pending-ack counter.
     *
     * <p>If the counter reaches zero while a close has been requested but not yet finished,
     * the close is completed here, removing the producer from the topic.
     */
    private void publishOperationCompleted() {
        long remaining = pendingPublishAcks - 1L;
        pendingPublishAcksUpdater.lazySet(this, remaining);

        if (remaining != 0L || closeFuture.isDone()) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock: close() is synchronized on the same monitor.
            if (isClosed && !closeFuture.isDone()) {
                closeNow(true);
            }
        }
    }

    /**
     * Records dropped messages. Does nothing unless the topic is non-persistent.
     *
     * @param batchSize number of messages dropped
     */
    public void recordMessageDrop(int batchSize) {
        if (!isNonPersistentTopic) {
            return;
        }
        msgDrop.recordEvent((long) batchSize);
    }

    /**
     * Returns the last sequence id published under this producer's name.
     *
     * @return {@code -1} for a non-persistent topic; otherwise the value reported by the
     *         persistent topic for this producer name
     */
    public long getLastSequenceId() {
        return isNonPersistentTopic
                ? -1L
                : ((PersistentTopic) topic).getLastPublishedSequenceId(producerName);
    }

    /** @return the connection this producer is attached to */
    public ServerCnx getCnx() {
        return cnx;
    }

    /** @return the topic this producer publishes to */
    public Topic getTopic() {
        return topic;
    }

    /** @return the producer name; never null */
    public String getProducerName() {
        return producerName;
    }

    /** @return the producer id given at construction */
    public long getProducerId() {
        return producerId;
    }

    /** @return the producer metadata; an empty map if none was supplied, never null */
    public Map<String, String> getMetadata() {
        return metadata;
    }

    /** @return a description containing the topic, client address, producer name and id */
    @Override
    public String toString() {
        MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
        description.add("topic", topic);
        description.add("client", cnx.clientAddress());
        description.add("producerName", producerName);
        description.add("producerId", producerId);
        return description.toString();
    }

    /**
     * Marks the producer closed and completes the close as soon as it is safe to do so.
     *
     * <p>The close finishes immediately when the connection is no longer active or when no
     * publishes are pending. Otherwise it is left for the last pending publish to complete.
     * Calling this on an already closed producer only returns the existing future.
     *
     * @param removeFromTopic whether an immediate close should also remove the producer
     *        from the topic
     * @return the future completed once the producer is fully closed
     */
    public synchronized CompletableFuture<Void> close(boolean removeFromTopic) {
        if (log.isDebugEnabled()) {
            log.debug("Closing producer {} -- isClosed={}", this, isClosed);
        }
        if (isClosed) {
            return closeFuture;
        }

        isClosed = true;
        if (log.isDebugEnabled()) {
            log.debug("Trying to close producer {} -- cnxIsActive: {} -- pendingPublishAcks: {}", this, cnx.isActive(), pendingPublishAcks);
        }
        boolean mustWaitForAcks = cnx.isActive() && pendingPublishAcks != 0L;
        if (!mustWaitForAcks) {
            closeNow(removeFromTopic);
        }
        return closeFuture;
    }

    /**
     * Finishes the close: optionally removes the producer from the topic, removes it from
     * the connection, and completes the close future.
     *
     * @param removeFromTopic whether to remove this producer from the topic as well
     */
    void closeNow(boolean removeFromTopic) {
        if (removeFromTopic) {
            topic.removeProducer(this);
        }
        cnx.removedProducer(this);

        if (log.isDebugEnabled()) {
            log.debug("Removed producer: {}", this);
        }
        closeFuture.complete(null);
    }

    /**
     * Forces the producer to close. Unless the close has already finished, a task is
     * scheduled on the connection's executor that asks the connection to close this producer
     * and then completes the close, removing it from the topic.
     *
     * @return the future completed once the producer is fully closed
     */
    public CompletableFuture<Void> disconnect() {
        if (closeFuture.isDone()) {
            return closeFuture;
        }
        log.info("Disconnecting producer: {}", this);
        cnx.ctx().executor().execute(() -> {
            cnx.closeProducer(this);
            closeNow(true);
        });
        return closeFuture;
    }

    /**
     * Recalculates the inbound rate and copies it into the publisher stats (message rate,
     * throughput, average message size). For non-persistent topics the drop rate is
     * recalculated and published as well.
     */
    public void updateRates() {
        msgIn.calculateRate();
        stats.msgRateIn = msgIn.getRate();
        stats.msgThroughputIn = msgIn.getValueRate();
        stats.averageMsgSize = msgIn.getAverageValue();

        if (!isNonPersistentTopic) {
            return;
        }
        msgDrop.calculateRate();
        NonPersistentPublisherStats nonPersistentStats = (NonPersistentPublisherStats) stats;
        nonPersistentStats.msgDropRate = msgDrop.getRate();
    }

    /** @return whether the producer name starts with the broker's replicator prefix */
    public boolean isRemote() {
        return isRemote;
    }

    /** @return the cluster parsed from the producer name, or {@code null} if not remote */
    public String getRemoteCluster() {
        return remoteCluster;
    }

    /** @return the stats object for this producer, updated by {@link #updateRates()} */
    public PublisherStats getStats() {
        return stats;
    }

    /** @return whether the topic is a {@code NonPersistentTopic} */
    public boolean isNonPersistentTopic() {
        return isNonPersistentTopic;
    }

    /** @return the epoch given at construction */
    public long getEpoch() {
        return epoch;
    }

    /** @return the user-provided-name flag given at construction */
    public boolean isUserProvidedProducerName() {
        return userProvidedProducerName;
    }

    /** @return the number of publishes started but not yet completed */
    @VisibleForTesting
    long getPendingPublishAcks() {
        return pendingPublishAcks;
    }

    /**
     * Re-checks that this producer is still allowed to produce on its topic and disconnects
     * it if not.
     *
     * <p>Nothing happens when the broker has no authorization service. An exception thrown
     * by the authorization check is logged and treated the same as a denial.
     */
    public void checkPermissions() {
        TopicName topicName = TopicName.get(topic.getName());
        if (cnx.getBrokerService().getAuthorizationService() == null) {
            return;
        }

        boolean allowed = false;
        try {
            allowed = cnx.getBrokerService().getAuthorizationService().canProduce(topicName, appId, authenticationData);
        } catch (Exception e) {
            log.warn("[{}] Get unexpected error while autorizing [{}]  {}", appId, topic.getName(), e.getMessage(), e);
        }
        if (allowed) {
            return;
        }

        log.info("[{}] is not allowed to produce on topic [{}] anymore", appId, topic.getName());
        disconnect();
    }

    /**
     * Disconnects the producer if the topic requires encryption and this producer was not
     * created as an encrypting one.
     */
    public void checkEncryption() {
        boolean violatesPolicy = topic.isEncryptionRequired() && !isEncrypted;
        if (!violatesPolicy) {
            return;
        }
        log.info("[{}] [{}] Unencrypted producer is not allowed to produce on topic [{}] anymore", producerId, producerName, topic.getName());
        disconnect();
    }

    /** @return the schema version given at construction */
    public SchemaVersion getSchemaVersion() {
        return schemaVersion;
    }

    /**
     * Pooled per-message publish callback.
     *
     * <p>An instance is obtained through one of the {@code get} factories, handed to the
     * topic as its {@link Topic.PublishContext}, and returned to the pool by
     * {@link #recycle()} once the outcome has been reported to the client on the
     * connection's event loop.
     *
     * <p>Every field is set explicitly by the factories, so a newly allocated instance and a
     * recycled one start from the same state; the sequence-id fields use {@code -1} to mean
     * "not set".
     */
    private static final class MessagePublishContext implements Topic.PublishContext, Runnable {
        private Producer producer;
        private long sequenceId;
        private long ledgerId;
        private long entryId;
        private Rate rateIn;
        private int msgSize;
        private long batchSize;
        private long startTimeNs;
        private String originalProducerName;
        private long originalSequenceId;
        private long highestSequenceId;
        private long originalHighestSequenceId;
        private final Recycler.Handle<MessagePublishContext> recyclerHandle;
        private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
            @Override
            protected MessagePublishContext newObject(Recycler.Handle<MessagePublishContext> handle) {
                return new MessagePublishContext(handle);
            }
        };

        @Override
        public String getProducerName() {
            return this.producer.getProducerName();
        }

        @Override
        public long getSequenceId() {
            return this.sequenceId;
        }

        @Override
        public long getHighestSequenceId() {
            return this.highestSequenceId;
        }

        @Override
        public void setOriginalProducerName(String originalProducerName) {
            this.originalProducerName = originalProducerName;
        }

        @Override
        public void setOriginalSequenceId(long originalSequenceId) {
            this.originalSequenceId = originalSequenceId;
        }

        @Override
        public String getOriginalProducerName() {
            return this.originalProducerName;
        }

        @Override
        public long getOriginalSequenceId() {
            return this.originalSequenceId;
        }

        @Override
        public void setOriginalHighestSequenceId(long originalHighestSequenceId) {
            this.originalHighestSequenceId = originalHighestSequenceId;
        }

        @Override
        public long getOriginalHighestSequenceId() {
            return this.originalHighestSequenceId;
        }

        /**
         * Called with the outcome of the publish.
         *
         * <p>On failure a task is scheduled on the event loop that sends a send error
         * (skipped for {@code TopicClosedException}), performs the completed-send
         * accounting and recycles this context. The error code is
         * {@code TopicTerminatedError} for a {@code TopicTerminatedException} and
         * {@code PersistenceError} otherwise. On success the ledger and entry ids are
         * stored and {@link #run()} is scheduled on the event loop.
         *
         * @param exception failure cause, or {@code null} on success
         * @param ledgerId ledger id of the persisted entry (used on success)
         * @param entryId entry id of the persisted entry (used on success)
         */
        @Override
        public void completed(Exception exception, long ledgerId, long entryId) {
            if (exception != null) {
                PulsarApi.ServerError serverError = exception instanceof BrokerServiceException.TopicTerminatedException ? PulsarApi.ServerError.TopicTerminatedError : PulsarApi.ServerError.PersistenceError;
                this.producer.cnx.ctx().channel().eventLoop().execute(() -> {
                    if (!(exception instanceof BrokerServiceException.TopicClosedException)) {
                        // Report against the highest id covered by this publish.
                        long callBackSequenceId = Math.max(this.highestSequenceId, this.sequenceId);
                        this.producer.cnx.ctx().writeAndFlush(Commands.newSendError(this.producer.producerId, callBackSequenceId, serverError, exception.getMessage()));
                    }
                    this.producer.cnx.completedSendOperation(this.producer.isNonPersistentTopic, this.msgSize);
                    this.producer.publishOperationCompleted();
                    this.recycle();
                });
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] [{}] triggered send callback. cnx {}, sequenceId {}", this.producer.topic, this.producer.producerName, this.producer.producerId, this.producer.cnx.clientAddress(), this.sequenceId);
                }
                this.ledgerId = ledgerId;
                this.entryId = entryId;
                this.producer.cnx.ctx().channel().eventLoop().execute(this);
            }
        }

        /**
         * Success path, run on the event loop: records the inbound rate and add latency,
         * writes the send receipt, performs the completed-send accounting and recycles this
         * context.
         */
        @Override
        public void run() {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Persisted message. cnx {}, sequenceId {}", this.producer.topic, this.producer.producerName, this.producer.producerId, this.producer.cnx, this.sequenceId);
            }
            this.rateIn.recordMultipleEvents(this.batchSize, (long) this.msgSize);
            this.producer.topic.recordAddLatency(System.nanoTime() - this.startTimeNs, TimeUnit.NANOSECONDS);
            this.producer.cnx.ctx().writeAndFlush(Commands.newSendReceipt(this.producer.producerId, this.sequenceId, this.highestSequenceId, this.ledgerId, this.entryId), this.producer.cnx.ctx().voidPromise());
            this.producer.cnx.completedSendOperation(this.producer.isNonPersistentTopic, this.msgSize);
            this.producer.publishOperationCompleted();
            this.recycle();
        }

        /**
         * Obtains a context for a message with a single sequence id. The highest sequence
         * id is set to {@code -1} ("not set").
         */
        static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, long batchSize, long startTimeNs) {
            return get(producer, sequenceId, -1L, rateIn, msgSize, batchSize, startTimeNs);
        }

        /**
         * Obtains a context for a message covering {@code lowestSequenceId} through
         * {@code highestSequenceId}. Every field is initialised, including the ones only
         * filled in later.
         */
        static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, int msgSize, long batchSize, long startTimeNs) {
            MessagePublishContext callback = RECYCLER.get();
            callback.producer = producer;
            callback.sequenceId = lowestSequenceId;
            callback.highestSequenceId = highestSequenceId;
            callback.rateIn = rateIn;
            callback.msgSize = msgSize;
            callback.batchSize = batchSize;
            callback.startTimeNs = startTimeNs;
            callback.originalProducerName = null;
            callback.originalSequenceId = -1L;
            callback.originalHighestSequenceId = -1L;
            callback.ledgerId = -1L;
            callback.entryId = -1L;
            return callback;
        }

        private MessagePublishContext(Recycler.Handle<MessagePublishContext> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        /**
         * Clears all per-message state, including references, and returns this instance to
         * the pool.
         */
        public void recycle() {
            this.producer = null;
            this.sequenceId = -1L;
            this.highestSequenceId = -1L;
            this.originalProducerName = null;
            this.originalSequenceId = -1L;
            this.originalHighestSequenceId = -1L;
            this.rateIn = null;
            this.msgSize = 0;
            this.ledgerId = -1L;
            this.entryId = -1L;
            this.batchSize = 0L;
            this.startTimeNs = -1L;
            this.recyclerHandle.recycle(this);
        }
    }
}

