/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.log.remote.metadata.storage;

import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataTopicPartitioner;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes {@link RemoteLogMetadata} events to the remote log metadata topic.
 *
 * <p>The underlying {@link KafkaProducer} is created from
 * {@link TopicBasedRemoteLogMetadataManagerConfig#producerProperties()} and is owned by this
 * instance; call {@link #close()} to release it.
 */
public class ProducerManager
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);

    /** Upper bound on how long {@link #close()} waits for the producer to shut down. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(30L);

    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
    private final KafkaProducer<byte[], byte[]> producer;
    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;

    /**
     * Creates a manager together with its own producer.
     *
     * @param rlmmConfig           configuration supplying the producer properties, the metadata
     *                             topic name and the metadata topic partition count
     * @param rlmmTopicPartitioner maps a user topic partition to a metadata topic partition
     */
    public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig, RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner) {
        this.rlmmConfig = rlmmConfig;
        this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
        this.topicPartitioner = rlmmTopicPartitioner;
    }

    /**
     * Serializes the given metadata and sends it to the metadata topic partition chosen by the
     * partitioner for the metadata's {@link TopicIdPartition}.
     *
     * @param remoteLogMetadata the metadata event to publish
     * @return a future that completes with the {@link RecordMetadata} reported by the producer
     *         callback, or completes exceptionally with the callback's exception or with any
     *         exception raised while serializing or handing the record to the producer
     * @throws KafkaException if the chosen metadata partition is not less than the configured
     *                        metadata topic partition count
     */
    public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata remoteLogMetadata) {
        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
        int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
        log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
        // Out-of-range partitions are reported synchronously rather than through the future.
        if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
            throw new KafkaException("Chosen partition no " + metadataPartitionNum + " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
        }
        try {
            Callback callback = (metadata, exception) -> {
                if (exception != null) {
                    future.completeExceptionally(exception);
                } else {
                    future.complete(metadata);
                }
            };
            // Records are sent without a key; the target partition is set explicitly.
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                    rlmmConfig.remoteLogMetadataTopicName(),
                    metadataPartitionNum,
                    null,
                    serde.serialize(remoteLogMetadata));
            producer.send(record, callback);
        }
        catch (Exception ex) {
            // Serialization or send failures surface through the returned future.
            future.completeExceptionally(ex);
        }
        return future;
    }

    /**
     * Closes the underlying producer, waiting up to 30 seconds. Any exception raised while
     * closing is logged and not rethrown.
     */
    @Override
    public void close() {
        try {
            producer.close(CLOSE_TIMEOUT);
        }
        catch (Exception e) {
            log.error("Error encountered while closing the producer", e);
        }
    }
}

