package org.kairosdb.datastore.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Preconditions;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import javax.inject.Named;
import org.kairosdb.core.DataPoint;
import org.kairosdb.core.DataPointSet;
import org.kairosdb.core.KairosDataPointFactory;
import org.kairosdb.core.datapoints.DataPointFactory;
import org.kairosdb.core.datapoints.LegacyDataPointFactory;
import org.kairosdb.core.datapoints.LegacyDoubleDataPoint;
import org.kairosdb.core.datapoints.LegacyLongDataPoint;
import org.kairosdb.core.datastore.DataPointRow;
import org.kairosdb.core.datastore.Datastore;
import org.kairosdb.core.datastore.DatastoreMetricQuery;
import org.kairosdb.core.datastore.Order;
import org.kairosdb.core.datastore.QueryCallback;
import org.kairosdb.core.datastore.QueryMetric;
import org.kairosdb.core.datastore.QueryPlugin;
import org.kairosdb.core.datastore.ServiceKeyStore;
import org.kairosdb.core.datastore.TagSet;
import org.kairosdb.core.datastore.TagSetImpl;
import org.kairosdb.core.exception.DatastoreException;
import org.kairosdb.core.queue.EventCompletionCallBack;
import org.kairosdb.core.queue.ProcessorHandler;
import org.kairosdb.core.queue.QueueProcessor;
import org.kairosdb.core.reporting.KairosMetricReporter;
import org.kairosdb.core.reporting.ThreadReporter;
import org.kairosdb.datastore.cassandra.CassandraModule;
import org.kairosdb.eventbus.Subscribe;
import org.kairosdb.events.DataPointEvent;
import org.kairosdb.util.IngestExecutorService;
import org.kairosdb.util.KDataInput;
import org.kairosdb.util.MemoryMonitor;
import org.kairosdb.util.SimpleStatsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kairosdb/datastore/cassandra/CassandraDatastore.class */
public class CassandraDatastore implements Datastore, ProcessorHandler, KairosMetricReporter, ServiceKeyStore {
    // NOTE(review): presumably type flags telling long values from floating-point values;
    // their use is not visible in this part of the file — confirm.
    public static final int LONG_FLAG = 0;
    public static final int FLOAT_FLAG = 1;
    // Added to a row key's timestamp in cqlQueryWithRowKeys to find the upper time bound of that row.
    // 1,814,400,000 equals 21 days if the unit is milliseconds — TODO confirm the unit.
    public static final long ROW_WIDTH = 1814400000;
    // Metric names passed to ThreadReporter.addDataPoint by the row-key iterator and the query code.
    public static final String KEY_QUERY_TIME = "kairosdb.datastore.cassandra.key_query_time";
    public static final String ROW_KEY_COUNT = "kairosdb.datastore.cassandra.row_key_count";
    public static final String RAW_ROW_KEY_COUNT = "kairosdb.datastore.cassandra.raw_row_key_count";
    // These values are the same strings that getMetricNames/getTagNames/getTagValues pass to queryStringIndex.
    public static final String ROW_KEY_METRIC_NAMES = "metric_names";
    public static final String ROW_KEY_TAG_NAMES = "tag_names";
    public static final String ROW_KEY_TAG_VALUES = "tag_values";
    // Collaborators assigned once in the constructor.
    private final CassandraClient m_cassandraClient;
    private final Schema m_schema;
    private Session m_session;
    private final KairosDataPointFactory m_kairosDataPointFactory;
    private final QueueProcessor m_queueProcessor;
    private final IngestExecutorService m_congestionExecutor;
    private final CassandraModule.BatchHandlerFactory m_batchHandlerFactory;
    private final CassandraModule.DeleteBatchHandlerFactory m_deleteBatchHandlerFactory;
    private CassandraConfiguration m_cassandraConfiguration;

    // Threshold used by DeletingCallback's writer: a delete batch is submitted once it holds more points than this.
    @Inject
    @Named(QueueProcessor.BATCH_SIZE)
    private int m_batchSize;
    public static final Logger logger = LoggerFactory.getLogger(CassandraDatastore.class);
    public static final DataPointsRowKeySerializer DATA_POINTS_ROW_KEY_SERIALIZER = new DataPointsRowKeySerializer();
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    // Shared no-op completion callback handed to the delete batch handlers.
    private static final IDontCareCallBack s_dontCareCallBack = new IDontCareCallBack();

    // Source of the batch-size statistics reported by getMetrics.
    // NOTE(review): this field is both final and @Inject — confirm the injector in use supports that.
    @Inject
    private final BatchStats m_batchStats = new BatchStats();

    // Row-key cache pruned by cleanRowKeyCache.
    @Inject
    private DataCache<DataPointsRowKey> m_rowKeyCache = new DataCache<>(1024);

    @Inject
    private DataCache<String> m_metricNameCache = new DataCache<>(1024);

    // Used by getMetrics to turn batch statistics into DataPointSets.
    @Inject
    private SimpleStatsReporter m_simpleStatsReporter = new SimpleStatsReporter();

    // Keyspace name; the initializer is the value used when nothing is injected.
    @Inject
    @Named(CassandraConfiguration.KEYSPACE_PROPERTY)
    private String m_keyspace = "kairosdb";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kairosdb/datastore/cassandra/CassandraDatastore$CQLFilteredRowKeyIterator.class */
    /**
     * Iterates over the distinct row keys of one metric that fall inside a time range and match
     * a set of tag filters.
     *
     * <p>The constructor fires all index queries asynchronously (the row-key index query plus one
     * row-key query per row time returned by {@code psRowKeyTimeQuery}) and blocks until every one
     * of them has completed. Iteration then walks the result sets in order, skipping keys that do
     * not match the filter and keys that were already returned.
     *
     * <p>Not thread-safe: the iterator keeps unsynchronized cursor state.
     */
    public class CQLFilteredRowKeyIterator implements Iterator<DataPointsRowKey> {
        private final SetMultimap<String, String> m_filterTags;
        /** Key found by {@link #hasNext()} that has not been handed out by {@link #next()} yet. */
        private DataPointsRowKey m_nextKey;
        private final Iterator<ResultSet> m_resultSets;
        private ResultSet m_currentResultSet;
        private final String m_metricName;
        /** Number of rows read from Cassandra, before filtering and de-duplication. */
        private int m_rawRowKeyCount = 0;
        /** Keys already returned; the same key can show up in more than one result set. */
        private final Set<DataPointsRowKey> m_returnedKeys;

        /**
         * Starts all index queries for the metric and waits for them to finish.
         *
         * @param str         metric name
         * @param j           query start time
         * @param j2          query end time
         * @param setMultimap tag filter: for every tag name present, a key must carry that tag with
         *                    one of the listed values
         * @throws DatastoreException if waiting for the index queries is interrupted or one of them fails
         */
        public CQLFilteredRowKeyIterator(String str, long j, long j2, SetMultimap<String, String> setMultimap) throws DatastoreException {
            this.m_filterTags = setMultimap;
            this.m_metricName = str;
            this.m_returnedKeys = new HashSet<>();
            List<ResultSetFuture> futures = new ArrayList<>();
            long queryStart = System.currentTimeMillis();
            if (j < 0 && j2 >= 0) {
                // The range crosses zero, so it is queried as two ranges: [j, -1] and [0, j2].
                // NOTE(review): presumably because serialized negative and positive timestamps
                // do not sort contiguously — confirm against DataPointsRowKeySerializer.
                futures.add(queryRowKeyIndex(str, j, -1L));
                futures.add(queryRowKeyIndex(str, 0L, j2));
            } else {
                futures.add(queryRowKeyIndex(str, j, j2));
            }
            for (Long rowTime : createQueryKeyList(str, j, j2)) {
                BoundStatement statement = new BoundStatement(CassandraDatastore.this.m_schema.psRowKeyQuery);
                statement.setString(0, str);
                statement.setTimestamp(1, new Date(rowTime.longValue()));
                statement.setConsistencyLevel(CassandraDatastore.this.m_cassandraConfiguration.getDataReadLevel());
                futures.add(CassandraDatastore.this.m_session.executeAsync(statement));
            }
            try {
                List<ResultSet> results = Futures.allAsList(futures).get();
                this.m_resultSets = results.iterator();
                if (this.m_resultSets.hasNext()) {
                    this.m_currentResultSet = this.m_resultSets.next();
                }
                ThreadReporter.addDataPoint(CassandraDatastore.KEY_QUERY_TIME, System.currentTimeMillis() - queryStart);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new DatastoreException("Index query interrupted", e);
            } catch (ExecutionException e2) {
                throw new DatastoreException("Failed to read key index", e2);
            }
        }

        /** Issues one asynchronous row-key index query for the given metric and time range. */
        private ResultSetFuture queryRowKeyIndex(String metricName, long startTime, long endTime) {
            BoundStatement statement = new BoundStatement(CassandraDatastore.this.m_schema.psRowKeyIndexQuery);
            statement.setBytesUnsafe(0, CassandraDatastore.serializeString(metricName));
            setStartEndKeys(statement, metricName, startTime, endTime);
            statement.setConsistencyLevel(CassandraDatastore.this.m_cassandraConfiguration.getDataReadLevel());
            return CassandraDatastore.this.m_session.executeAsync(statement);
        }

        /**
         * Reads rows from {@code resultSet} until one yields a key that passes the tag filter and
         * has not been returned before.
         *
         * @return the next acceptable key, or {@code null} when the result set is exhausted
         */
        private DataPointsRowKey nextKeyFromIterator(ResultSet resultSet) {
            // Result sets that carry a "row_time" column hold the key as separate columns;
            // the others hold it as one serialized blob.
            boolean hasRowTimeColumn = resultSet.getColumnDefinitions().contains("row_time");
            while (!resultSet.isExhausted()) {
                Row row = resultSet.one();
                this.m_rawRowKeyCount++;
                DataPointsRowKey rowKey;
                if (hasRowTimeColumn) {
                    rowKey = new DataPointsRowKey(this.m_metricName, row.getTimestamp(0).getTime(), row.getString(1), new TreeMap<String, String>(row.getMap(2, String.class, String.class)));
                } else {
                    rowKey = CassandraDatastore.DATA_POINTS_ROW_KEY_SERIALIZER.fromByteBuffer(row.getBytes(0));
                }
                // Set.add returns false for a key that was already handed out.
                if (matchesFilter(rowKey.getTags()) && this.m_returnedKeys.add(rowKey)) {
                    return rowKey;
                }
            }
            return null;
        }

        /** Returns true when, for every filtered tag name, the key has that tag with an allowed value. */
        private boolean matchesFilter(Map<String, String> keyTags) {
            for (String tagName : this.m_filterTags.keySet()) {
                String value = keyTags.get(tagName);
                if (value == null || !this.m_filterTags.get(tagName).contains(value)) {
                    return false;
                }
            }
            return true;
        }

        /** Synchronously fetches the row times for the metric that lie within the query range. */
        private List<Long> createQueryKeyList(String str, long j, long j2) {
            List<Long> rowTimes = new ArrayList<>();
            BoundStatement statement = new BoundStatement(CassandraDatastore.this.m_schema.psRowKeyTimeQuery);
            statement.setString(0, str);
            statement.setTimestamp(1, new Date(CassandraDatastore.calculateRowTime(j)));
            statement.setTimestamp(2, new Date(j2));
            statement.setConsistencyLevel(CassandraDatastore.this.m_cassandraConfiguration.getDataReadLevel());
            ResultSet rows = CassandraDatastore.this.m_session.execute(statement);
            while (!rows.isExhausted()) {
                rowTimes.add(Long.valueOf(rows.one().getTimestamp(0).getTime()));
            }
            return rowTimes;
        }

        /** Binds the serialized start key (index 1) and end-search key (index 2) for an index query. */
        private void setStartEndKeys(BoundStatement boundStatement, String str, long j, long j2) {
            DataPointsRowKey startKey = new DataPointsRowKey(str, CassandraDatastore.calculateRowTime(j), "");
            DataPointsRowKey endKey = new DataPointsRowKey(str, CassandraDatastore.calculateRowTime(j2), "");
            endKey.setEndSearchKey(true);
            boundStatement.setBytesUnsafe(1, CassandraDatastore.DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(startKey));
            boundStatement.setBytesUnsafe(2, CassandraDatastore.DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(endKey));
        }

        /**
         * Looks ahead for the next acceptable key. Calling this repeatedly without {@link #next()}
         * does not skip keys. The raw row count is reported each time this returns {@code false}.
         */
        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.m_nextKey != null) {
                // A key is already buffered from an earlier call.
                return true;
            }
            while (this.m_currentResultSet != null && (!this.m_currentResultSet.isExhausted() || this.m_resultSets.hasNext())) {
                this.m_nextKey = nextKeyFromIterator(this.m_currentResultSet);
                if (this.m_nextKey != null) {
                    break;
                }
                if (this.m_resultSets.hasNext()) {
                    this.m_currentResultSet = this.m_resultSets.next();
                }
            }
            if (this.m_nextKey == null) {
                ThreadReporter.addDataPoint(CassandraDatastore.RAW_ROW_KEY_COUNT, this.m_rawRowKeyCount);
            }
            return this.m_nextKey != null;
        }

        /**
         * Returns the next key, fetching it if {@link #hasNext()} was not called first.
         *
         * @throws java.util.NoSuchElementException when no keys are left
         */
        @Override // java.util.Iterator
        public DataPointsRowKey next() {
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            DataPointsRowKey key = this.m_nextKey;
            this.m_nextKey = null;
            return key;
        }

        /** Removal is not supported; the call is ignored. */
        @Override // java.util.Iterator
        public void remove() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kairosdb/datastore/cassandra/CassandraDatastore$DeletingCallback.class */
    /**
     * Query callback that, instead of returning data points, hands them in batches to the
     * delete batch handler for the metric given at construction.
     */
    public class DeletingCallback implements QueryCallback {
        private String m_metricName;

        public DeletingCallback(String str) {
            m_metricName = str;
        }

        /** Creates a writer that collects the data points of one data point set for deletion. */
        @Override // org.kairosdb.core.datastore.QueryCallback
        public QueryCallback.DataPointWriter startDataPointSet(String str, SortedMap<String, String> sortedMap) throws IOException {
            return new DeleteDatePointWriter(str, sortedMap);
        }

        /** Buffers data points and submits them as delete batches to the congestion executor. */
        private class DeleteDatePointWriter implements QueryCallback.DataPointWriter {
            private String m_dataType;
            private SortedMap<String, String> m_tags;
            private List<DataPoint> m_dataPoints = new ArrayList<>();

            public DeleteDatePointWriter(String dataType, SortedMap<String, String> tags) {
                m_dataType = dataType;
                m_tags = tags;
            }

            /** Buffers the point; once the buffer holds more than the batch size it is submitted and replaced. */
            @Override // org.kairosdb.core.datastore.QueryCallback.DataPointWriter
            public void addDataPoint(DataPoint dataPoint) throws IOException {
                m_dataPoints.add(dataPoint);
                if (m_dataPoints.size() <= CassandraDatastore.this.m_batchSize) {
                    return;
                }
                // Swap in a fresh buffer before submitting so the handler owns the full one.
                List<DataPoint> fullBatch = m_dataPoints;
                m_dataPoints = new ArrayList<>();
                submitDelete(fullBatch);
            }

            /** Submits whatever is still buffered; nothing is submitted for an empty buffer. */
            @Override // org.kairosdb.core.datastore.QueryCallback.DataPointWriter, java.lang.AutoCloseable
            public void close() throws IOException {
                if (!m_dataPoints.isEmpty()) {
                    submitDelete(m_dataPoints);
                }
            }

            /** Hands one batch to a delete batch handler running on the congestion executor. */
            private void submitDelete(List<DataPoint> batch) throws IOException {
                CassandraDatastore.this.m_congestionExecutor.submit(CassandraDatastore.this.m_deleteBatchHandlerFactory.create(DeletingCallback.this.m_metricName, m_tags, batch, CassandraDatastore.s_dontCareCallBack));
            }
        }
    }

    /* loaded from: input_file:org/kairosdb/datastore/cassandra/CassandraDatastore$IDontCareCallBack.class */
    /** Completion callback whose {@link #complete()} does nothing. */
    private static class IDontCareCallBack implements EventCompletionCallBack {

        /** No-op. */
        @Override // org.kairosdb.core.queue.EventCompletionCallBack
        public void complete() {
            // nothing to do
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kairosdb/datastore/cassandra/CassandraDatastore$QueryListener.class */
    public class QueryListener implements FutureCallback<ResultSet> {
        private final DataPointsRowKey m_rowKey;
        private final QueryCallback m_callback;
        private final Semaphore m_semaphore;
        private final QueryMonitor m_queryMonitor;

        public QueryListener(DataPointsRowKey dataPointsRowKey, QueryCallback queryCallback, Semaphore semaphore, QueryMonitor queryMonitor) {
            this.m_rowKey = dataPointsRowKey;
            this.m_callback = queryCallback;
            this.m_semaphore = semaphore;
            this.m_queryMonitor = queryMonitor;
        }

        public void onSuccess(@Nullable ResultSet resultSet) {
            try {
                try {
                    if (resultSet.isExhausted()) {
                        this.m_semaphore.release();
                        return;
                    }
                    QueryCallback.DataPointWriter startDataPointSet = this.m_callback.startDataPointSet(this.m_rowKey.getDataType(), this.m_rowKey.getTags());
                    Throwable th = null;
                    try {
                        try {
                            DataPointFactory factoryForDataStoreType = CassandraDatastore.this.m_kairosDataPointFactory.getFactoryForDataStoreType(this.m_rowKey.getDataType());
                            while (!resultSet.isExhausted()) {
                                Row one = resultSet.one();
                                int i = one.getBytes(0).getInt();
                                ByteBuffer bytes = one.getBytes(1);
                                long columnTimestamp = CassandraDatastore.getColumnTimestamp(this.m_rowKey.getTimestamp(), i);
                                if (this.m_rowKey.getDataType() != LegacyDataPointFactory.DATASTORE_TYPE) {
                                    startDataPointSet.addDataPoint(factoryForDataStoreType.getDataPoint(columnTimestamp, KDataInput.createInput(bytes)));
                                } else if (CassandraDatastore.isLongValue(i)) {
                                    startDataPointSet.addDataPoint(new LegacyLongDataPoint(columnTimestamp, ValueSerializer.getLongFromByteBuffer(bytes)));
                                } else {
                                    startDataPointSet.addDataPoint(new LegacyDoubleDataPoint(columnTimestamp, ValueSerializer.getDoubleFromByteBuffer(bytes)));
                                }
                                this.m_queryMonitor.incrementCounter();
                            }
                            if (startDataPointSet != null) {
                                if (0 != 0) {
                                    try {
                                        startDataPointSet.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    startDataPointSet.close();
                                }
                            }
                            this.m_semaphore.release();
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        if (startDataPointSet != null) {
                            if (th != null) {
                                try {
                                    startDataPointSet.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                startDataPointSet.close();
                            }
                        }
                        throw th4;
                    }
                } catch (Exception e) {
                    CassandraDatastore.logger.error("QueryListener failure", e);
                    this.m_queryMonitor.failQuery(e);
                    this.m_semaphore.release();
                }
            } catch (Throwable th6) {
                this.m_semaphore.release();
                throw th6;
            }
        }

        public void onFailure(Throwable th) {
            CassandraDatastore.logger.error("Async query failure", th);
            this.m_queryMonitor.failQuery(th);
            this.m_semaphore.release();
        }
    }

    /**
     * Stores the injected collaborators and registers this datastore as the queue processor's
     * handler.
     */
    @Inject
    public CassandraDatastore(CassandraClient cassandraClient, CassandraConfiguration cassandraConfiguration, Schema schema, Session session, KairosDataPointFactory kairosDataPointFactory, QueueProcessor queueProcessor, IngestExecutorService ingestExecutorService, CassandraModule.BatchHandlerFactory batchHandlerFactory, CassandraModule.DeleteBatchHandlerFactory deleteBatchHandlerFactory) throws DatastoreException {
        // Cassandra access
        m_cassandraClient = cassandraClient;
        m_cassandraConfiguration = cassandraConfiguration;
        m_schema = schema;
        m_session = session;

        // Ingest pipeline
        m_kairosDataPointFactory = kairosDataPointFactory;
        m_queueProcessor = queueProcessor;
        m_congestionExecutor = ingestExecutorService;
        m_batchHandlerFactory = batchHandlerFactory;
        m_deleteBatchHandlerFactory = deleteBatchHandlerFactory;

        // Registered last, after every field has been assigned.
        m_queueProcessor.setProcessorHandler(this);
    }

    /**
     * Encodes {@code str} as UTF-8 and increments its last byte by one (wrapping on overflow).
     * queryStringIndex binds the result right after the plain encoding of the same prefix.
     *
     * <p>The string must not be empty: an empty string has no last byte and causes an
     * ArrayIndexOutOfBoundsException.
     */
    private static ByteBuffer serializeEndString(String str) {
        byte[] encoded = str.getBytes(UTF_8);
        encoded[encoded.length - 1]++;
        return ByteBuffer.wrap(encoded);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Returns a buffer wrapping the UTF-8 encoding of {@code str}. */
    public static ByteBuffer serializeString(String str) {
        byte[] encoded = str.getBytes(UTF_8);
        return ByteBuffer.wrap(encoded);
    }

    /**
     * Removes from the row-key cache every key whose timestamp differs from the row time
     * calculated for the current wall-clock time.
     */
    public void cleanRowKeyCache() {
        final long currentRowTime = calculateRowTime(System.currentTimeMillis());
        for (DataPointsRowKey cachedKey : m_rowKeyCache.getCachedKeys()) {
            if (cachedKey.getTimestamp() == currentRowTime) {
                continue; // still belongs to the current row
            }
            m_rowKeyCache.removeKey(cachedKey);
        }
    }

    /**
     * Shuts down the queue processor, then closes the session and the Cassandra client.
     * Each step runs even if an earlier one throws, so a failed shutdown does not leave the
     * session or client open; the exception still propagates to the caller.
     */
    @Override // org.kairosdb.core.datastore.Datastore
    public void close() throws InterruptedException {
        try {
            this.m_queueProcessor.shutdown();
        } finally {
            try {
                this.m_session.close();
            } finally {
                this.m_cassandraClient.close();
            }
        }
    }

    /** Event-bus subscriber: places each incoming data point event on the processing queue. */
    @Subscribe
    public void putDataPoint(DataPointEvent dataPointEvent) throws DatastoreException {
        m_queueProcessor.put(dataPointEvent);
    }

    /**
     * Wraps the given events in a batch handler and submits it to the congestion executor.
     * The callback and flag are passed through to the handler factory unchanged.
     */
    @Override // org.kairosdb.core.queue.ProcessorHandler
    public void handleEvents(List<DataPointEvent> list, EventCompletionCallBack eventCompletionCallBack, boolean z) {
        m_congestionExecutor.submit(
                m_batchHandlerFactory.create(list, eventCompletionCallBack, z));
    }

    /**
     * Runs the string-index prefix query and collects the first column of every row.
     *
     * @param str  index row to read (bound at position 0)
     * @param str2 prefix; its encoding is bound at position 1 and its incremented encoding at
     *             position 2. Must not be empty, because serializeEndString fails on "".
     */
    private Iterable<String> queryStringIndex(String str, String str2) {
        BoundStatement prefixQuery = new BoundStatement(m_schema.psStringIndexPrefixQuery);
        prefixQuery.setBytesUnsafe(0, serializeString(str));
        prefixQuery.setBytesUnsafe(1, serializeString(str2));
        prefixQuery.setBytesUnsafe(2, serializeEndString(str2));
        prefixQuery.setConsistencyLevel(m_cassandraConfiguration.getDataReadLevel());

        ResultSet rows = m_session.execute(prefixQuery);
        List<String> values = new ArrayList<>();
        while (!rows.isExhausted()) {
            Row row = rows.one();
            values.add(row.getString(0));
        }
        return values;
    }

    /**
     * Runs the string-index query for one index row and collects the first column of every row.
     *
     * @param str index row to read (bound at position 0)
     */
    private Iterable<String> queryStringIndex(String str) {
        BoundStatement indexQuery = new BoundStatement(m_schema.psStringIndexQuery);
        indexQuery.setBytesUnsafe(0, serializeString(str));
        indexQuery.setConsistencyLevel(m_cassandraConfiguration.getDataReadLevel());

        ResultSet rows = m_session.execute(indexQuery);
        List<String> values = new ArrayList<>();
        while (!rows.isExhausted()) {
            Row row = rows.one();
            values.add(row.getString(0));
        }
        return values;
    }

    /**
     * Returns the metric names from the string index.
     *
     * @param str optional name prefix. {@code null} or an empty string returns all metric names;
     *            an empty prefix must not reach the prefix query, because building its end key
     *            needs at least one byte and would otherwise throw ArrayIndexOutOfBoundsException.
     */
    @Override // org.kairosdb.core.datastore.Datastore
    public Iterable<String> getMetricNames(String str) {
        if (str == null || str.isEmpty()) {
            return queryStringIndex("metric_names");
        }
        return queryStringIndex("metric_names", str);
    }

    /** Returns every entry of the "tag_names" string index row. */
    @Override // org.kairosdb.core.datastore.Datastore
    public Iterable<String> getTagNames() {
        final String indexRow = "tag_names";
        return queryStringIndex(indexRow);
    }

    /** Returns every entry of the "tag_values" string index row. */
    @Override // org.kairosdb.core.datastore.Datastore
    public Iterable<String> getTagValues() {
        final String indexRow = "tag_values";
        return queryStringIndex(indexRow);
    }

    /**
     * Collects every tag name/value pair found on the row keys that match the query.
     * The memory monitor is checked after each tag is added and may abort the collection
     * by throwing.
     */
    @Override // org.kairosdb.core.datastore.Datastore
    public TagSet queryMetricTags(DatastoreMetricQuery datastoreMetricQuery) throws DatastoreException {
        TagSetImpl collectedTags = new TagSetImpl();
        Iterator<DataPointsRowKey> rowKeys = getKeysForQueryIterator(datastoreMetricQuery);
        MemoryMonitor memoryGuard = new MemoryMonitor(20);
        while (rowKeys.hasNext()) {
            SortedMap<String, String> keyTags = rowKeys.next().getTags();
            for (Map.Entry<String, String> tag : keyTags.entrySet()) {
                collectedTags.addTag(tag.getKey(), tag.getValue());
                memoryGuard.checkMemoryAndThrowException();
            }
        }
        return collectedTags;
    }

    /**
     * Executes the service-index insert statement with the four arguments bound, in order,
     * to positions 0 through 3.
     */
    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public void setValue(String str, String str2, String str3, String str4) throws DatastoreException {
        BoundStatement insert = new BoundStatement(m_schema.psServiceIndexInsert);
        String[] boundValues = {str, str2, str3, str4};
        for (int position = 0; position < boundValues.length; position++) {
            insert.setString(position, boundValues[position]);
        }
        m_session.execute(insert);
    }

    /**
     * Executes the service-index get statement with the three arguments bound to positions 0-2.
     *
     * @return the first column of the first row, or {@code null} when the query returns no row
     */
    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public String getValue(String str, String str2, String str3) throws DatastoreException {
        BoundStatement lookup = new BoundStatement(m_schema.psServiceIndexGet);
        lookup.setString(0, str);
        lookup.setString(1, str2);
        lookup.setString(2, str3);
        Row firstRow = m_session.execute(lookup).one();
        return firstRow == null ? null : firstRow.getString(0);
    }

    /**
     * Lists the first column of every row returned by the list-service-keys statement for
     * {@code str}.
     *
     * @throws DatastoreException if the schema has no list-service-keys statement
     */
    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public Iterable<String> listServiceKeys(String str) throws DatastoreException {
        // Guard first: the schema leaves this statement null when it is unavailable.
        if (m_schema.psServiceIndexListServiceKeys == null) {
            throw new DatastoreException("List Service Keys is not available on this version of Cassandra.");
        }
        BoundStatement listing = new BoundStatement(m_schema.psServiceIndexListServiceKeys);
        listing.setString(0, str);
        ResultSet rows = m_session.execute(listing);
        List<String> serviceKeys = new ArrayList<>();
        while (!rows.isExhausted()) {
            Row row = rows.one();
            serviceKeys.add(row.getString(0));
        }
        return serviceKeys;
    }

    /**
     * Lists the first column of every row returned by the list-keys statement, with the two
     * arguments bound to positions 0 and 1.
     */
    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public Iterable<String> listKeys(String str, String str2) throws DatastoreException {
        BoundStatement listing = new BoundStatement(m_schema.psServiceIndexListKeys);
        listing.setString(0, str);
        listing.setString(1, str2);
        ResultSet rows = m_session.execute(listing);
        List<String> keys = new ArrayList<>();
        while (!rows.isExhausted()) {
            Row row = rows.one();
            keys.add(row.getString(0));
        }
        return keys;
    }

    /**
     * Lists the first column of every row returned by the list-keys-prefix statement.
     * {@code str3} is bound as the lower bound (position 2) and {@code str3} followed by
     * U+FFFF as the upper bound (position 3).
     */
    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public Iterable<String> listKeys(String str, String str2, String str3) throws DatastoreException {
        final String upperBound = str3 + '\uffff';
        BoundStatement listing = new BoundStatement(m_schema.psServiceIndexListKeysPrefix);
        listing.setString(0, str);
        listing.setString(1, str2);
        listing.setString(2, str3);
        listing.setString(3, upperBound);
        ResultSet rows = m_session.execute(listing);
        List<String> keys = new ArrayList<>();
        while (!rows.isExhausted()) {
            Row row = rows.one();
            keys.add(row.getString(0));
        }
        return keys;
    }

    /** Executes the service-index delete-key statement with the three arguments bound to positions 0-2. */
    @Override // org.kairosdb.core.datastore.ServiceKeyStore
    public void deleteKey(String str, String str2, String str3) throws DatastoreException {
        BoundStatement delete = new BoundStatement(m_schema.psServiceIndexDeleteKey);
        String[] boundValues = {str, str2, str3};
        for (int position = 0; position < boundValues.length; position++) {
            delete.setString(position, boundValues[position]);
        }
        m_session.execute(delete);
    }

    /** Resolves the row keys matching the query and streams their data points to the callback. */
    @Override // org.kairosdb.core.datastore.Datastore
    public void queryDatabase(DatastoreMetricQuery datastoreMetricQuery, QueryCallback queryCallback) throws DatastoreException {
        Iterator<DataPointsRowKey> matchingKeys = getKeysForQueryIterator(datastoreMetricQuery);
        cqlQueryWithRowKeys(datastoreMetricQuery, queryCallback, matchingKeys);
    }

    /**
     * Reports the write batch size statistics for the string index, data points and row keys
     * tables under one metric name, distinguished by the "table" tag.
     *
     * @param j passed through to the stats reporter with each set of statistics
     */
    @Override // org.kairosdb.core.reporting.KairosMetricReporter
    public List<DataPointSet> getMetrics(long j) {
        final String metricName = "kairosdb.datastore.cassandra.write_batch_size";
        final String tagName = "table";
        List<DataPointSet> reported = new ArrayList<>();
        m_simpleStatsReporter.reportStats(m_batchStats.getNameStats(), j, metricName, tagName, "string_index", reported);
        m_simpleStatsReporter.reportStats(m_batchStats.getDataPointStats(), j, metricName, tagName, "data_points", reported);
        m_simpleStatsReporter.reportStats(m_batchStats.getRowKeyStats(), j, metricName, tagName, "row_keys", reported);
        return reported;
    }

    /**
     * Issues one asynchronous data-point query per row key and feeds the results to the callback
     * through {@link QueryListener}s running on a dedicated reader thread pool.
     *
     * <p>A semaphore caps the number of queries in flight. Once the query monitor says to stop,
     * outstanding queries are cancelled and no further ones are issued. If this thread is
     * interrupted, no further queries are issued, the interrupt flag is restored and the method
     * returns without waiting for outstanding queries. The reader pool is shut down on every exit
     * path.
     *
     * @param datastoreMetricQuery supplies time range, order and limit (0 means no limit)
     * @param queryCallback        receiver of the data points
     * @param it                   row keys to query
     * @throws DatastoreException wrapping the failure recorded by the query monitor, if any
     */
    private void cqlQueryWithRowKeys(final DatastoreMetricQuery datastoreMetricQuery, QueryCallback queryCallback, Iterator<DataPointsRowKey> it) throws DatastoreException {
        List<ResultSetFuture> pendingQueries = new ArrayList<>();
        int rowCount = 0;
        long startTime = datastoreMetricQuery.getStartTime();
        long endTime = datastoreMetricQuery.getEndTime();
        boolean useLimit = datastoreMetricQuery.getLimit() != 0;
        QueryMonitor queryMonitor = new QueryMonitor(this.m_cassandraConfiguration.getQueryLimit());
        ExecutorService resultsExecutor = Executors.newFixedThreadPool(this.m_cassandraConfiguration.getQueryReaderThreads(), new ThreadFactory() { // from class: org.kairosdb.datastore.cassandra.CassandraDatastore.1
            private int m_count = 0;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                this.m_count++;
                return new Thread(runnable, "query_" + datastoreMetricQuery.getName() + "-" + this.m_count);
            }
        });
        Semaphore querySemaphore = new Semaphore(this.m_cassandraConfiguration.getSimultaneousQueries());
        try {
            while (it.hasNext()) {
                rowCount++;
                DataPointsRowKey rowKey = it.next();
                long rowTime = rowKey.getTimestamp();

                // Clamp the requested range to the columns of this row.
                int startColumn = startTime < rowTime ? 0 : getColumnName(rowTime, startTime);
                int endColumn = getColumnName(rowTime, Math.min(endTime, rowTime + ROW_WIDTH)) + 1;
                ByteBuffer startBuffer = ByteBuffer.allocate(4);
                startBuffer.putInt(startColumn);
                startBuffer.rewind();
                ByteBuffer endBuffer = ByteBuffer.allocate(4);
                endBuffer.putInt(endColumn);
                endBuffer.rewind();

                boolean ascending = datastoreMetricQuery.getOrder() == Order.ASC;
                BoundStatement statement;
                if (useLimit) {
                    statement = ascending ? new BoundStatement(this.m_schema.psDataPointsQueryAscLimit) : new BoundStatement(this.m_schema.psDataPointsQueryDescLimit);
                } else {
                    statement = ascending ? new BoundStatement(this.m_schema.psDataPointsQueryAsc) : new BoundStatement(this.m_schema.psDataPointsQueryDesc);
                }
                statement.setBytesUnsafe(0, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(rowKey));
                statement.setBytesUnsafe(1, startBuffer);
                statement.setBytesUnsafe(2, endBuffer);
                if (useLimit) {
                    statement.setInt(3, datastoreMetricQuery.getLimit());
                }
                statement.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());

                // Blocks while the maximum number of queries is in flight; the listener
                // releases the permit when its query has been handled.
                querySemaphore.acquire();
                if (!queryMonitor.keepRunning()) {
                    // No query is issued for this permit, so give it back, cancel what is
                    // outstanding and stop.
                    querySemaphore.release();
                    for (ResultSetFuture pending : pendingQueries) {
                        pending.cancel(true);
                    }
                    break;
                }
                ResultSetFuture future = this.m_session.executeAsync(statement);
                pendingQueries.add(future);
                Futures.addCallback(future, new QueryListener(rowKey, queryCallback, querySemaphore, queryMonitor), resultsExecutor);
            }
            if (queryMonitor.getException() == null) {
                // Taking every permit means all listeners have finished.
                querySemaphore.acquire(this.m_cassandraConfiguration.getSimultaneousQueries());
            }
        } catch (InterruptedException e) {
            logger.error("Query interrupted", e);
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            resultsExecutor.shutdown();
            ThreadReporter.addDataPoint(ROW_KEY_COUNT, rowCount);
        }
        if (queryMonitor.getException() != null) {
            throw new DatastoreException(queryMonitor.getException());
        }
    }

    /**
     * Deletes the data points of a single row that fall between two timestamps.
     *
     * <p>If the schema has a prepared range-delete statement, it is bound with the
     * serialized row key and the two column names derived from {@code j} and {@code j2},
     * then submitted with {@code executeAsync}; the returned future is not awaited.
     * Otherwise the range is queried through {@code cqlQueryWithRowKeys} using a
     * {@code DeletingCallback}.
     *
     * @param dataPointsRowKey key of the row to delete from
     * @param j                first timestamp of the range to delete
     * @param j2               last timestamp of the range to delete
     * @throws DatastoreException declared by the fallback query path
     */
    private void deletePartialRow(DataPointsRowKey dataPointsRowKey, long j, long j2) throws DatastoreException {
        if (this.m_schema.psDataPointsDeleteRange != null) {
            final long rowTime = dataPointsRowKey.getTimestamp();

            // Absolute puts leave each buffer positioned at 0 with all 4 bytes readable.
            final ByteBuffer fromColumn = ByteBuffer.allocate(4);
            fromColumn.putInt(0, getColumnName(rowTime, j));
            final ByteBuffer toColumn = ByteBuffer.allocate(4);
            toColumn.putInt(0, getColumnName(rowTime, j2));

            final BoundStatement rangeDelete = new BoundStatement(this.m_schema.psDataPointsDeleteRange);
            rangeDelete.setBytesUnsafe(0, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(dataPointsRowKey));
            rangeDelete.setBytesUnsafe(1, fromColumn);
            rangeDelete.setBytesUnsafe(2, toColumn);
            // The delete is issued at the configured data read consistency level.
            rangeDelete.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());
            this.m_session.executeAsync(rangeDelete);
        } else {
            // No range-delete statement available: run a query over this one row and
            // let the callback handle the returned data points.
            final QueryMetric rangeQuery = new QueryMetric(j, j2, 0, dataPointsRowKey.getMetricName());
            final Iterator<DataPointsRowKey> singleRow = Collections.singletonList(dataPointsRowKey).iterator();
            cqlQueryWithRowKeys(rangeQuery, new DeletingCallback(rangeQuery.getName()), singleRow);
        }
    }

    /**
     * Deletes the data points selected by the given query.
     *
     * <p>Every row key returned by {@link #getKeysForQueryIterator} is handled in one of two ways:
     * <ul>
     *   <li>a row whose whole span {@code [rowStart, rowStart + ROW_WIDTH - 1]} lies inside the
     *       query range is removed outright, together with its entries in the row key index,
     *       row key and row key time tables (synchronous {@code execute} calls);</li>
     *   <li>any other row is trimmed through {@code deletePartialRow}, clamped to the part of
     *       the query range that overlaps the row.</li>
     * </ul>
     * When the query range is {@code Long.MIN_VALUE..Long.MAX_VALUE} the metric's row key index
     * row and its {@code "metric_names"} string index entry are also deleted (asynchronously) and
     * the metric name cache is cleared. The row key cache is cleared whenever a whole row or the
     * whole metric was removed.
     *
     * @param datastoreMetricQuery query describing the metric and time range to delete; must not be null
     * @throws DatastoreException if resolving the row keys or a partial row delete fails
     */
    @Override // org.kairosdb.core.datastore.Datastore
    public void deleteDataPoints(DatastoreMetricQuery datastoreMetricQuery) throws DatastoreException {
        Preconditions.checkNotNull(datastoreMetricQuery);

        // Read the bounds once so every row is compared against the same range.
        final long queryStart = datastoreMetricQuery.getStartTime();
        final long queryEnd = datastoreMetricQuery.getEndTime();
        // An unbounded range means the entire metric is being removed.
        final boolean deleteAll = queryStart == Long.MIN_VALUE && queryEnd == Long.MAX_VALUE;
        boolean clearRowKeyCache = false;

        Iterator<DataPointsRowKey> rowKeys = getKeysForQueryIterator(datastoreMetricQuery);
        while (rowKeys.hasNext()) {
            DataPointsRowKey rowKey = rowKeys.next();
            long rowStart = rowKey.getTimestamp();
            long rowEnd = (rowStart + ROW_WIDTH) - 1;
            boolean coversRowStart = queryStart <= rowStart;
            boolean coversRowEnd = queryEnd >= rowEnd;

            if (coversRowStart && coversRowEnd) {
                String metricName = rowKey.getMetricName();

                // Drop the data row itself.
                BoundStatement deleteRow = new BoundStatement(this.m_schema.psDataPointsDeleteRow);
                deleteRow.setBytesUnsafe(0, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(rowKey));
                deleteRow.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());
                this.m_session.execute(deleteRow);

                // Then remove every index entry that points at it.
                BoundStatement deleteIndexEntry = new BoundStatement(this.m_schema.psRowKeyIndexDelete);
                deleteIndexEntry.setBytesUnsafe(0, serializeString(metricName));
                deleteIndexEntry.setBytesUnsafe(1, DATA_POINTS_ROW_KEY_SERIALIZER.toByteBuffer(rowKey));
                deleteIndexEntry.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());
                this.m_session.execute(deleteIndexEntry);

                BoundStatement deleteRowKey = new BoundStatement(this.m_schema.psRowKeyDelete);
                deleteRowKey.setString(0, metricName);
                deleteRowKey.setTimestamp(1, new Date(rowStart));
                deleteRowKey.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());
                this.m_session.execute(deleteRowKey);

                BoundStatement deleteRowKeyTime = new BoundStatement(this.m_schema.psRowKeyTimeDelete);
                deleteRowKeyTime.setString(0, metricName);
                deleteRowKeyTime.setTimestamp(1, new Date(rowStart));
                deleteRowKeyTime.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());
                this.m_session.execute(deleteRowKeyTime);

                clearRowKeyCache = true;
            } else if (coversRowStart) {
                // Query ends inside this row: delete from the row start up to the query end.
                deletePartialRow(rowKey, rowStart, queryEnd);
            } else if (coversRowEnd) {
                // Query starts inside this row: delete from the query start to the row end.
                deletePartialRow(rowKey, queryStart, rowEnd);
            } else {
                // Query lies entirely inside this row.
                deletePartialRow(rowKey, queryStart, queryEnd);
            }
        }

        if (deleteAll) {
            String metricName = datastoreMetricQuery.getName();

            BoundStatement deleteIndexRow = new BoundStatement(this.m_schema.psRowKeyIndexDeleteRow);
            deleteIndexRow.setBytesUnsafe(0, serializeString(metricName));
            deleteIndexRow.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());
            this.m_session.executeAsync(deleteIndexRow);

            BoundStatement deleteMetricName = new BoundStatement(this.m_schema.psStringIndexDelete);
            deleteMetricName.setBytesUnsafe(0, serializeString("metric_names"));
            deleteMetricName.setBytesUnsafe(1, serializeString(metricName));
            deleteMetricName.setConsistencyLevel(this.m_cassandraConfiguration.getDataReadLevel());
            this.m_session.executeAsync(deleteMetricName);

            clearRowKeyCache = true;
            this.m_metricNameCache.clear();
        }

        if (clearRowKeyCache) {
            this.m_rowKeyCache.clear();
        }
    }

    /**
     * Copies every tag of the given row into a new map ordered by tag name.
     *
     * @param dataPointRow row whose tag names and values are read
     * @return a new {@link TreeMap} with one entry per name returned by
     *         {@code dataPointRow.getTagNames()}, mapped to {@code getTagValue(name)}
     */
    private SortedMap<String, String> getTags(DataPointRow dataPointRow) {
        // Parameterized instead of a raw TreeMap so the return needs no unchecked conversion.
        SortedMap<String, String> tags = new TreeMap<String, String>();
        for (String tagName : dataPointRow.getTagNames()) {
            tags.put(tagName, dataPointRow.getTagValue(tagName));
        }
        return tags;
    }

    /**
     * Returns the iterator of row keys to read for the given query.
     *
     * <p>The first query plugin that is a {@code CassandraRowKeyPlugin} is asked for the
     * iterator; later plugins are not consulted. If there is no such plugin, or it returns
     * {@code null}, a {@code CQLFilteredRowKeyIterator} built from the query's name, start
     * time, end time and tags is returned instead.
     *
     * @param datastoreMetricQuery query whose plugins, name, time range and tags are used
     * @return a non-null iterator over the row keys for the query
     * @throws DatastoreException if the plugin or the default iterator fails
     */
    public Iterator<DataPointsRowKey> getKeysForQueryIterator(DatastoreMetricQuery datastoreMetricQuery) throws DatastoreException {
        for (QueryPlugin plugin : datastoreMetricQuery.getPlugins()) {
            if (!(plugin instanceof CassandraRowKeyPlugin)) {
                continue;
            }
            // Only the first matching plugin is used.
            Iterator<DataPointsRowKey> pluginKeys = ((CassandraRowKeyPlugin) plugin).getKeysForQueryIterator(datastoreMetricQuery);
            if (pluginKeys != null) {
                return pluginKeys;
            }
            break;
        }
        return new CQLFilteredRowKeyIterator(datastoreMetricQuery.getName(), datastoreMetricQuery.getStartTime(), datastoreMetricQuery.getEndTime(), datastoreMetricQuery.getTags());
    }

    /**
     * Computes the row time for a timestamp by subtracting the remainder of the
     * timestamp's absolute value modulo {@code ROW_WIDTH}.
     *
     * <p>For a non-negative timestamp this rounds down to a multiple of {@code ROW_WIDTH}.
     * NOTE(review): for a negative timestamp the remainder is still subtracted, so the
     * result moves further from zero and is generally not a multiple of
     * {@code ROW_WIDTH} - confirm whether negative timestamps are expected here.
     *
     * @param j timestamp to convert
     * @return the row time derived from {@code j}
     */
    public static long calculateRowTime(long j) {
        final long offsetIntoRow = Math.abs(j) % ROW_WIDTH;
        return j - offsetIntoRow;
    }

    /**
     * Builds a column name from the distance between two times plus a one-bit flag.
     *
     * <p>The difference {@code j2 - j} is narrowed to {@code int} and shifted left by one;
     * the freed low bit is left at 0 when {@code z} is true and set to 1 otherwise.
     *
     * @param j  base time (callers in this file pass a row key timestamp)
     * @param j2 time to encode relative to {@code j}
     * @param z  {@code true} to leave the low bit clear, {@code false} to set it
     * @return the shifted offset with the flag in bit 0
     */
    private static int getColumnName(long j, long j2, boolean z) {
        final int shiftedOffset = ((int) (j2 - j)) << 1;
        if (z) {
            return shiftedOffset;
        }
        return shiftedOffset | 1;
    }

    /**
     * Builds a column name from the distance between two times, with the low bit clear.
     *
     * @param j  base time (callers in this file pass a row key timestamp)
     * @param j2 time to encode relative to {@code j}
     * @return {@code (int) (j2 - j)} shifted left by one bit
     */
    public static int getColumnName(long j, long j2) {
        return getColumnName(j, j2, true);
    }

    /**
     * Recovers a timestamp from a base time and a column name.
     *
     * <p>The column name's low bit is dropped with an unsigned right shift and the
     * remaining value is added to {@code j}.
     *
     * @param j base time the column name is relative to
     * @param i column name
     * @return {@code j} plus the column name shifted right by one bit (unsigned)
     */
    public static long getColumnTimestamp(long j, int i) {
        final int offset = i >>> 1;
        return j + offset;
    }

    /**
     * Tells whether a column name has its lowest bit clear.
     *
     * @param i column name to inspect
     * @return {@code true} when bit 0 of {@code i} is 0, {@code false} when it is 1
     */
    public static boolean isLongValue(int i) {
        final int typeBit = i & 1;
        return typeBit != 1;
    }

    /**
     * Prints the remaining hosts of the iterator to standard output on a single line,
     * each host's {@code toString()} followed by one space.
     *
     * @param it iterator of hosts to print; it is consumed by this call
     */
    private void printHosts(Iterator<Host> it) {
        final StringBuilder hostLine = new StringBuilder();
        while (it.hasNext()) {
            Host host = it.next();
            hostLine.append(host.toString());
            hostLine.append(" ");
        }
        final String line = hostLine.toString();
        System.out.println(line);
    }
}
