package com.metamx.tranquility.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.tranquility.config.DataSourceConfig;
import com.metamx.tranquility.kafka.model.MessageCounters;
import com.metamx.tranquility.kafka.model.PropertiesBasedKafkaConfig;
import com.metamx.tranquility.kafka.writer.WriterController;
import io.druid.concurrent.Execs;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/* loaded from: input_file:com/metamx/tranquility/kafka/KafkaConsumer.class */
/**
 * Reads messages from every Kafka topic matching the configured data sources and hands each
 * message to the writer that the {@link WriterController} returns for the message's topic.
 *
 * <p>Kafka auto-commit is forced off; offsets are committed only by {@link #commit()} and
 * {@link #stop()}, in both cases after {@code WriterController.flushAll()} has been called.
 * Consumer threads hold the read side of {@code commitLock} while they take a message from the
 * stream and send it, and commit/stop hold the write side, so offsets are never committed while
 * a message has been taken from the stream but not yet passed to its writer.
 */
public class KafkaConsumer {
    private static final Logger log = new Logger(KafkaConsumer.class);

    private final ExecutorService consumerExec;
    private final Thread commitThread;
    private final ConsumerConnector consumerConnector;
    private final TopicFilter topicFilter;
    private final int numThreads;
    private final int commitMillis;
    private final WriterController writerController;

    /** Latched to {@code true} by the first {@link #stop()} call; later calls are no-ops. */
    private final AtomicBoolean shutdown = new AtomicBoolean();

    /** Read side: consumer threads handling one message. Write side: flush + offset commit. */
    private final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock();

    /** Counters returned by the previous flush; only touched while holding the write lock. */
    private Map<String, MessageCounters> previousMessageCounters = new HashMap<>();

    /**
     * Creates the consumer connector, topic filter, consumer executor and commit thread. Nothing
     * is consumed until {@link #start()} is called.
     *
     * @param propertiesBasedKafkaConfig supplies the consumer thread count and the commit period;
     *        a thread count that is not positive selects {@code max(1, availableProcessors - 1)}
     * @param properties Kafka consumer properties; {@code auto.commit.enable} is overwritten with
     *        {@code "false"} on the passed instance
     * @param map data source configurations whose topic patterns form the topic whitelist
     * @param writerController provides the per-topic writers and the flush operation
     */
    public KafkaConsumer(PropertiesBasedKafkaConfig propertiesBasedKafkaConfig, Properties properties, Map<String, DataSourceConfig<PropertiesBasedKafkaConfig>> map, WriterController writerController) {
        this.consumerConnector = getConsumerConnector(properties);
        this.topicFilter = new Whitelist(buildTopicFilter(map));
        log.info("Kafka topic filter [%s]", this.topicFilter);

        final int configuredThreads = propertiesBasedKafkaConfig.getConsumerNumThreads().intValue();
        this.numThreads = configuredThreads > 0
            ? configuredThreads
            : Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        this.commitMillis = propertiesBasedKafkaConfig.getCommitPeriodMillis().intValue();
        this.writerController = writerController;
        this.consumerExec = Execs.multiThreaded(this.numThreads, "KafkaConsumer-%d");

        this.commitThread = new Thread(createCommitRunnable());
        this.commitThread.setName("KafkaConsumer-CommitThread");
        this.commitThread.setDaemon(true);
    }

    /** Starts the periodic commit thread and then submits one consumer task per Kafka stream. */
    public void start() {
        commitThread.start();
        startConsumers();
    }

    /**
     * Shuts the consumer down. Only the first call does any work.
     *
     * <p>Under the write lock it flushes the writers, stops the writer controller and commits
     * offsets. Afterwards, whether or not that succeeded, it shuts down the Kafka connector,
     * interrupts the commit thread and shuts down the consumer executor.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if the calling thread is
     *         interrupted while waiting for the write lock; the interrupt flag is restored and
     *         the connector, commit thread and executor are still torn down
     */
    public void stop() {
        if (!shutdown.compareAndSet(false, true)) {
            return;
        }

        log.info("Shutting down - attempting to flush buffers and commit final offsets");
        try {
            // Holding the write lock keeps consumer threads from handling further messages.
            commitLock.writeLock().lockInterruptibly();
            try {
                writerController.flushAll();
                writerController.stop();
                consumerConnector.commitOffsets();
            } finally {
                commitLock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } finally {
            // The shutdown flag is already latched, so no later stop() call will get here again.
            // The teardown therefore has to run even when the lock could not be acquired.
            consumerConnector.shutdown();
            commitThread.interrupt();
            consumerExec.shutdownNow();
        }
        log.info("Finished clean shutdown.");
    }

    /**
     * Blocks until the commit thread has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        commitThread.join();
    }

    /**
     * Flushes all writers and then commits the consumer offsets while holding the write lock, and
     * logs the per-key counter differences since the previous commit together with the time
     * spent flushing and committing.
     *
     * @throws InterruptedException if interrupted while waiting for the write lock
     */
    void commit() throws InterruptedException {
        commitLock.writeLock().lockInterruptibly();
        try {
            final long flushStartMillis = System.currentTimeMillis();
            final Map<String, MessageCounters> flushedCounters = writerController.flushAll();
            final long commitStartMillis = System.currentTimeMillis();
            consumerConnector.commitOffsets();
            final long finishedMillis = System.currentTimeMillis();

            // Values are whatever MessageCounters.difference(...) returns; they are only logged.
            final Map<String, Object> countsSinceLastCommit = new HashMap<>();
            for (Map.Entry<String, MessageCounters> entry : flushedCounters.entrySet()) {
                final String key = entry.getKey();
                // For a key seen for the first time the previous counters are null.
                countsSinceLastCommit.put(key, entry.getValue().difference(previousMessageCounters.get(key)));
            }
            previousMessageCounters = flushedCounters;

            log.info(
                "Flushed %s pending messages in %sms and committed offsets in %sms.",
                countsSinceLastCommit.isEmpty() ? "0" : countsSinceLastCommit,
                commitStartMillis - flushStartMillis,
                finishedMillis - commitStartMillis
            );
        } finally {
            commitLock.writeLock().unlock();
        }
    }

    /**
     * Builds the body of the commit thread: call {@link #commit()} roughly every
     * {@code commitMillis} milliseconds until interrupted or until a commit fails, then call
     * {@link #stop()}.
     */
    private Runnable createCommitRunnable() {
        return new Runnable() {
            @Override
            public void run() {
                long lastCommitMillis = System.currentTimeMillis();
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // Sleep only for what is left of the period since the last commit ended.
                        final long elapsedMillis = System.currentTimeMillis() - lastCommitMillis;
                        Thread.sleep(Math.max(commitMillis - elapsedMillis, 0L));
                        commit();
                        lastCommitMillis = System.currentTimeMillis();
                    }
                } catch (InterruptedException e) {
                    // Must be handled before the generic handler: an interrupt is how stop()
                    // ends this thread, not a failure.
                    Thread.currentThread().interrupt();
                    log.info("Commit thread interrupted.");
                } catch (Throwable t) {
                    log.error(t, "Commit thread failed!");
                    throw Throwables.propagate(t);
                } finally {
                    stop();
                }
            }
        };
    }

    /** Creates {@code numThreads} streams for the topic filter and submits a consumer task for each. */
    private void startConsumers() {
        for (final KafkaStream<byte[], byte[]> stream
            : consumerConnector.createMessageStreamsByFilter(topicFilter, numThreads)) {
            consumerExec.submit(new Runnable() {
                @Override
                public void run() {
                    runConsumer(stream);
                }
            });
        }
    }

    /**
     * Body of one consumer task: drains the stream and calls {@link #stop()} when the stream
     * ends, the thread is interrupted, or an error occurs.
     */
    private void runConsumer(KafkaStream<byte[], byte[]> stream) {
        try {
            try {
                consume(stream);
            } catch (InterruptedException e) {
                log.info("Consumer thread interrupted.");
            }
            // Called inside the guarded region so that a failure during shutdown is logged by the
            // handler below; the task's Future is discarded, so nothing else would report it.
            stop();
        } catch (Throwable t) {
            log.error(t, "Exception: ");
            throw Throwables.propagate(t);
        } finally {
            // Does nothing when the call above (or any other thread) already started shutdown.
            stop();
        }
    }

    /**
     * Takes messages from the stream one at a time and sends each to the writer for its topic.
     *
     * @throws InterruptedException if the thread is found interrupted between messages or while
     *         waiting for the read lock
     */
    private void consume(KafkaStream<byte[], byte[]> stream) throws InterruptedException {
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
        while (messages.hasNext()) {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }

            // next() and send() happen under the read lock so that commit(), which needs the
            // write lock, cannot run between taking the message and handing it to the writer.
            commitLock.readLock().lockInterruptibly();
            try {
                final MessageAndMetadata<byte[], byte[]> message = messages.next();
                writerController.getWriter(message.topic()).send(message.message());
            } finally {
                commitLock.readLock().unlock();
            }
        }
    }

    /**
     * Creates the Kafka consumer connector with auto-commit disabled.
     *
     * @param properties consumer properties; {@code auto.commit.enable} is set to {@code "false"}
     *        on this instance before the configuration is built
     */
    private static ConsumerConnector getConsumerConnector(Properties properties) {
        properties.setProperty("auto.commit.enable", "false");
        final ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        Preconditions.checkState(!consumerConfig.autoCommitEnable(), "autocommit must be off");
        return Consumer.createJavaConsumerConnector(consumerConfig);
    }

    /**
     * Joins the topic pattern of every data source into one alternation of the form
     * {@code (p1)|(p2)|...}; returns the empty string when there are no data sources.
     */
    private static String buildTopicFilter(Map<String, DataSourceConfig<PropertiesBasedKafkaConfig>> dataSourceConfigs) {
        final StringBuilder filter = new StringBuilder();
        for (DataSourceConfig<PropertiesBasedKafkaConfig> config : dataSourceConfigs.values()) {
            if (filter.length() > 0) {
                filter.append('|');
            }
            filter.append('(').append(config.propertiesBasedConfig().getTopicPattern()).append(')');
        }
        return filter.toString();
    }
}
