package com.metamx.tranquility.kafka.writer;

import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.common.scala.net.curator.Disco;
import com.metamx.tranquility.config.DataSourceConfig;
import com.metamx.tranquility.finagle.FinagleRegistry;
import com.metamx.tranquility.finagle.FinagleRegistryConfig;
import com.metamx.tranquility.kafka.model.MessageCounters;
import com.metamx.tranquility.kafka.model.PropertiesBasedKafkaConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/* loaded from: input_file:com/metamx/tranquility/kafka/writer/WriterController.class */
public class WriterController {
    private static final Logger log = new Logger(WriterController.class);
    private static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 500, 30000);
    private List<DataSourceConfig<PropertiesBasedKafkaConfig>> dataSourceConfigList;
    private Map<String, TranquilityEventWriter> writers = new ConcurrentHashMap();
    private Map<String, CuratorFramework> curators = new ConcurrentHashMap();
    private Map<String, FinagleRegistry> finagleRegistries = new ConcurrentHashMap();

    public WriterController(Map<String, DataSourceConfig<PropertiesBasedKafkaConfig>> map) {
        this.dataSourceConfigList = new ArrayList(map.values());
        Collections.sort(this.dataSourceConfigList, new Comparator<DataSourceConfig<PropertiesBasedKafkaConfig>>() { // from class: com.metamx.tranquility.kafka.writer.WriterController.1
            @Override // java.util.Comparator
            public int compare(DataSourceConfig<PropertiesBasedKafkaConfig> dataSourceConfig, DataSourceConfig<PropertiesBasedKafkaConfig> dataSourceConfig2) {
                return ((PropertiesBasedKafkaConfig) dataSourceConfig2.propertiesBasedConfig()).getTopicPatternPriority().compareTo(((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).getTopicPatternPriority());
            }
        });
        log.info("Ready: [topicPattern] -> dataSource mappings:", new Object[0]);
        for (DataSourceConfig<PropertiesBasedKafkaConfig> dataSourceConfig : this.dataSourceConfigList) {
            log.info("  [%s] -> %s (priority: %d)", new Object[]{((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).getTopicPattern(), dataSourceConfig.dataSource(), ((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).getTopicPatternPriority()});
        }
    }

    public synchronized TranquilityEventWriter getWriter(String str) {
        if (this.writers.containsKey(str)) {
            return this.writers.get(str);
        }
        for (DataSourceConfig<PropertiesBasedKafkaConfig> dataSourceConfig : this.dataSourceConfigList) {
            if (Pattern.matches(((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).getTopicPattern(), str)) {
                log.info("Creating EventWriter for topic [%s] using dataSource [%s]", new Object[]{str, dataSourceConfig.dataSource()});
                this.writers.put(str, createWriter(str, dataSourceConfig));
                return this.writers.get(str);
            }
        }
        throw new RuntimeException(String.format("Kafka topicFilter allowed topic [%s] but no spec is mapped", str));
    }

    public Map<String, MessageCounters> flushAll() throws InterruptedException {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, TranquilityEventWriter> entry : this.writers.entrySet()) {
            entry.getValue().flush();
            hashMap.put(entry.getKey(), entry.getValue().getMessageCounters());
        }
        return hashMap;
    }

    public void stop() {
        Iterator<Map.Entry<String, TranquilityEventWriter>> it = this.writers.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().stop();
        }
        Iterator<Map.Entry<String, CuratorFramework>> it2 = this.curators.entrySet().iterator();
        while (it2.hasNext()) {
            it2.next().getValue().close();
        }
    }

    protected TranquilityEventWriter createWriter(String str, DataSourceConfig<PropertiesBasedKafkaConfig> dataSourceConfig) {
        String zookeeperConnect = ((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).zookeeperConnect();
        if (!this.curators.containsKey(zookeeperConnect)) {
            CuratorFramework build = CuratorFrameworkFactory.builder().connectString(((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).zookeeperConnect()).connectionTimeoutMs(Ints.checkedCast(((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).zookeeperTimeout().toStandardDuration().getMillis())).retryPolicy(RETRY_POLICY).build();
            build.start();
            this.curators.put(zookeeperConnect, build);
        }
        String format = String.format("%s:%s", zookeeperConnect, ((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).discoPath());
        if (!this.finagleRegistries.containsKey(format)) {
            this.finagleRegistries.put(format, new FinagleRegistry(FinagleRegistryConfig.builder().build(), new Disco(this.curators.get(zookeeperConnect), dataSourceConfig.propertiesBasedConfig())));
        }
        return new TranquilityEventWriter(str, dataSourceConfig, this.curators.get(zookeeperConnect), this.finagleRegistries.get(format));
    }
}
