/*
 * Decompiled with CFR 0.152.
 */
package com.stratio.deep.cassandra.cql;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.stratio.deep.cassandra.config.CassandraDeepJobConfig;
import com.stratio.deep.cassandra.config.ICassandraDeepJobConfig;
import com.stratio.deep.cassandra.cql.CassandraClientProvider;
import com.stratio.deep.cassandra.cql.RangeUtils;
import com.stratio.deep.cassandra.querybuilder.CassandraUpdateQueryBuilder;
import com.stratio.deep.cassandra.util.CassandraUtils;
import com.stratio.deep.commons.entity.Cells;
import com.stratio.deep.commons.exception.DeepGenericException;
import com.stratio.deep.commons.exception.DeepIOException;
import com.stratio.deep.commons.exception.DeepInstantiationException;
import com.stratio.deep.commons.handler.DeepRecordWriter;
import com.stratio.deep.commons.utils.Pair;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DeepCqlRecordWriter
extends DeepRecordWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DeepCqlRecordWriter.class);
    private final Map<Token, RangeClient> clients = new HashMap<Token, RangeClient>();
    private final Map<Token, RangeClient> removedClients = new HashMap<Token, RangeClient>();
    private AbstractType<?> keyValidator;
    private String[] partitionKeyColumns;
    private final ICassandraDeepJobConfig writeConfig;
    private final IPartitioner partitioner;
    private final InetAddress localhost;
    private final CassandraUpdateQueryBuilder queryBuilder;

    public DeepCqlRecordWriter(ICassandraDeepJobConfig writeConfig, CassandraUpdateQueryBuilder queryBuilder) {
        this.writeConfig = writeConfig;
        this.partitioner = RangeUtils.getPartitioner(writeConfig);
        this.queryBuilder = queryBuilder;
        try {
            this.localhost = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            throw new DeepInstantiationException("Cannot resolve local hostname", (Throwable)e);
        }
        this.init();
    }

    public void close() {
        LOG.debug("Closing all clients");
        for (RangeClient client : this.clients.values()) {
            try {
                IOUtils.closeQuietly((Closeable)client);
            }
            catch (Exception e) {
                LOG.error("exception", (Throwable)e);
            }
        }
        for (RangeClient client : this.removedClients.values()) {
            try {
                IOUtils.closeQuietly((Closeable)client);
            }
            catch (Exception e) {
                LOG.error("exception", (Throwable)e);
            }
        }
    }

    private void init() {
        try {
            this.retrievePartitionKeyValidator();
        }
        catch (Exception e) {
            throw new DeepGenericException((Throwable)e);
        }
    }

    private AbstractType<?> parseType(String type) throws ConfigurationException {
        try {
            if (type != null && "org.apache.cassandra.db.marshal.CounterColumnType".equals(type)) {
                return LongType.instance;
            }
            return TypeParser.parse((String)type);
        }
        catch (SyntaxException e) {
            throw new ConfigurationException(e.getMessage(), (Throwable)e);
        }
    }

    protected void retrievePartitionKeyValidator() throws ConfigurationException {
        String cfName;
        String keyspace;
        Pair<Session, String> sessionWithHost = CassandraClientProvider.trySessionForLocation(this.localhost.getHostAddress(), (CassandraDeepJobConfig)this.writeConfig, false);
        Row row = DeepCqlRecordWriter.getRowMetadata(sessionWithHost, keyspace = this.writeConfig.getKeyspace(), cfName = this.writeConfig.getColumnFamily());
        if (row == null) {
            throw new DeepIOException(String.format("cannot find metadata for %s.%s", keyspace, cfName));
        }
        String validator = row.getString("key_validator");
        this.keyValidator = this.parseType(validator);
        String keyString = row.getString("key_aliases");
        LOG.debug("partition keys: " + keyString);
        List keys = FBUtilities.fromJsonList((String)keyString);
        this.partitionKeyColumns = new String[keys.size()];
        int i = 0;
        Iterator i$ = keys.iterator();
        while (i$.hasNext()) {
            String key;
            this.partitionKeyColumns[i] = key = (String)i$.next();
            ++i;
        }
        String clusterColumnString = row.getString("column_aliases");
        LOG.debug("cluster columns: " + clusterColumnString);
    }

    private static Row getRowMetadata(Pair<Session, String> sessionWithHost, String keyspace, String cfName) {
        String query = "SELECT key_validator,key_aliases,column_aliases FROM system.schema_columnfamilies WHERE keyspace_name='%s' and columnfamily_name='%s' ";
        String formatted = String.format(query, keyspace, cfName);
        ResultSet resultSet = ((Session)sessionWithHost.left).execute(formatted);
        return resultSet.one();
    }

    public void write(Cells keys, Cells values) {
        String localCql = this.queryBuilder.prepareQuery(keys, values);
        Token range = this.partitioner.getToken(CassandraUtils.getPartitionKey(keys, this.keyValidator, this.partitionKeyColumns.length));
        ArrayList<Object> allValues = new ArrayList<Object>(values.getCellValues());
        allValues.addAll(keys.getCellValues());
        RangeClient client = this.clients.get(range);
        if (client == null) {
            client = new RangeClient();
            this.clients.put(range, client);
        }
        if (client.put(localCql, allValues)) {
            this.removedClients.put(range, this.clients.remove(range));
        }
    }

    protected class RangeClient
    extends Thread
    implements Closeable {
        private final int batchSize;
        private final List<String> batchStatements;
        private final List<Object> bindVariables;
        private final UUID identity;
        private String cql;

        public synchronized boolean put(String stmt, List<Object> values) {
            boolean res;
            this.batchStatements.add(stmt);
            this.bindVariables.addAll(values);
            boolean bl = res = this.batchStatements.size() >= this.batchSize;
            if (res) {
                this.triggerBatch();
            }
            return res;
        }

        private void triggerBatch() {
            if (this.getState() != Thread.State.NEW) {
                return;
            }
            this.cql = DeepCqlRecordWriter.this.queryBuilder.prepareBatchQuery(this.batchStatements);
            this.start();
        }

        public RangeClient() {
            this.batchSize = DeepCqlRecordWriter.this.writeConfig.getBatchSize();
            this.batchStatements = new ArrayList<String>();
            this.bindVariables = new ArrayList<Object>();
            this.identity = UUID.randomUUID();
            LOG.debug("Create client " + this);
        }

        @Override
        public String toString() {
            return this.identity.toString();
        }

        @Override
        public void close() throws IOException {
            LOG.debug("[" + this + "] Called close on client, state: " + (Object)((Object)this.getState()));
            this.triggerBatch();
            try {
                this.join();
            }
            catch (InterruptedException e) {
                throw new AssertionError((Object)e);
            }
        }

        @Override
        public void run() {
            LOG.debug("[" + this + "] Initializing cassandra client");
            Pair<Session, String> sessionWithHost = CassandraClientProvider.trySessionForLocation(DeepCqlRecordWriter.this.localhost.getHostAddress(), (CassandraDeepJobConfig)DeepCqlRecordWriter.this.writeConfig, false);
            ((Session)sessionWithHost.left).execute(this.cql, this.bindVariables.toArray(new Object[this.bindVariables.size()]));
        }
    }
}

