/*
 * Decompiled with CFR 0.152.
 */
package com.stratio.streaming.commons.kafka.service;

import com.stratio.streaming.commons.kafka.serializer.ZkStringSerializer;
import com.stratio.streaming.commons.kafka.service.TopicService;
import java.util.Arrays;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.common.Topic;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Map;
import scala.collection.Seq;

/**
 * {@link TopicService} implementation that manages Kafka topics through
 * ZooKeeper ({@link AdminUtils} / {@link ZkUtils}) and reads topic metadata
 * from a single broker through a {@link SimpleConsumer}.
 *
 * <p>Each instance owns one {@link ZkClient} and one {@link SimpleConsumer},
 * both created in the constructor. Call {@link #close()} once the service is
 * no longer needed so that both are released.
 */
public class KafkaTopicService
implements TopicService, AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicService.class);

    /** Timeout value handed to the {@link SimpleConsumer} constructor. */
    private static final int CONSUMER_TIMEOUT = 100000;
    /** Buffer size handed to the {@link SimpleConsumer} constructor. */
    private static final int CONSUMER_BUFFER_SIZE = 65536;
    /** Client id the metadata consumer identifies itself with. */
    private static final String CONSUMER_CLIENT_ID = "leaderLookup";

    private final ZkClient zkClient;
    private final SimpleConsumer simpleConsumer;

    /**
     * Creates the service together with its ZooKeeper client and Kafka consumer.
     *
     * @param zokeeperCluster   ZooKeeper connection string passed to {@link ZkClient}
     * @param broker            host of the Kafka broker queried for topic metadata
     * @param brokerPort        port of that Kafka broker
     * @param connectionTimeout connection timeout passed to {@link ZkClient}
     * @param sessionTimeout    session timeout passed to {@link ZkClient}
     */
    public KafkaTopicService(String zokeeperCluster, String broker, int brokerPort, int connectionTimeout, int sessionTimeout) {
        // ZkClient expects the session timeout before the connection timeout.
        this.zkClient = new ZkClient(zokeeperCluster, sessionTimeout, connectionTimeout, new ZkStringSerializer());
        this.simpleConsumer = new SimpleConsumer(broker, brokerPort, CONSUMER_TIMEOUT, CONSUMER_BUFFER_SIZE, CONSUMER_CLIENT_ID);
    }

    /**
     * Creates the topic only when {@link AdminUtils#topicExists} reports that it
     * does not exist yet; otherwise just logs that it is already present.
     *
     * @param topic             name of the topic
     * @param replicationFactor replication factor used when the topic is created
     * @param partitions        number of partitions used when the topic is created
     */
    @Override
    public void createTopicIfNotExist(String topic, int replicationFactor, int partitions) {
        if (AdminUtils.topicExists(this.zkClient, topic)) {
            logger.info("Topic {} already exists", topic);
            return;
        }
        this.createOrUpdateTopic(topic, replicationFactor, partitions);
    }

    /**
     * Validates the topic name, computes a replica assignment over the brokers
     * currently listed in ZooKeeper and writes that assignment to ZooKeeper.
     *
     * @param topic             name of the topic
     * @param replicationFactor number of replicas per partition
     * @param partitions        number of partitions
     */
    @Override
    public void createOrUpdateTopic(String topic, int replicationFactor, int partitions) {
        logger.debug("Creating topic {} with replication {} and {} partitions", new Object[]{topic, replicationFactor, partitions});
        Topic.validate(topic);
        Seq brokerList = ZkUtils.getSortedBrokerList(this.zkClient);
        // The $default$N() calls supply the Scala default arguments of the Kafka admin API.
        Map partitionReplicaAssignment = AdminUtils.assignReplicasToBrokers(
                brokerList,
                partitions,
                replicationFactor,
                AdminUtils.assignReplicasToBrokers$default$4(),
                AdminUtils.assignReplicasToBrokers$default$5());
        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(
                this.zkClient,
                topic,
                partitionReplicaAssignment,
                AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK$default$4(),
                AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK$default$5());
        logger.debug("Topic {} created", topic);
    }

    /**
     * Asks the configured broker for the metadata of {@code topic} and returns
     * the number of partition entries it reports.
     *
     * @param topic name of the topic to look up
     * @return the size of the partition metadata list of the matching entry, or
     *         {@code null} when the response contains no entry whose topic name
     *         equals {@code topic}
     */
    @Override
    public Integer getNumPartitionsForTopic(String topic) {
        TopicMetadataRequest topicRequest = new TopicMetadataRequest(Arrays.asList(topic));
        TopicMetadataResponse topicResponse = this.simpleConsumer.send(topicRequest);
        for (TopicMetadata topicMetadata : topicResponse.topicsMetadata()) {
            if (!topic.equals(topicMetadata.topic())) {
                continue;
            }
            // NOTE(review): the entry's error code is not inspected here, so an
            // entry with an empty partition list is reported as 0 - confirm intended.
            int partitionSize = topicMetadata.partitionsMetadata().size();
            logger.debug("Partition size found ({}) for {} topic", partitionSize, topic);
            return partitionSize;
        }
        logger.warn("Metadata info not found!. TOPIC {}", topic);
        return null;
    }

    /**
     * Closes the Kafka consumer and the ZooKeeper client owned by this service.
     * The ZooKeeper client is closed even if closing the consumer throws.
     */
    @Override
    public void close() {
        try {
            this.simpleConsumer.close();
        } finally {
            this.zkClient.close();
        }
    }
}

