/*
 * Decompiled with CFR 0.152.
 */
package com.stratio.deep.cassandra.cql;

import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.stratio.deep.cassandra.config.CassandraDeepJobConfig;
import com.stratio.deep.cassandra.cql.CassandraClientProvider;
import com.stratio.deep.cassandra.entity.CellValidator;
import com.stratio.deep.cassandra.filter.value.EqualsInValue;
import com.stratio.deep.cassandra.util.CassandraUtils;
import com.stratio.deep.commons.config.DeepJobConfig;
import com.stratio.deep.commons.exception.DeepGenericException;
import com.stratio.deep.commons.exception.DeepIOException;
import com.stratio.deep.commons.exception.DeepIllegalAccessException;
import com.stratio.deep.commons.impl.DeepPartitionLocationComparator;
import com.stratio.deep.commons.rdd.DeepTokenRange;
import com.stratio.deep.commons.rdd.IDeepRecordReader;
import com.stratio.deep.commons.utils.Pair;
import com.stratio.deep.commons.utils.Utils;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeepRecordReader
implements IDeepRecordReader {
    private static final Logger LOG = LoggerFactory.getLogger(DeepRecordReader.class);
    private final DeepTokenRange<?, String> split;
    private RowIterator rowIterator;
    private String cfName;
    private final List<BoundColumn> partitionBoundColumns = new ArrayList<BoundColumn>();
    private final List<BoundColumn> clusterColumns = new ArrayList<BoundColumn>();
    private String columns;
    private final int pageSize;
    private IPartitioner<?> partitioner;
    private AbstractType<?> keyValidator;
    private final CassandraDeepJobConfig<?> config;
    private Session session;
    private boolean filterByKey = false;

    public DeepRecordReader(DeepJobConfig<?, ?> config, DeepTokenRange<?, String> split) {
        this.config = (CassandraDeepJobConfig)config;
        this.split = split;
        this.pageSize = ((CassandraDeepJobConfig)config).getPageSize();
        this.initialize();
    }

    private void initialize() {
        this.cfName = this.config.getTable();
        if (!ArrayUtils.isEmpty((Object[])this.config.getInputColumns())) {
            this.columns = StringUtils.join((Object[])this.config.getInputColumns(), (String)",");
        }
        this.partitioner = (IPartitioner)Utils.newTypeInstance((String)this.config.getPartitionerClassName(), IPartitioner.class);
        try {
            this.session = this.createConnection();
            this.retrieveKeys();
        }
        catch (Exception e) {
            throw new DeepIOException((Throwable)e);
        }
        this.rowIterator = new RowIterator();
    }

    private Session createConnection() {
        ArrayList locations = Lists.newArrayList((Iterable)this.split.getReplicas());
        Collections.sort(locations, new DeepPartitionLocationComparator());
        Exception lastException = null;
        LOG.debug("createConnection: " + locations);
        for (String location : locations) {
            try {
                return (Session)CassandraClientProvider.trySessionForLocation((String)location, this.config, (Boolean)Boolean.valueOf((boolean)false)).left;
            }
            catch (Exception e) {
                LOG.error("Could not get connection for: {}, replicas: {}", (Object)location, (Object)locations);
                lastException = e;
            }
        }
        throw new DeepIOException(lastException);
    }

    public void close() {
    }

    public Map<String, ByteBuffer> createEmptyMap() {
        return new LinkedHashMap<String, ByteBuffer>();
    }

    private void retrieveKeys() {
        BoundColumn boundColumn;
        String columnName;
        TableMetadata tableMetadata = this.config.fetchTableMetadata();
        List partitionKeys = tableMetadata.getPartitionKey();
        List clusteringKeys = tableMetadata.getClusteringColumns();
        ArrayList<AbstractType> types = new ArrayList<AbstractType>();
        for (ColumnMetadata key : partitionKeys) {
            columnName = key.getName();
            boundColumn = new BoundColumn(columnName);
            boundColumn.validator = CellValidator.cellValidator(key.getType()).getAbstractType();
            this.partitionBoundColumns.add(boundColumn);
            types.add(boundColumn.validator);
        }
        for (ColumnMetadata key : clusteringKeys) {
            columnName = key.getName();
            boundColumn = new BoundColumn(columnName);
            boundColumn.validator = CellValidator.cellValidator(key.getType()).getAbstractType();
            this.clusterColumns.add(boundColumn);
        }
        if (types.size() > 1) {
            this.keyValidator = CompositeType.getInstance(types);
        } else if (types.size() == 1) {
            this.keyValidator = (AbstractType)types.get(0);
        } else {
            throw new DeepGenericException("Cannot determine if keyvalidator is composed or not, partitionKeys: " + partitionKeys);
        }
    }

    private boolean reachEndRange() {
        ByteBuffer rowKey;
        if (this.keyValidator instanceof CompositeType) {
            ByteBuffer[] keys = new ByteBuffer[this.partitionBoundColumns.size()];
            for (int i = 0; i < this.partitionBoundColumns.size(); ++i) {
                keys[i] = this.partitionBoundColumns.get(i).value.duplicate();
            }
            rowKey = CompositeType.build((ByteBuffer[])keys);
        } else {
            rowKey = this.partitionBoundColumns.get(0).value;
        }
        String endToken = String.valueOf(this.split.getEndToken());
        String currentToken = this.partitioner.getToken(rowKey).toString();
        return endToken.equals(currentToken);
    }

    public boolean hasNext() {
        return this.rowIterator.hasNext();
    }

    public Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> next() {
        if (!this.hasNext()) {
            throw new DeepIllegalAccessException("DeepRecordReader exhausted");
        }
        return (Pair)this.rowIterator.next();
    }

    private ByteBuffer getPartitionKey(List<Pair<String, Serializable>> equalsList, Serializable inValue) {
        assert (equalsList.size() + 1 == ((CompositeType)this.keyValidator).componentsCount());
        ByteBuffer[] serialized = new ByteBuffer[equalsList.size() + 1];
        for (int i = 0; i < equalsList.size(); ++i) {
            ByteBuffer buffer;
            serialized[i] = buffer = ((AbstractType)this.keyValidator.getComponents().get(i)).decompose(equalsList.get((int)i).right);
        }
        serialized[serialized.length - 1] = ((AbstractType)this.keyValidator.getComponents().get(serialized.length - 1)).decompose((Object)inValue);
        return CompositeType.build((ByteBuffer[])serialized);
    }

    private static class BoundColumn
    implements Serializable {
        private final String name;
        private ByteBuffer value;
        private AbstractType<?> validator;

        public BoundColumn(String name) {
            this.name = name;
        }
    }

    class RowIterator
    extends AbstractIterator<Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>>> {
        private Iterator<Row> rows;
        private String partitionKeyString;
        private String partitionKeyMarkers;

        public RowIterator() {
            this.executeQuery();
        }

        private boolean isColumnWanted(String columnName) {
            return ArrayUtils.isEmpty((Object[])DeepRecordReader.this.config.getInputColumns()) || ArrayUtils.contains((Object[])DeepRecordReader.this.config.getInputColumns(), (Object)columnName);
        }

        protected Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> computeNext() {
            if (this.rows == null || !this.rows.hasNext()) {
                return (Pair)this.endOfData();
            }
            Map<String, ByteBuffer> valueColumns = DeepRecordReader.this.createEmptyMap();
            Map<String, ByteBuffer> keyColumns = DeepRecordReader.this.createEmptyMap();
            this.initColumns(valueColumns, keyColumns);
            return Pair.create(keyColumns, valueColumns);
        }

        private void initColumns(Map<String, ByteBuffer> valueColumns, Map<String, ByteBuffer> keyColumns) {
            ByteBuffer bb;
            String columnName;
            Row row = this.rows.next();
            TableMetadata tableMetadata = DeepRecordReader.this.config.fetchTableMetadata();
            List partitionKeys = tableMetadata.getPartitionKey();
            List clusteringKeys = tableMetadata.getClusteringColumns();
            List allColumns = tableMetadata.getColumns();
            for (ColumnMetadata key : partitionKeys) {
                columnName = key.getName();
                bb = row.getBytesUnsafe(columnName);
                keyColumns.put(columnName, bb);
            }
            for (ColumnMetadata key : clusteringKeys) {
                columnName = key.getName();
                bb = row.getBytesUnsafe(columnName);
                keyColumns.put(columnName, bb);
            }
            for (ColumnMetadata key : allColumns) {
                columnName = key.getName();
                if (keyColumns.containsKey(columnName) || !this.isColumnWanted(columnName)) continue;
                bb = row.getBytesUnsafe(columnName);
                valueColumns.put(columnName, bb);
            }
        }

        private String composeQuery() {
            String generatedColumns = DeepRecordReader.this.columns;
            if (generatedColumns == null) {
                generatedColumns = "*";
            } else {
                String partitionKey = this.keyString(DeepRecordReader.this.partitionBoundColumns);
                String clusterKey = this.keyString(DeepRecordReader.this.clusterColumns);
                generatedColumns = (generatedColumns = this.withoutKeyColumns(generatedColumns)) != null ? "," + generatedColumns : "";
                generatedColumns = StringUtils.isEmpty((String)clusterKey) ? partitionKey + generatedColumns : partitionKey + "," + clusterKey + generatedColumns;
            }
            EqualsInValue equalsInValue = DeepRecordReader.this.config.getEqualsInValue();
            String generatedQuery = null;
            if (equalsInValue == null) {
                String whereClause = this.whereClause();
                generatedQuery = String.format("SELECT %s FROM %s%s ALLOW FILTERING", generatedColumns, this.quote(DeepRecordReader.this.cfName), whereClause);
            } else {
                String equalsInClause = this.equalsInWhereClause(equalsInValue);
                generatedQuery = String.format("SELECT %s FROM %s %s", generatedColumns, this.quote(DeepRecordReader.this.cfName), equalsInClause);
            }
            return generatedQuery;
        }

        private Statement prepareStatement() {
            String query = this.composeQuery();
            EqualsInValue equalsInValue = DeepRecordReader.this.config.getEqualsInValue();
            Object[] values = null;
            if (equalsInValue == null) {
                List<Object> bindValues = this.preparedQueryBindValues();
                assert (bindValues != null);
                values = bindValues.toArray(new Object[bindValues.size()]);
                LOG.debug("query: " + query + "; values: " + Arrays.toString(values));
            } else {
                values = new Object[equalsInValue.getEqualsList().size() + 1];
                for (int i = 0; i < equalsInValue.getEqualsList().size(); ++i) {
                    values[i] = equalsInValue.getEqualsList().get((int)i).right;
                }
                values[values.length - 1] = this.filterSplits(equalsInValue);
                if (values[values.length - 1] == null) {
                    return null;
                }
                LOG.debug("query: " + query + "; values: " + Arrays.toString(values));
            }
            SimpleStatement stmt = new SimpleStatement(query, values);
            stmt.setFetchSize(DeepRecordReader.this.pageSize);
            return stmt;
        }

        private List<Serializable> filterSplits(EqualsInValue equalsInValue) {
            ArrayList<Serializable> filteredInValues = new ArrayList<Serializable>();
            for (Serializable value : equalsInValue.getInValues()) {
                Token token = DeepRecordReader.this.partitioner.getToken(DeepRecordReader.this.getPartitionKey(equalsInValue.getEqualsList(), value));
                if (!CassandraUtils.isTokenIncludedInRange(DeepRecordReader.this.split, (Token<Comparable>)token)) continue;
                filteredInValues.add(value);
            }
            if (filteredInValues.isEmpty()) {
                return null;
            }
            return filteredInValues;
        }

        private String getLuceneIndex() {
            String indexName = "";
            TableMetadata tableMetadata = DeepRecordReader.this.config.fetchTableMetadata();
            List columns = tableMetadata.getColumns();
            for (ColumnMetadata column : columns) {
                if (column.getIndex() == null || !column.getIndex().isCustomIndex()) continue;
                indexName = column.getName();
            }
            return indexName;
        }

        private String withoutKeyColumns(String columnString) {
            HashSet<String> keyNames = new HashSet<String>();
            for (BoundColumn column : Iterables.concat((Iterable)DeepRecordReader.this.partitionBoundColumns, (Iterable)DeepRecordReader.this.clusterColumns)) {
                keyNames.add(column.name);
            }
            String[] cols = columnString.split(",");
            String result = null;
            for (String column : cols) {
                String trimmed = column.trim();
                if (keyNames.contains(trimmed)) continue;
                String quoted = this.quote(trimmed);
                result = result == null ? quoted : result + "," + quoted;
            }
            return result;
        }

        private String whereClause() {
            if (this.partitionKeyString == null) {
                this.partitionKeyString = this.keyString(DeepRecordReader.this.partitionBoundColumns);
            }
            if (this.partitionKeyMarkers == null) {
                this.partitionKeyMarkers = this.partitionKeyMarkers();
            }
            DeepRecordReader.this.filterByKey = CassandraUtils.isFilterdByKey(DeepRecordReader.this.config.getFilters(), this.partitionKeyString);
            String filterGenerator = CassandraUtils.additionalFilterGenerator(DeepRecordReader.this.config.getAdditionalFilters(), DeepRecordReader.this.config.getFilters(), this.getLuceneIndex());
            StringBuffer sb = new StringBuffer();
            sb.append(" WHERE ");
            if (DeepRecordReader.this.filterByKey) {
                filterGenerator = filterGenerator.substring(4);
            } else {
                sb.append(String.format(" token(%s) > ? AND token(%s) <= ?", this.partitionKeyString, this.partitionKeyString));
            }
            sb.append(filterGenerator);
            return sb.toString();
        }

        private String equalsInWhereClause(EqualsInValue equalsInValue) {
            StringBuffer sb = new StringBuffer();
            sb.append("WHERE ");
            for (int i = 0; i < equalsInValue.getEqualsList().size(); ++i) {
                sb.append((String)equalsInValue.getEqualsList().get((int)i).left).append(" = ? AND ");
            }
            sb.append(equalsInValue.getInField()).append(" IN ?");
            return sb.toString();
        }

        private String keyString(List<BoundColumn> columns) {
            String result = null;
            for (BoundColumn column : columns) {
                result = result == null ? this.quote(column.name) : result + "," + this.quote(column.name);
            }
            return result == null ? "" : result;
        }

        private String partitionKeyMarkers() {
            String result = null;
            for (BoundColumn partitionBoundColumn : DeepRecordReader.this.partitionBoundColumns) {
                result = result == null ? "?" : result + ",?";
            }
            return result;
        }

        private List<Object> preparedQueryBindValues() {
            LinkedList<Object> values = new LinkedList<Object>();
            if (!DeepRecordReader.this.filterByKey) {
                Object startToken = DeepRecordReader.this.split.getStartToken();
                Object endToken = DeepRecordReader.this.split.getEndToken();
                values.add(startToken);
                values.add(endToken);
            }
            return values;
        }

        private String quote(String identifier) {
            return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
        }

        private void executeQuery() {
            Statement stmt = this.prepareStatement();
            if (stmt != null) {
                this.rows = null;
                NoHostAvailableException exception = null;
                for (int retries = 0; retries < 3; ++retries) {
                    try {
                        ResultSet resultSet = DeepRecordReader.this.session.execute(stmt);
                        if (resultSet != null) {
                            this.rows = resultSet.iterator();
                        }
                        return;
                    }
                    catch (NoHostAvailableException e) {
                        LOG.error("Could not connect to ");
                        exception = e;
                        try {
                            Thread.sleep(100L);
                        }
                        catch (InterruptedException e1) {
                            LOG.error("sleep exception", (Throwable)e1);
                        }
                        continue;
                    }
                    catch (Exception e) {
                        throw new DeepIOException((Throwable)e);
                    }
                }
                if (exception != null) {
                    throw new DeepIOException(exception);
                }
            }
        }
    }
}

