/*
 * Decompiled with CFR 0.152.
 */
package org.rhq.metrics.impl.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.joda.time.Duration;
import org.rhq.metrics.core.Counter;
import org.rhq.metrics.core.DataAccess;
import org.rhq.metrics.core.DataType;
import org.rhq.metrics.core.MetricsService;
import org.rhq.metrics.core.MetricsThreadFactory;
import org.rhq.metrics.core.RawMetricMapper;
import org.rhq.metrics.core.RawNumericMetric;
import org.rhq.metrics.core.SchemaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MetricsService} implementation that stores raw numeric metrics and
 * counters in Cassandra through {@link DataAccess}.
 *
 * <p>Writes of raw data are throttled by a {@link RateLimiter}; result-set
 * mapping for the {@code find*} queries runs on a small dedicated thread pool.
 * The set of metric ids seen by {@code addData} is kept in memory only.
 */
public class MetricsServiceCassandra
implements MetricsService {
    private static final Logger logger = LoggerFactory.getLogger(MetricsServiceCassandra.class);

    /** System property holding the permits-per-second limit for raw data inserts. */
    public static final String REQUEST_LIMIT = "rhq.metrics.request.limit";

    /** TTL applied to raw data: seven days, expressed in seconds. */
    private static final int RAW_TTL = Duration.standardDays(7L).toStandardSeconds().getSeconds();

    /** Keyspace used when the start-up parameters do not name one. */
    private static final String DEFAULT_KEYSPACE = "rhq";

    /** CQL port used when {@code cqlport} is missing or not a number. */
    private static final int DEFAULT_CQL_PORT = 9042;

    /** Discards the result set of a write so callers only observe success or failure. */
    private static final Function<ResultSet, Void> TO_VOID = new Function<ResultSet, Void>() {
        @Override
        public Void apply(ResultSet resultSet) {
            return null;
        }
    };

    /** Rate limiter for inserts; limit read from {@link #REQUEST_LIMIT} (default 30000) with a 3 minute warm-up. */
    private RateLimiter permits = RateLimiter.create(
        Double.parseDouble(System.getProperty(REQUEST_LIMIT, "30000")), 3L, TimeUnit.MINUTES);

    /**
     * Session owned by this service. Present only when the service created the
     * connection itself; starts out absent so {@link #shutdown()} is safe to call
     * on an instance that was never started.
     */
    private Optional<Session> session = Optional.absent();

    private DataAccess dataAccess;

    private MapQueryResultSet mapQueryResultSet = new MapQueryResultSet();

    /** Maps each row (string, string, long columns 0..2) to a {@link Counter}. */
    private Function<ResultSet, List<Counter>> mapCounters = new Function<ResultSet, List<Counter>>() {
        @Override
        public List<Counter> apply(ResultSet resultSet) {
            List<Counter> counters = new ArrayList<Counter>();
            for (Row row : resultSet) {
                counters.add(new Counter(row.getString(0), row.getString(1), row.getLong(2)));
            }
            return counters;
        }
    };

    /** Pool on which query results are mapped, keeping that work off the thread completing the future. */
    private ListeningExecutorService metricsTasks =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory()));

    /**
     * Sorted, in-memory set of metric ids recorded by {@code addData}. A concurrent
     * set is used because it is mutated by writers while {@link #listMetrics()}
     * iterates over it.
     */
    Set<String> ids = new ConcurrentSkipListSet<String>();

    /**
     * Starts the service on top of an externally supplied session.
     *
     * <p>The session is not recorded as owned, so {@link #shutdown()} will not close it.
     *
     * @param s the session used for all data access
     */
    @Override
    public void startUp(Session s) {
        this.session = Optional.absent();
        this.dataAccess = new DataAccess(s);
    }

    /**
     * Starts the service by building a cluster connection from configuration.
     *
     * <p>Recognised keys: {@code cqlport} (default 9042), {@code nodes}
     * (comma-separated contact points, default {@code 127.0.0.1}) and
     * {@code keyspace} (default {@code rhq}).
     *
     * @param params start-up parameters
     */
    @Override
    public void startUp(Map<String, String> params) {
        int port = parsePort(params.get("cqlport"));
        String[] nodes = params.containsKey("nodes") ? params.get("nodes").split(",") : new String[]{"127.0.0.1"};
        String keyspace = params.get("keyspace");
        if (keyspace == null || keyspace.isEmpty()) {
            logger.info("No explicit keyspace given, will default to 'rhq'");
            keyspace = DEFAULT_KEYSPACE;
        }

        Cluster cluster = new Cluster.Builder().addContactPoints(nodes).withPort(port).build();
        try {
            // Prepare the schema for the keyspace we are about to connect to, rather than
            // always for the default one.
            this.updateSchemaIfNecessary(cluster, keyspace);
            Session connected = cluster.connect(keyspace);
            DataAccess access = new DataAccess(connected);
            // Publish state only once everything succeeded.
            this.session = Optional.of(connected);
            this.dataAccess = access;
        }
        catch (RuntimeException e) {
            // Do not leak the cluster (and its connections) when start-up fails half-way.
            cluster.close();
            throw e;
        }
    }

    /**
     * Closes the session and cluster if this service created them. Safe to call
     * repeatedly and on an instance that was never started.
     *
     * <p>NOTE(review): the {@code metricsTasks} pool is not shut down here; confirm
     * whether the hosting container is expected to reclaim its threads.
     */
    @Override
    public void shutdown() {
        if (!this.session.isPresent()) {
            return;
        }
        Session s = this.session.get();
        Cluster cluster = s.getCluster();
        this.session = Optional.absent();
        try {
            s.close();
        }
        finally {
            // Close the cluster even if closing the session failed.
            cluster.close();
        }
    }

    /**
     * Inserts a single raw data point and records its id as known.
     *
     * <p>NOTE(review): this variant stores {@code getValue()} while the bulk variant
     * stores {@code getAvg()}; kept as-is, confirm whether that difference is intended.
     *
     * @param data the data point to store
     * @return a future that completes when the insert has finished
     */
    @Override
    public ListenableFuture<Void> addData(RawNumericMetric data) {
        this.permits.acquire();
        ResultSetFuture future = this.dataAccess.insertData(data.getBucket(), data.getId(), data.getTimestamp(),
            ImmutableMap.of(DataType.RAW.ordinal(), data.getValue()), RAW_TTL);
        // Keep idExists()/listMetrics() consistent with the bulk variant.
        this.ids.add(data.getId());
        return Futures.transform(future, TO_VOID);
    }

    /**
     * Inserts a set of raw data points, one asynchronous insert per point.
     *
     * @param data the data points to store
     * @return a future that completes once every insert has finished, yielding a map
     *         from each data point whose insert failed to the failure cause (empty
     *         when all inserts succeeded)
     */
    @Override
    public ListenableFuture<Map<RawNumericMetric, Throwable>> addData(Set<RawNumericMetric> data) {
        // Filled from future-completion callbacks, which may run on several threads.
        final Map<RawNumericMetric, Throwable> errors = new ConcurrentHashMap<RawNumericMetric, Throwable>();
        List<ListenableFuture<ResultSet>> futures = new ArrayList<ListenableFuture<ResultSet>>(data.size());
        for (RawNumericMetric metric : data) {
            this.permits.acquire();
            ResultSetFuture insert = this.dataAccess.insertData(metric.getBucket(), metric.getId(),
                metric.getTimestamp(), ImmutableMap.of(DataType.RAW.ordinal(), metric.getAvg()), RAW_TTL);
            // Aggregate over the fallback-wrapped future so the combined future cannot
            // complete before the failure has been recorded in the errors map.
            futures.add(Futures.withFallback(insert, new RawDataFallback(errors, metric)));
            this.ids.add(metric.getId());
        }
        ListenableFuture<List<ResultSet>> insertsFuture = Futures.successfulAsList(futures);
        return Futures.transform(insertsFuture, new Function<List<ResultSet>, Map<RawNumericMetric, Throwable>>() {
            @Override
            public Map<RawNumericMetric, Throwable> apply(List<ResultSet> resultSets) {
                return errors;
            }
        });
    }

    /**
     * Updates a single counter.
     *
     * @param counter the counter update to apply
     * @return a future that completes when the update has finished
     */
    @Override
    public ListenableFuture<Void> updateCounter(Counter counter) {
        return Futures.transform(this.dataAccess.updateCounter(counter), TO_VOID);
    }

    /**
     * Updates several counters in one data-access call.
     *
     * @param counters the counter updates to apply
     * @return a future that completes when the update has finished
     */
    @Override
    public ListenableFuture<Void> updateCounters(Collection<Counter> counters) {
        ResultSetFuture future = this.dataAccess.updateCounters(counters);
        return Futures.transform(future, TO_VOID);
    }

    /**
     * Finds all counters of a group.
     *
     * @param group the counter group
     * @return a future yielding the counters found
     */
    @Override
    public ListenableFuture<List<Counter>> findCounters(String group) {
        ResultSetFuture future = this.dataAccess.findCounters(group);
        return Futures.transform(future, this.mapCounters, this.metricsTasks);
    }

    /**
     * Finds the named counters of a group.
     *
     * @param group the counter group
     * @param counterNames the names of the counters to look up
     * @return a future yielding the counters found
     */
    @Override
    public ListenableFuture<List<Counter>> findCounters(String group, List<String> counterNames) {
        ResultSetFuture future = this.dataAccess.findCounters(group, counterNames);
        return Futures.transform(future, this.mapCounters, this.metricsTasks);
    }

    /**
     * Queries the data of one metric in a bucket for the given start/end bounds.
     *
     * @param bucket the bucket to read from
     * @param id the metric id
     * @param start start of the queried range
     * @param end end of the queried range
     * @return a future yielding the mapped data points
     */
    @Override
    public ListenableFuture<List<RawNumericMetric>> findData(String bucket, String id, long start, long end) {
        ResultSetFuture future = this.dataAccess.findData(bucket, id, start, end);
        return Futures.transform(future, this.mapQueryResultSet, this.metricsTasks);
    }

    /**
     * Queries the data of one metric in the {@code raw} bucket.
     *
     * @param id the metric id
     * @param start start of the queried range
     * @param end end of the queried range
     * @return a future yielding the mapped data points
     */
    @Override
    public ListenableFuture<List<RawNumericMetric>> findData(String id, long start, long end) {
        return this.findData("raw", id, start, end);
    }

    /**
     * Tells whether data for the given id has been added through this instance.
     *
     * @param id the metric id
     * @return {@code true} if the id was recorded by {@code addData}
     */
    @Override
    public boolean idExists(String id) {
        return this.ids.contains(id);
    }

    /**
     * Lists the metric ids recorded by {@code addData} on this instance.
     *
     * @return a new list holding the ids in sorted order
     */
    @Override
    public List<String> listMetrics() {
        return new ArrayList<String>(this.ids);
    }

    /**
     * Parses the configured CQL port.
     *
     * @param raw the configured value, possibly {@code null}
     * @return the parsed port, or 9042 when the value is absent or not a number
     */
    private static int parsePort(String raw) {
        if (raw == null) {
            // Not configured at all: use the default without a misleading warning.
            return DEFAULT_CQL_PORT;
        }
        try {
            return Integer.parseInt(raw);
        }
        catch (NumberFormatException nfe) {
            logger.warn("Invalid context param 'cqlport', not a number. Will use a default of 9042");
            return DEFAULT_CQL_PORT;
        }
    }

    /**
     * Runs the schema manager over a short-lived session on the {@code system} keyspace.
     *
     * <p>NOTE(review): assumes the argument of {@code SchemaManager.updateSchema} is the
     * name of the keyspace to create or update; confirm against SchemaManager.
     *
     * @param cluster the cluster to connect through
     * @param keyspace the keyspace whose schema should be brought up to date
     */
    private void updateSchemaIfNecessary(Cluster cluster, String keyspace) {
        try (Session systemSession = cluster.connect("system")) {
            SchemaManager schemaManager = new SchemaManager(systemSession);
            schemaManager.updateSchema(keyspace);
        }
    }

    /** Maps a query result set to raw metrics by delegating to {@link RawMetricMapper}. */
    private static class MapQueryResultSet
    implements Function<ResultSet, List<RawNumericMetric>> {
        private final RawMetricMapper mapper = new RawMetricMapper();

        @Override
        public List<RawNumericMetric> apply(ResultSet resultSet) {
            return this.mapper.map(resultSet);
        }
    }

    /**
     * Fallback that records the failure of one insert against its data point and
     * then re-propagates the failure unchanged.
     */
    private static class RawDataFallback
    implements FutureFallback<ResultSet> {
        private final Map<RawNumericMetric, Throwable> errors;
        private final RawNumericMetric data;

        public RawDataFallback(Map<RawNumericMetric, Throwable> errors, RawNumericMetric data) {
            this.errors = errors;
            this.data = data;
        }

        @Override
        public ListenableFuture<ResultSet> create(Throwable t) throws Exception {
            this.errors.put(this.data, t);
            return Futures.immediateFailedFuture(t);
        }
    }
}

