/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.rest;

import com.fasterxml.jackson.databind.ObjectReader;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.rest.RestMessagePublishContext;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroWriter;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonWriter;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerAcks;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.data.ProducerMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicsBase
extends PersistentTopicsBase {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(TopicsBase.class);
    // Producer name applied when a produce request carries a null or empty producer name.
    private static String defaultProducerName = "RestProducer";
    // Shared Jackson reader that deserializes schema definition strings into SchemaInfoImpl instances.
    private static final ObjectReader SCHEMA_INFO_READER = ObjectMapperFactory.getMapper().reader().forType(SchemaInfoImpl.class);

    /**
     * Handles a REST produce request for the topic as a whole, distributing the messages over
     * the partitions this broker has recorded as owned.
     *
     * <p>If the topic is not yet recorded as owned locally, an owner lookup runs first. When that
     * lookup has already answered the request (redirect or error) nothing more happens here.
     * Otherwise the schema is added or retrieved and the messages are published.
     *
     * @param asyncResponse response handle that is resumed with the outcome
     * @param request       produce request carrying the schemas, schema version and messages
     * @param authoritative passed through to the owner lookup
     */
    protected void publishMessages(AsyncResponse asyncResponse, ProducerMessages request, boolean authoritative) {
        String partitionedTopic = this.topicName.getPartitionedTopicName();
        try {
            boolean ownedLocally = this.pulsar().getBrokerService().getOwningTopics().containsKey(partitionedTopic);
            if (!ownedLocally && this.findOwnerBrokerForTopic(authoritative, asyncResponse)) {
                // The lookup already resumed the response (redirect or error).
                return;
            }
            SchemaData requestedSchema = this.getSchemaData(request.getKeySchema(), request.getValueSchema());
            LongSchemaVersion requestedVersion = null;
            if (request.getSchemaVersion() != -1L) {
                requestedVersion = new LongSchemaVersion(request.getSchemaVersion());
            }
            this.addOrGetSchemaForTopic(requestedSchema, requestedVersion).thenAccept(schemaMeta -> {
                if (schemaMeta.getLeft() == null || schemaMeta.getRight() == null) {
                    asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Fail to add or retrieve schema."));
                    return;
                }
                List<Integer> ownedPartitions = this.pulsar().getBrokerService().getOwningTopics().get(partitionedTopic).values();
                Schema resolvedSchema = AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo());
                this.internalPublishMessages(this.topicName, request, ownedPartitions, asyncResponse, resolvedSchema, schemaMeta.getRight());
            }).exceptionally(failure -> {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to publish message: " + failure.getMessage());
                }
                asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Fail to publish message:" + failure.getMessage()));
                return null;
            });
        }
        catch (Exception unexpected) {
            asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Fail to publish message: " + unexpected.getMessage()));
        }
    }

    /**
     * Handles a REST produce request that targets one specific partition of the topic.
     *
     * <p>A topic name that already carries a partition suffix is rejected with
     * {@code 400 Bad Request}. If the partition is not recorded as owned by this broker, an owner
     * lookup runs first; when that lookup has already answered the request (redirect or error)
     * nothing more happens here. Otherwise the schema is added or retrieved and the messages are
     * published to the partition.
     *
     * @param asyncResponse response handle that is resumed with the outcome
     * @param request       produce request carrying the schemas, schema version and messages
     * @param authoritative passed through to the owner lookup
     * @param partition     index of the partition to publish to
     */
    protected void publishMessagesToPartition(AsyncResponse asyncResponse, ProducerMessages request, boolean authoritative, int partition) {
        if (this.topicName.isPartitioned()) {
            asyncResponse.resume(new RestException(Response.Status.BAD_REQUEST, "Topic name can't contain '-partition-' suffix."));
            // The request has been answered; continuing would publish anyway and resume a second time.
            return;
        }
        String topic = this.topicName.getPartitionedTopicName();
        try {
            boolean partitionOwnedLocally = this.pulsar().getBrokerService().getOwningTopics().containsKey(topic)
                    && this.pulsar().getBrokerService().getOwningTopics().get(topic).contains(partition);
            if (!partitionOwnedLocally && this.findOwnerBrokerForTopic(authoritative, asyncResponse)) {
                // The lookup already resumed the response (redirect or error).
                return;
            }
            SchemaData requestedSchema = this.getSchemaData(request.getKeySchema(), request.getValueSchema());
            LongSchemaVersion requestedVersion = request.getSchemaVersion() == -1L ? null : new LongSchemaVersion(request.getSchemaVersion());
            this.addOrGetSchemaForTopic(requestedSchema, requestedVersion).thenAccept(schemaMeta -> {
                if (schemaMeta.getLeft() != null && schemaMeta.getRight() != null) {
                    Schema resolvedSchema = AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo());
                    this.internalPublishMessagesToPartition(this.topicName, request, partition, asyncResponse, resolvedSchema, schemaMeta.getRight());
                } else {
                    asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Fail to add or retrieve schema."));
                }
            }).exceptionally(e -> {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to publish message to single partition: " + e.getLocalizedMessage());
                }
                asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Fail to publish message to single partition: " + e.getMessage()));
                return null;
            });
        }
        catch (Exception e2) {
            asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Fail to publish message: " + e2.getMessage()));
        }
    }

    /**
     * Builds the messages of a produce request and publishes every one of them to a single
     * partition, then resumes the response with one acknowledgement per message.
     *
     * <p>The response is resumed with {@code 200 OK} both when all publishes succeed and when
     * some fail; per-message failures are reported inside the individual acknowledgements.
     *
     * @param topicName     topic the request addresses
     * @param request       produce request holding the messages and optional producer name
     * @param partition     index of the target partition
     * @param asyncResponse response handle that is resumed with the outcome
     * @param schema        schema used to encode the messages
     * @param schemaVersion schema version reported back to the caller
     */
    private void internalPublishMessagesToPartition(TopicName topicName, ProducerMessages request, int partition, AsyncResponse asyncResponse, Schema schema, SchemaVersion schemaVersion) {
        try {
            String producerName;
            if (request.getProducerName() == null || request.getProducerName().isEmpty()) {
                producerName = defaultProducerName;
            } else {
                producerName = request.getProducerName();
            }
            List<Message> messages = this.buildMessage(request, schema, producerName, topicName);
            List<CompletableFuture<PositionImpl>> pendingPublishes = new ArrayList<>();
            List<ProducerAck> acks = new ArrayList<>();
            for (Message message : messages) {
                // The ack temporarily carries the partition index; it is replaced by the full
                // message id once the publish result is known.
                ProducerAck ack = new ProducerAck();
                ack.setMessageId(String.valueOf(partition));
                acks.add(ack);
                pendingPublishes.add(this.publishSingleMessageToPartition(topicName.getPartition(partition).toString(), message));
            }
            // Success and failure paths answer the request the same way.
            Runnable respond = () -> {
                this.processPublishMessageResults(acks, pendingPublishes);
                asyncResponse.resume(Response.ok().entity(new ProducerAcks(acks, ((LongSchemaVersion) schemaVersion).getVersion())).build());
            };
            FutureUtil.waitForAll(pendingPublishes).thenRun(respond).exceptionally(failure -> {
                respond.run();
                return null;
            });
        }
        catch (Exception unexpected) {
            if (log.isDebugEnabled()) {
                log.debug("Fail publish messages to single partition with rest produce message request for topic  {}: {} ", topicName, unexpected.getCause());
            }
            asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, unexpected.getMessage()));
        }
    }

    /**
     * Builds the messages of a produce request and publishes them round-robin over the given
     * partitions, then resumes the response with one acknowledgement per message.
     *
     * <p>The response is resumed with {@code 200 OK} both when all publishes succeed and when
     * some fail; per-message failures are reported inside the individual acknowledgements.
     *
     * @param topicName        topic the request addresses
     * @param request          produce request holding the messages and optional producer name
     * @param partitionIndexes partitions to spread the messages over; when empty the request is
     *                         answered with an internal server error
     * @param asyncResponse    response handle that is resumed with the outcome
     * @param schema           schema used to encode the messages
     * @param schemaVersion    schema version reported back to the caller
     */
    private void internalPublishMessages(TopicName topicName, ProducerMessages request, List<Integer> partitionIndexes, AsyncResponse asyncResponse, Schema schema, SchemaVersion schemaVersion) {
        if (partitionIndexes.size() < 1) {
            asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, new BrokerServiceException.TopicNotFoundException("Topic not owned by current broker.")));
            // Nothing to publish to: continuing would divide by zero below and resume a second time.
            return;
        }
        try {
            String producerName = null == request.getProducerName() || request.getProducerName().isEmpty() ? defaultProducerName : request.getProducerName();
            List<Message> messages = this.buildMessage(request, schema, producerName, topicName);
            List<CompletableFuture<PositionImpl>> publishResults = new ArrayList<>();
            List<ProducerAck> produceMessageResults = new ArrayList<>();
            for (int index = 0; index < messages.size(); ++index) {
                // Round-robin over the available partitions.
                Integer targetPartition = partitionIndexes.get(index % partitionIndexes.size());
                // The ack temporarily carries the partition index; it is replaced by the full
                // message id once the publish result is known.
                ProducerAck produceMessageResult = new ProducerAck();
                produceMessageResult.setMessageId(String.valueOf(targetPartition));
                produceMessageResults.add(produceMessageResult);
                publishResults.add(this.publishSingleMessageToPartition(topicName.getPartition(targetPartition.intValue()).toString(), messages.get(index)));
            }
            FutureUtil.waitForAll(publishResults).thenRun(() -> {
                this.processPublishMessageResults(produceMessageResults, publishResults);
                asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults, ((LongSchemaVersion) schemaVersion).getVersion())).build());
            }).exceptionally(e -> {
                this.processPublishMessageResults(produceMessageResults, publishResults);
                asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults, ((LongSchemaVersion) schemaVersion).getVersion())).build());
                return null;
            });
        }
        catch (Exception e2) {
            if (log.isDebugEnabled()) {
                log.debug("Fail to publish messages with rest produce message request for topic  {}: {} ", topicName, e2.getCause());
            }
            asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, e2.getMessage()));
        }
    }

    /**
     * Publishes one message to the given (partition) topic.
     *
     * <p>When the topic is not present on this broker the returned future fails with
     * {@link BrokerServiceException.TopicNotFoundException} and the partition is dropped from the
     * broker's owning-topics record. A failure of the topic lookup itself also fails the returned
     * future, so callers waiting on it are never left hanging.
     *
     * @param topic   full name of the topic (partition) to publish to
     * @param message message to publish
     * @return future completed through the publish context, or failed when the topic is missing,
     *         the lookup fails, or publishing throws
     */
    private CompletableFuture<PositionImpl> publishSingleMessageToPartition(String topic, Message message) {
        CompletableFuture<PositionImpl> publishResult = new CompletableFuture<>();
        this.pulsar().getBrokerService().getTopic(topic, false).thenAccept(t -> {
            if (!t.isPresent()) {
                publishResult.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic not owned by current broker."));
                TopicName partitionName = TopicName.get(topic);
                ConcurrentOpenHashSet<Integer> ownedPartitions = this.pulsar().getBrokerService().getOwningTopics().get(partitionName.getPartitionedTopicName());
                // The whole entry may already have been removed elsewhere; nothing to clean up then.
                if (ownedPartitions != null) {
                    ownedPartitions.remove(partitionName.getPartitionIndex());
                }
                return;
            }
            try {
                t.get().publishMessage(this.messageToByteBuf(message), RestMessagePublishContext.get(publishResult, t.get(), System.nanoTime()));
            }
            catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to publish single messages to topic  {}: {} ", this.topicName, e.getCause());
                }
                publishResult.completeExceptionally(e);
            }
        }).exceptionally(lookupFailure -> {
            // Without this, a failed topic lookup would leave publishResult incomplete forever.
            // completeExceptionally is a no-op if the result was already completed above.
            publishResult.completeExceptionally(lookupFailure);
            return null;
        });
        return publishResult;
    }

    /**
     * Turns the outcome of every publish future into the matching acknowledgement.
     *
     * <p>On success the acknowledgement's message id, which up to this point holds only the
     * partition index, is replaced with the full message id built from ledger id, entry id and
     * that partition index. On failure the error is recorded on the acknowledgement; if the
     * failure was caused by a missing topic, the topic is also removed from the broker's
     * owning-topics record.
     *
     * @param produceMessageResults acknowledgements, index-aligned with {@code publishResults}
     * @param publishResults        publish futures, one per message
     */
    private void processPublishMessageResults(List<ProducerAck> produceMessageResults, List<CompletableFuture<PositionImpl>> publishResults) {
        for (int index = 0; index < publishResults.size(); ++index) {
            ProducerAck ack = produceMessageResults.get(index);
            try {
                PositionImpl position = publishResults.get(index).get();
                MessageIdImpl messageId = new MessageIdImpl(position.getLedgerId(), position.getEntryId(), Integer.parseInt(ack.getMessageId()));
                ack.setMessageId(messageId.toString());
            }
            catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail publish [{}] message with rest produce message request for topic  {}", (Object)index, (Object)this.topicName);
                }
                // Future.get() reports failures wrapped in an ExecutionException, so the real
                // reason has to be looked up on the cause as well as on the exception itself.
                boolean topicMissing = e instanceof BrokerServiceException.TopicNotFoundException
                        || e.getCause() instanceof BrokerServiceException.TopicNotFoundException;
                if (topicMissing) {
                    this.pulsar().getBrokerService().getOwningTopics().remove(this.topicName.getPartitionedTopicName());
                }
                this.extractException(e, ack);
            }
        }
    }

    /**
     * Records a publish failure on the acknowledgement.
     *
     * <p>Error code {@code 1} is used when the failure is a
     * {@link BrokerServiceException.TopicFencedException} or a {@link ManagedLedgerException};
     * every other failure gets error code {@code 2}. Both the exception and its direct cause are
     * inspected, because failures read from a future arrive wrapped.
     *
     * @param e                    failure to report
     * @param produceMessageResult acknowledgement that receives the error code and message
     */
    private void extractException(Exception e, ProducerAck produceMessageResult) {
        Throwable cause = e.getCause();
        // The previous check required the exception to be BOTH types at once, which cannot
        // happen, so code 1 was never reported.
        boolean fencedOrLedgerFailure = e instanceof BrokerServiceException.TopicFencedException
                || e instanceof ManagedLedgerException
                || cause instanceof BrokerServiceException.TopicFencedException
                || cause instanceof ManagedLedgerException;
        produceMessageResult.setErrorCode(fencedOrLedgerFailure ? 1 : 2);
        produceMessageResult.setErrorMsg(e.getMessage());
    }

    /**
     * Looks up which broker owns the topic (or each of its partitions) and blocks until the
     * lookups have been processed.
     *
     * <p>For a non-partition topic name whose metadata reports more than one partition, every
     * partition is looked up; otherwise the topic name itself is looked up.
     *
     * @param authoritative passed through to the metadata fetch and the broker lookups
     * @param asyncResponse response handle; resumed here or by the lookup processing whenever
     *                      {@code true} is returned
     * @return {@code false} when this broker is recorded as owning the topic and the caller
     *         should continue publishing; {@code true} when the request has already been
     *         answered (redirect or error) and the caller must stop
     */
    private boolean findOwnerBrokerForTopic(boolean authoritative, AsyncResponse asyncResponse) {
        PartitionedTopicMetadata metadata = this.internalGetPartitionedMetadata(authoritative, false);
        List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        List<CompletableFuture<Void>> lookupFutures = new ArrayList<>();
        if (!this.topicName.isPartitioned() && metadata.partitions > 1) {
            for (int index = 0; index < metadata.partitions; ++index) {
                lookupFutures.add(this.lookUpBrokerForTopic(this.topicName.getPartition(index), authoritative, redirectAddresses));
            }
        } else {
            lookupFutures.add(this.lookUpBrokerForTopic(this.topicName, authoritative, redirectAddresses));
        }
        // The collected results are processed whether the lookups succeeded or failed.
        FutureUtil.waitForAll(lookupFutures).thenRun(() -> this.processLookUpResult(redirectAddresses, asyncResponse, future)).exceptionally(e -> {
            this.processLookUpResult(redirectAddresses, asyncResponse, future);
            return null;
        });
        try {
            return future.get();
        }
        catch (Exception e2) {
            if (e2 instanceof InterruptedException) {
                // Preserve the interrupt for code further up the stack instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            if (log.isDebugEnabled()) {
                log.debug("Fail to lookup topic for rest produce message request for topic {}.", (Object)this.topicName.toString());
            }
            if (!asyncResponse.isDone()) {
                asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Internal error: " + e2.getMessage()));
            }
            return true;
        }
    }

    /**
     * Decides how to answer the request once the owner lookups have finished.
     *
     * <ul>
     *   <li>Topic recorded as owned by this broker: {@code future} completes with {@code false}
     *       and the response is left untouched.</li>
     *   <li>Not owned and no redirect address collected: the response is resumed with
     *       {@code 404 Not Found}.</li>
     *   <li>Not owned and a redirect address is available: the response is resumed with a
     *       temporary redirect to the first address, keeping the rest of the request URI.</li>
     * </ul>
     * In the last two cases {@code future} completes with {@code true}.
     *
     * @param redirectAddresses addresses collected by the lookups; only the first one is used
     * @param asyncResponse     response handle to resume when the topic is not owned locally
     * @param future            completed with {@code true} when the response was resumed here,
     *                          {@code false} otherwise
     */
    private void processLookUpResult(List<String> redirectAddresses, AsyncResponse asyncResponse, CompletableFuture<Boolean> future) {
        if (this.pulsar().getBrokerService().getOwningTopics().containsKey(this.topicName.getPartitionedTopicName())) {
            future.complete(false);
            return;
        }
        if (redirectAddresses.isEmpty()) {
            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Can't find owner of given topic."));
            future.complete(true);
            return;
        }
        try {
            if (log.isDebugEnabled()) {
                log.debug("Redirect rest produce request for topic {} from {} to {}.", this.topicName, this.pulsar().getWebServiceAddress(), redirectAddresses.get(0));
            }
            URL redirectAddress = new URL(redirectAddresses.get(0));
            URI redirectURI = UriBuilder.fromUri(this.uri.getRequestUri()).host(redirectAddress.getHost()).port(redirectAddress.getPort()).build();
            asyncResponse.resume(Response.temporaryRedirect(redirectURI).build());
            future.complete(true);
        }
        catch (Exception e) {
            // Logged unconditionally: this is an error-level event and must not depend on debug
            // logging being switched on.
            log.error("Error in preparing redirect url with rest produce message request for topic  {}: {}", this.topicName, e.getMessage(), e);
            asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Fail to redirect client request."));
            future.complete(true);
        }
    }

    /**
     * Asynchronously looks up the broker serving the given topic (partition).
     *
     * <p>If this broker's HTTP or HTTPS web service address matches the lookup result, the
     * partition index is added to the broker's owning-topics record. Otherwise both addresses
     * of the lookup result are handed to {@code completeLookup}, flagged with whether the
     * result is a final answer ({@code true}) or a lookup redirect ({@code false}).
     *
     * @param partitionedTopicName topic or partition to look up
     * @param authoritative        passed through to the lookup options
     * @param redirectAddresses    shared list passed on to {@code completeLookup}
     * @return future passed on to {@code completeLookup}; fails immediately when no lookup
     *         permit is available
     */
    private CompletableFuture<Void> lookUpBrokerForTopic(TopicName partitionedTopicName, boolean authoritative, List<String> redirectAddresses) {
        CompletableFuture<Void> lookupDone = new CompletableFuture<>();
        boolean permitAcquired = this.pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire();
        if (!permitAcquired) {
            if (log.isDebugEnabled()) {
                log.debug("Too many concurrent lookup request.");
            }
            lookupDone.completeExceptionally(new BrokerServiceException.TooManyRequestsException("Too many concurrent lookup request"));
            return lookupDone;
        }
        LookupOptions options = LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build();
        CompletableFuture<Optional<LookupResult>> brokerLookup = this.pulsar().getNamespaceService().getBrokerServiceUrlAsync(partitionedTopicName, options);
        brokerLookup.thenAccept(maybeResult -> {
            if (maybeResult == null || !maybeResult.isPresent()) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to lookup topic for rest produce message request for topic {}.", (Object)partitionedTopicName);
                }
                this.completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, lookupDone);
                return;
            }
            LookupResult found = maybeResult.get();
            String plainUrl = found.getLookupData().getHttpUrl();
            String tlsUrl = found.getLookupData().getHttpUrlTls();
            boolean currentBrokerIsOwner = StringUtils.isNotBlank(plainUrl) && plainUrl.equals(this.pulsar().getWebServiceAddress())
                    || StringUtils.isNotBlank(tlsUrl) && tlsUrl.equals(this.pulsar().getWebServiceAddressTls());
            if (currentBrokerIsOwner) {
                if (log.isDebugEnabled()) {
                    log.debug("Complete topic look up for rest produce message request for topic {}, current broker is owner broker: {}", (Object)partitionedTopicName, (Object)found.getLookupData());
                }
                // Remember that this broker serves the partition.
                this.pulsar().getBrokerService().getOwningTopics()
                        .computeIfAbsent(partitionedTopicName.getPartitionedTopicName(), key -> ConcurrentOpenHashSet.<Integer>newBuilder().build())
                        .add(partitionedTopicName.getPartitionIndex());
                this.completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, lookupDone);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Complete topic look up for rest produce message request for topic {}, current broker is not owner broker: {}", (Object)partitionedTopicName, (Object)found.getLookupData());
            }
            // The flag is true for a final lookup answer and false for a lookup redirect.
            boolean finalAnswer = !found.isRedirect();
            this.completeLookup(Pair.of(Arrays.asList(plainUrl, tlsUrl), finalAnswer), redirectAddresses, lookupDone);
        }).exceptionally(failure -> {
            if (log.isDebugEnabled()) {
                log.debug("Fail to lookup broker with rest produce message request for topic {}: {}", (Object)partitionedTopicName, (Object)failure.getMessage());
            }
            this.completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, lookupDone);
            return null;
        });
        return lookupDone;
    }

    /**
     * Resolves the schema to use for a produce request.
     *
     * <p>A given schema version takes precedence: the schema registered under that version is
     * fetched from the schema registry. Otherwise, if schema data is given, it is added to the
     * topic. When neither is given the result is a pair of two {@code null}s. Both the fetch and
     * the add are waited for inside this method, so the returned future is already completed
     * when it is handed back.
     *
     * @param schemaData    schema to add when no version is requested; may be {@code null}
     * @param schemaVersion version to fetch; may be {@code null}
     * @return completed future holding the schema and its version, or failed with the
     *         exception raised while fetching or adding
     */
    private CompletableFuture<Pair<SchemaData, SchemaVersion>> addOrGetSchemaForTopic(SchemaData schemaData, LongSchemaVersion schemaVersion) {
        CompletableFuture<Pair<SchemaData, SchemaVersion>> outcome = new CompletableFuture<>();
        if (schemaVersion != null) {
            String schemaId = TopicName.get(this.topicName.getPartitionedTopicName()).getSchemaName();
            try {
                SchemaRegistry.SchemaAndMetadata stored = this.pulsar().getSchemaRegistryService().getSchema(schemaId, schemaVersion).get();
                outcome.complete(Pair.of(stored.schema, stored.version));
            }
            catch (Exception failure) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to retrieve schema of version {} for topic {}: {}", schemaVersion.getVersion(), this.topicName, failure.getMessage());
                }
                outcome.completeExceptionally(failure);
            }
            return outcome;
        }
        if (schemaData != null) {
            try {
                SchemaVersion addedVersion = this.addSchema(schemaData).get();
                outcome.complete(Pair.of(schemaData, addedVersion));
            }
            catch (Exception failure) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to add schema {} for topic {}: {}", new String(schemaData.toSchemaInfo().getSchema()), this.topicName, failure.getMessage());
                }
                outcome.completeExceptionally(failure);
            }
            return outcome;
        }
        // Neither a version nor schema data was supplied.
        outcome.complete(Pair.of(null, null));
        return outcome;
    }

    /**
     * Adds the schema to the topic by trying the partitions recorded as owned by this broker
     * one after another until one accepts it.
     *
     * <p>Each attempt is waited for before the next partition is tried, so the returned future
     * is already completed when it is handed back.
     *
     * @param schemaData schema to add
     * @return completed future holding the schema version from the first partition that
     *         accepted the schema, or failed with a {@link SchemaException} when no partition did
     */
    private CompletableFuture<SchemaVersion> addSchema(SchemaData schemaData) {
        List<Integer> partitions = this.pulsar().getBrokerService().getOwningTopics().get(this.topicName.getPartitionedTopicName()).values();
        CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
        for (Integer partition : partitions) {
            CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
            String topicPartitionName = this.topicName.getPartition(partition.intValue()).toString();
            this.pulsar().getBrokerService().getTopic(topicPartitionName, false).thenAccept(topic -> {
                if (!topic.isPresent()) {
                    future.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic " + topicPartitionName + " not found"));
                    return;
                }
                topic.get().addSchema(schemaData).thenAccept(future::complete).exceptionally(exception -> {
                    future.completeExceptionally(exception);
                    return null;
                });
            }).exceptionally(exception -> {
                // A failed topic lookup must also finish the per-partition future; otherwise the
                // blocking get() below would never return.
                future.completeExceptionally(exception);
                return null;
            });
            try {
                result.complete(future.get());
                break;
            }
            catch (Exception e) {
                // This partition did not accept the schema; fall through to the next one.
                if (log.isDebugEnabled()) {
                    log.debug("Fail to add schema to topic " + this.topicName.getPartitionedTopicName() + " for partition " + partition + " for REST produce request.");
                }
            }
        }
        if (!result.isDone()) {
            result.completeExceptionally(new SchemaException("Unable to add schema " + String.valueOf(schemaData) + " to topic " + this.topicName.getPartitionedTopicName()));
        }
        return result;
    }

    /**
     * Builds the schema data for a produce request from its key and value schema strings.
     *
     * <p>A missing or empty value schema falls back to the UTF-8 string schema. With a key
     * schema present, key and value schema are combined into a key/value schema using the
     * {@code SEPARATED} encoding; otherwise the value schema is used on its own. A schema
     * without a name is named after its type.
     *
     * @param keySchema   serialized key schema; may be {@code null} or empty
     * @param valueSchema serialized value schema; may be {@code null} or empty
     * @return the schema data, or {@code null} when a schema string cannot be parsed
     */
    private SchemaData getSchemaData(String keySchema, String valueSchema) {
        try {
            SchemaInfoImpl valueInfo;
            if (valueSchema == null || valueSchema.isEmpty()) {
                valueInfo = (SchemaInfoImpl) StringSchema.utf8().getSchemaInfo();
            } else {
                valueInfo = (SchemaInfoImpl) SCHEMA_INFO_READER.readValue(valueSchema);
            }
            if (valueInfo.getName() == null) {
                valueInfo.setName(valueInfo.getType().toString());
            }
            // Without a key schema the value schema is used as-is.
            SchemaInfo effectiveInfo = valueInfo;
            if (keySchema != null && !keySchema.isEmpty()) {
                SchemaInfoImpl keyInfo = (SchemaInfoImpl) SCHEMA_INFO_READER.readValue(keySchema);
                if (keyInfo.getName() == null) {
                    keyInfo.setName(keyInfo.getType().toString());
                }
                String kvSchemaName = "KVSchema-" + this.topicName.getPartitionedTopicName();
                effectiveInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(kvSchemaName, keyInfo, valueInfo, KeyValueEncodingType.SEPARATED);
            }
            return SchemaData.builder()
                    .data(effectiveInfo.getSchema())
                    .isDeleted(false)
                    .user("Rest Producer")
                    .timestamp(System.currentTimeMillis())
                    .type(effectiveInfo.getType())
                    .props(effectiveInfo.getProperties())
                    .build();
        }
        catch (IOException parseFailure) {
            if (log.isDebugEnabled()) {
                log.debug("Fail to parse schema info for rest produce request with key schema {} and value schema {}", (Object)keySchema, (Object)valueSchema);
            }
            return null;
        }
    }

    /**
     * Serializes a message into the buffer form handed to a topic for publishing.
     *
     * <p>The message metadata is marked as uncompressed, its uncompressed size is set to the
     * payload's readable byte count, and metadata plus payload are serialized with a CRC32C
     * checksum.
     *
     * @param message message to serialize; must be a {@code MessageImpl}
     * @return buffer containing the serialized metadata and payload
     */
    public ByteBuf messageToByteBuf(Message message) {
        this.checkArgument(message instanceof MessageImpl, "Message must be type of MessageImpl.");
        MessageImpl impl = (MessageImpl) message;
        MessageMetadata metadata = impl.getMessageBuilder();
        ByteBuf data = impl.getDataBuffer();
        // REST-produced messages are always sent uncompressed.
        metadata.setCompression(CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE));
        int payloadSize = data.readableBytes();
        metadata.setUncompressedSize(payloadSize);
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata, data);
    }

    /**
     * Converts the messages of a REST produce request into Pulsar messages.
     *
     * <p>For every message the metadata is filled from the request (producer name, publish
     * time, sequence id, replication clusters, properties, key, event time, delivery time) and
     * the payload is encoded with the schema. With a key/value schema the key is encoded with
     * the key schema and stored Base64-encoded, and the payload is encoded with the value schema.
     *
     * @param producerMessages request holding the messages to convert
     * @param schema           schema used for encoding
     * @param producerName     producer name written into each message's metadata
     * @param topicName        topic the messages are created for
     * @return the built messages, in request order
     */
    private List<Message> buildMessage(ProducerMessages producerMessages, Schema schema, String producerName, TopicName topicName) {
        List<Message> built = new ArrayList<>();
        List<ProducerMessage> requested = producerMessages.getMessages();
        for (ProducerMessage source : requested) {
            MessageMetadata metadata = new MessageMetadata();
            metadata.setProducerName(producerName);
            metadata.setPublishTime(System.currentTimeMillis());
            metadata.setSequenceId(source.getSequenceId());
            if (source.getReplicationClusters() != null) {
                metadata.addAllReplicateTos(source.getReplicationClusters());
            }
            if (source.getProperties() != null) {
                List<KeyValue> properties = source.getProperties().entrySet().stream().map(entry -> {
                    KeyValue property = new KeyValue();
                    property.setKey(entry.getKey());
                    property.setValue(entry.getValue());
                    return property;
                }).collect(Collectors.toList());
                metadata.addAllProperties(properties);
            }
            if (source.getKey() != null) {
                if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                    // Key/value schema: the key is encoded with the key schema and stored as Base64.
                    KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) schema;
                    byte[] encodedKey = this.encodeWithSchema(source.getKey(), keyValueSchema.getKeySchema());
                    metadata.setPartitionKey(Base64.getEncoder().encodeToString(encodedKey));
                    metadata.setPartitionKeyB64Encoded(true);
                } else {
                    metadata.setPartitionKey(source.getKey());
                    metadata.setPartitionKeyB64Encoded(false);
                }
            }
            if (source.getEventTime() != null && !source.getEventTime().isEmpty()) {
                metadata.setEventTime(Long.parseLong(source.getEventTime()));
            }
            if (source.isDisableReplication()) {
                metadata.clearReplicateTo();
                metadata.addReplicateTo("__local__");
            }
            if (source.getDeliverAt() != 0L && metadata.hasEventTime()) {
                metadata.setDeliverAtTime(source.getDeliverAt());
            } else if (source.getDeliverAfterMs() != 0L) {
                metadata.setDeliverAtTime(metadata.getEventTime() + source.getDeliverAfterMs());
            }
            // With a key/value schema only the value part encodes the payload.
            Schema payloadSchema;
            if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                payloadSchema = ((KeyValueSchemaImpl) schema).getValueSchema();
            } else {
                payloadSchema = schema;
            }
            byte[] encodedPayload = this.encodeWithSchema(source.getPayload(), payloadSchema);
            built.add(MessageImpl.create(metadata, ByteBuffer.wrap(encodedPayload), schema, topicName.toString()));
        }
        return built;
    }

    /**
     * Encodes the textual form of a REST-supplied key or payload with the given schema.
     *
     * <p>The string is first converted to the Java value matching the schema type (for example
     * parsed as a number, a {@code java.time} value, a JSON tree or an Avro record) and then
     * serialized. This is best-effort: any failure, including a schema type that has no case
     * below, is logged at debug level and yields an empty array instead of an exception.
     *
     * @param input  textual representation of the value to encode
     * @param schema schema that selects the conversion and performs the encoding
     * @return the encoded bytes, or an empty array when the value could not be encoded
     */
    private byte[] encodeWithSchema(String input, Schema schema) {
        try {
            switch (schema.getSchemaInfo().getType()) {
                case INT8: {
                    return schema.encode(Byte.parseByte(input));
                }
                case INT16: {
                    return schema.encode(Short.parseShort(input));
                }
                case INT32: {
                    return schema.encode(Integer.parseInt(input));
                }
                case INT64: {
                    return schema.encode(Long.parseLong(input));
                }
                case STRING: {
                    return schema.encode(input);
                }
                case FLOAT: {
                    return schema.encode(Float.parseFloat(input));
                }
                case DOUBLE: {
                    return schema.encode(Double.parseDouble(input));
                }
                case BOOLEAN: {
                    return schema.encode(Boolean.parseBoolean(input));
                }
                case BYTES: {
                    // Pin the charset: the no-arg getBytes() uses the platform default, which
                    // makes the produced payload depend on the JVM configuration.
                    return schema.encode(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
                case DATE: {
                    // Parsed with the default date style of the JVM's default locale.
                    return schema.encode(DateFormat.getDateInstance().parse(input));
                }
                case TIME: {
                    // The input is a long that is handed to the java.sql.Time constructor.
                    return schema.encode(new Time(Long.parseLong(input)));
                }
                case TIMESTAMP: {
                    // The input is a long that is handed to the java.sql.Timestamp constructor.
                    return schema.encode(new Timestamp(Long.parseLong(input)));
                }
                case INSTANT: {
                    return schema.encode(Instant.parse(input));
                }
                case LOCAL_DATE: {
                    return schema.encode(LocalDate.parse(input));
                }
                case LOCAL_TIME: {
                    return schema.encode(LocalTime.parse(input));
                }
                case LOCAL_DATE_TIME: {
                    return schema.encode(LocalDateTime.parse(input));
                }
                case JSON: {
                    // Parse the input into a JSON tree and serialize it through the generic JSON writer.
                    GenericJsonWriter jsonWriter = new GenericJsonWriter();
                    return jsonWriter.write((org.apache.pulsar.client.api.schema.GenericRecord)new GenericJsonRecord(null, null, ObjectMapperFactory.getMapper().reader().readTree(input), schema.getSchemaInfo()));
                }
                case AVRO: {
                    // Decode the JSON-encoded input against the Avro schema, then re-encode it
                    // through the generic Avro writer.
                    AvroBaseStructSchema avroSchema = (AvroBaseStructSchema)schema;
                    JsonDecoder decoder = DecoderFactory.get().jsonDecoder(avroSchema.getAvroSchema(), input);
                    GenericDatumReader reader = new GenericDatumReader(avroSchema.getAvroSchema());
                    GenericRecord genericRecord = (GenericRecord)reader.read(null, (Decoder)decoder);
                    GenericAvroWriter avroWriter = new GenericAvroWriter(avroSchema.getAvroSchema());
                    return avroWriter.write((org.apache.pulsar.client.api.schema.GenericRecord)new GenericAvroRecord(null, avroSchema.getAvroSchema(), null, genericRecord));
                }
            }
            // No case matched: the schema type is not supported for REST produce requests.
            throw new PulsarClientException.InvalidMessageException("");
        }
        catch (Exception e) {
            if (log.isDebugEnabled()) {
                // The trailing exception argument makes the logger record the failure cause.
                log.debug("Fail to encode value {} with schema {} for rest produce request", input, new String(schema.getSchemaInfo().getSchema(), java.nio.charset.StandardCharsets.UTF_8), e);
            }
            // Deliberate best-effort: callers receive an empty payload rather than an exception.
            return new byte[0];
        }
    }

    /**
     * Records the outcome of one lookup and completes its future.
     *
     * <p>The lookup request semaphore permit is released first. If the result carries
     * addresses, the one at index 1 (HTTPS requests) or index 0 (otherwise) is added to
     * {@code redirectAddresses}: at the head when the result's right-hand flag is true, at the
     * tail when it is false. The future is always completed with {@code null}.
     *
     * @param result            pair of the address list and the flag choosing head or tail insertion
     * @param redirectAddresses list that receives the selected address
     * @param future            future completed once the result has been recorded
     */
    private synchronized void completeLookup(Pair<List<String>, Boolean> result, List<String> redirectAddresses, CompletableFuture<Void> future) {
        this.pulsar().getBrokerService().getLookupRequestSemaphore().release();
        List<String> addresses = result.getLeft();
        if (!addresses.isEmpty()) {
            boolean insertAtHead = result.getRight();
            int position = insertAtHead ? 0 : redirectAddresses.size();
            int addressIndex = this.isRequestHttps() ? 1 : 0;
            redirectAddresses.add(position, addresses.get(addressIndex));
        }
        future.complete(null);
    }

    /**
     * Checks that the current client is allowed to produce to {@code topicName}.
     *
     * <p>Nothing is checked unless both authentication and authorization are enabled. Otherwise
     * the client must be authenticated and the authorization service must grant
     * {@code TopicOperation.PRODUCE} within the configured metadata store operation timeout.
     *
     * @throws RestException with {@code UNAUTHORIZED} when the client is not authenticated or
     *                       not authorized, and with {@code INTERNAL_SERVER_ERROR} when the
     *                       authorization check times out or fails
     * @throws Exception     declared for compatibility with existing callers
     */
    public void validateProducePermission() throws Exception {
        if (!this.pulsar().getConfiguration().isAuthenticationEnabled() || !this.pulsar().getBrokerService().isAuthorizationEnabled()) {
            return;
        }
        if (!TopicsBase.isClientAuthenticated(this.clientAppId())) {
            throw new RestException(Response.Status.UNAUTHORIZED, "Need to authenticate to perform the request");
        }
        AuthenticationParameters authParams = this.authParams();
        boolean isAuthorized;
        try {
            isAuthorized = (Boolean)this.pulsar().getBrokerService().getAuthorizationService().allowTopicOperationAsync(this.topicName, TopicOperation.PRODUCE, authParams).get(this.config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
            log.warn("Time-out {} sec while checking authorization on {} ", (Object)this.config().getMetadataStoreOperationTimeoutSeconds(), (Object)this.topicName);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Time-out while checking authorization");
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The blocking get() cleared the interrupt flag; restore it so code further up
                // the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            log.warn("Producer-client  with Role - {} {} failed to get permissions for topic - {}. {}", new Object[]{authParams.getClientRole(), authParams.getOriginalPrincipal(), this.topicName, e.getMessage()});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to get permissions");
        }
        if (!isAuthorized) {
            throw new RestException(Response.Status.UNAUTHORIZED, "Unauthorized to produce to topic " + String.valueOf(this.topicName));
        }
    }
}

