/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.PulsarCommandSender;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarCommandSenderImpl
implements PulsarCommandSender {
    private static final Logger log = LoggerFactory.getLogger(PulsarCommandSenderImpl.class);
    private final BrokerInterceptor interceptor;
    private final ServerCnx cnx;

    public PulsarCommandSenderImpl(BrokerInterceptor interceptor, ServerCnx cnx) {
        this.interceptor = interceptor;
        this.cnx = cnx;
    }

    @Override
    public void sendPartitionMetadataResponse(ServerError error, String errorMsg, long requestId) {
        BaseCommand command = Commands.newPartitionMetadataResponseCommand((ServerError)error, (String)errorMsg, (long)requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendPartitionMetadataResponse(int partitions, long requestId) {
        BaseCommand command = Commands.newPartitionMetadataResponseCommand((int)partitions, (long)requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendSuccessResponse(long requestId) {
        BaseCommand command = Commands.newSuccessCommand((long)requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendErrorResponse(long requestId, ServerError error, String message) {
        BaseCommand command = Commands.newErrorCommand((long)requestId, (ServerError)error, (String)message);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion) {
        BaseCommand command = Commands.newProducerSuccessCommand((long)requestId, (String)producerName, (SchemaVersion)schemaVersion);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId, SchemaVersion schemaVersion, Optional<Long> topicEpoch, boolean isProducerReady) {
        BaseCommand command = Commands.newProducerSuccessCommand((long)requestId, (String)producerName, (long)lastSequenceId, (SchemaVersion)schemaVersion, topicEpoch, (boolean)isProducerReady);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId, long entryId) {
        BaseCommand command = Commands.newSendReceiptCommand((long)producerId, (long)sequenceId, (long)highestId, (long)ledgerId, (long)entryId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendSendError(long producerId, long sequenceId, ServerError error, String errorMsg) {
        BaseCommand command = Commands.newSendErrorCommand((long)producerId, (long)sequenceId, (ServerError)error, (String)errorMsg);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendGetTopicsOfNamespaceResponse(List<String> topics, long requestId) {
        BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, (long)requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version) {
        BaseCommand command = Commands.newGetSchemaResponseCommand((long)requestId, (SchemaInfo)schema, (SchemaVersion)version);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendGetSchemaErrorResponse(long requestId, ServerError error, String errorMessage) {
        BaseCommand command = Commands.newGetSchemaResponseErrorCommand((long)requestId, (ServerError)error, (String)errorMessage);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendGetOrCreateSchemaResponse(long requestId, SchemaVersion schemaVersion) {
        BaseCommand command = Commands.newGetOrCreateSchemaResponseCommand((long)requestId, (SchemaVersion)schemaVersion);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendGetOrCreateSchemaErrorResponse(long requestId, ServerError error, String errorMessage) {
        BaseCommand command = Commands.newGetOrCreateSchemaResponseErrorCommand((long)requestId, (ServerError)error, (String)errorMessage);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize) {
        BaseCommand command = Commands.newConnectedCommand((int)clientProtocolVersion, (int)maxMessageSize);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative, CommandLookupTopicResponse.LookupType response, long requestId, boolean proxyThroughServiceUrl) {
        BaseCommand command = Commands.newLookupResponseCommand((String)brokerServiceUrl, (String)brokerServiceUrlTls, (boolean)authoritative, (CommandLookupTopicResponse.LookupType)response, (long)requestId, (boolean)proxyThroughServiceUrl);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendLookupResponse(ServerError error, String errorMsg, long requestId) {
        BaseCommand command = Commands.newLookupErrorResponseCommand((ServerError)error, (String)errorMsg, (long)requestId);
        this.safeIntercept(command, this.cnx);
        ByteBuf outBuf = Commands.serializeWithSize((BaseCommand)command);
        this.cnx.ctx().writeAndFlush((Object)outBuf);
    }

    @Override
    public void sendActiveConsumerChange(long consumerId, boolean isActive) {
        if (!Commands.peerSupportsActiveConsumerListener((int)this.cnx.getRemoteEndpointProtocolVersion())) {
            return;
        }
        this.cnx.ctx().writeAndFlush((Object)Commands.newActiveConsumerChange((long)consumerId, (boolean)isActive), this.cnx.ctx().voidPromise());
    }

    @Override
    public void sendSuccess(long requestId) {
        this.cnx.ctx().writeAndFlush((Object)Commands.newSuccess((long)requestId));
    }

    @Override
    public void sendError(long requestId, ServerError error, String message) {
        this.cnx.ctx().writeAndFlush((Object)Commands.newError((long)requestId, (ServerError)error, (String)message));
    }

    @Override
    public void sendReachedEndOfTopic(long consumerId) {
        if (this.cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9.getValue()) {
            log.info("[{}] Notifying consumer that end of topic has been reached", (Object)this);
            this.cnx.ctx().writeAndFlush((Object)Commands.newReachedEndOfTopic((long)consumerId));
        }
    }

    public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription, int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, RedeliveryTracker redeliveryTracker) {
        ChannelHandlerContext ctx = this.cnx.ctx();
        ChannelPromise writePromise = ctx.newPromise();
        ctx.channel().eventLoop().execute(() -> {
            for (int i = 0; i < entries.size(); ++i) {
                Entry entry = (Entry)entries.get(i);
                if (entry == null) continue;
                int batchSize = batchSizes.getBatchSize(i);
                if (batchSize > 1 && !this.cnx.isBatchMessageCompatibleVersion()) {
                    log.warn("[{}-{}] Consumer doesn't support batch messages -  consumerId {}, msg id {}-{}", new Object[]{topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId()});
                    ctx.close();
                    entry.release();
                    continue;
                }
                ByteBuf metadataAndPayload = entry.getDataBuffer();
                metadataAndPayload.retain();
                Commands.skipBrokerEntryMetadataIfExist((ByteBuf)metadataAndPayload);
                if (this.cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) {
                    Commands.skipChecksumIfPresent((ByteBuf)metadataAndPayload);
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{} with batchSize {}", new Object[]{topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId(), batchSize});
                }
                int redeliveryCount = 0;
                PositionImpl position = PositionImpl.get((long)entry.getLedgerId(), (long)entry.getEntryId());
                if (redeliveryTracker.contains((Position)position)) {
                    redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount((Position)position);
                }
                ctx.write((Object)this.cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise());
                entry.release();
            }
            ctx.writeAndFlush((Object)Unpooled.EMPTY_BUFFER, writePromise);
            batchSizes.recyle();
            if (batchIndexesAcks != null) {
                batchIndexesAcks.recycle();
            }
        });
        return writePromise;
    }

    private void safeIntercept(BaseCommand command, ServerCnx cnx) {
        try {
            this.interceptor.onPulsarCommand(command, cnx);
        }
        catch (Exception e) {
            log.error("Failed to execute command {} on broker interceptor.", (Object)command.getType(), (Object)e);
        }
    }
}

