package com.stratio.deep.cql;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.stratio.deep.config.ICassandraDeepJobConfig;
import com.stratio.deep.entity.Cell;
import com.stratio.deep.entity.Cells;
import com.stratio.deep.exception.DeepGenericException;
import com.stratio.deep.exception.DeepIOException;
import com.stratio.deep.exception.DeepInstantiationException;
import com.stratio.deep.utils.Pair;
import com.stratio.deep.utils.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.io.IOUtils;
import org.apache.spark.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/stratio/deep/cql/DeepCqlRecordWriter.class */
public class DeepCqlRecordWriter implements AutoCloseable {
    /** Logger shared by the writer and by its {@link RangeClient} worker threads. */
    private static final Logger LOG = LoggerFactory.getLogger(DeepCqlRecordWriter.class);

    /** Clients that are still accumulating statements, keyed by the token computed for the row's partition key. */
    private final Map<Token, RangeClient> clients = new HashMap<>();

    /** Clients whose batch has already been triggered; retained so that {@link #close()} can still close them. */
    private final Map<Token, RangeClient> removedClients = new HashMap<>();

    /** Type parsed from the {@code key_validator} column of the table metadata. */
    private AbstractType<?> keyValidator;

    /** Column names parsed from the {@code key_aliases} column of the table metadata. */
    private String[] partitionKeyColumns;

    /** Job configuration supplied to the constructor (keyspace, column family, batch size). */
    private final ICassandraDeepJobConfig writeConfig;

    /** Partitioner obtained from {@code RangeUtils.getPartitioner(writeConfig)}; used to compute row tokens. */
    private final IPartitioner partitioner;

    /** Result of {@link InetAddress#getLocalHost()}; its host address is used when requesting sessions. */
    private final InetAddress localhost;

    /* loaded from: input_file:com/stratio/deep/cql/DeepCqlRecordWriter$RangeClient.class */
    private class RangeClient extends Thread implements Closeable {
        private final int batchSize;
        private List<String> batchStatements = new ArrayList();
        private List<Object> bindVariables = new ArrayList();
        private UUID identity = UUID.randomUUID();
        private String cql;

        public synchronized boolean put(String str, List<Object> list) {
            this.batchStatements.add(str);
            this.bindVariables.addAll(list);
            boolean z = this.batchStatements.size() >= this.batchSize;
            if (z) {
                triggerBatch();
            }
            return z;
        }

        private void triggerBatch() {
            if (getState() != Thread.State.NEW) {
                return;
            }
            this.cql = Utils.batchQueryGenerator(this.batchStatements);
            start();
        }

        public RangeClient() {
            this.batchSize = DeepCqlRecordWriter.this.writeConfig.getBatchSize();
            DeepCqlRecordWriter.LOG.debug("Create client " + this);
        }

        @Override // java.lang.Thread
        public String toString() {
            return this.identity.toString();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            DeepCqlRecordWriter.LOG.debug("[" + this + "] Called close on client, state: " + getState());
            triggerBatch();
            try {
                join();
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            DeepCqlRecordWriter.LOG.debug("[" + this + "] Initializing cassandra client");
            ((Session) CassandraClientProvider.trySessionForLocation(DeepCqlRecordWriter.this.localhost.getHostAddress(), DeepCqlRecordWriter.this.writeConfig, false).left).execute(this.cql, this.bindVariables.toArray(new Object[this.bindVariables.size()]));
        }
    }

    public DeepCqlRecordWriter(TaskContext taskContext, ICassandraDeepJobConfig iCassandraDeepJobConfig) {
        this.writeConfig = iCassandraDeepJobConfig;
        this.partitioner = RangeUtils.getPartitioner(iCassandraDeepJobConfig);
        try {
            this.localhost = InetAddress.getLocalHost();
            init();
        } catch (UnknownHostException e) {
            throw new DeepInstantiationException("Cannot resolve local hostname", e);
        }
    }

    /**
     * Closes every client held by this writer: first the ones still accumulating statements,
     * then the ones whose batch was already triggered. Failures are logged, never propagated
     * as exceptions.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        LOG.debug("Closing all clients");
        closeEach(this.clients.values().iterator());
        closeEach(this.removedClients.values().iterator());
    }

    /** Quietly closes each remaining client of the iterator, logging any exception raised on the way. */
    private void closeEach(Iterator<RangeClient> pending) {
        while (pending.hasNext()) {
            try {
                IOUtils.closeQuietly(pending.next());
            } catch (Exception failure) {
                LOG.error("exception", failure);
            }
        }
    }

    /**
     * Builds the serialized partition key for a row.
     *
     * <p>When the key validator is a {@link CompositeType}, the decomposed values of all cells
     * flagged as partition key are combined with {@code CompositeType.build}; otherwise the
     * decomposed value of the first cell is used as the key.
     *
     * @param cells cells of the row; must contain the partition-key cells
     * @return the serialized partition key
     */
    private ByteBuffer getPartitionKey(Cells cells) {
        if (!(this.keyValidator instanceof CompositeType)) {
            return cells.getCellByIdx(0).getDecomposedCellValue();
        }
        ByteBuffer[] components = new ByteBuffer[this.partitionKeyColumns.length];
        // Components are stored densely: the array is sized by the number of partition-key
        // columns, so indexing it with the cell's position in the row would overflow (or leave
        // null holes) whenever a partition-key cell is not among the first cells.
        // NOTE(review): assumes partition-key cells appear in key-component order — confirm.
        int next = 0;
        for (int i = 0; i < cells.size(); i++) {
            Cell cell = cells.getCellByIdx(i);
            if (cell.isPartitionKey().booleanValue()) {
                components[next++] = cell.getDecomposedCellValue();
            }
        }
        return CompositeType.build(components);
    }

    /**
     * Loads the partition-key metadata for the target table.
     *
     * @throws DeepGenericException wrapping any exception raised while retrieving the metadata
     */
    private void init() {
        try {
            retrievePartitionKeyValidator();
        } catch (Exception cause) {
            throw new DeepGenericException(cause);
        }
    }

    /**
     * Parses a marshal type name into its {@link AbstractType}.
     *
     * <p>The counter column type name is mapped to {@link LongType}; every other value is handed
     * to {@link TypeParser#parse(String)}.
     *
     * @param str type name as stored in the table metadata; may be {@code null}
     * @return the parsed type
     * @throws ConfigurationException if the type cannot be parsed, including syntax errors, which
     *                                are converted so callers deal with a single exception type
     */
    private AbstractType<?> parseType(String str) throws ConfigurationException {
        if ("org.apache.cassandra.db.marshal.CounterColumnType".equals(str)) {
            return LongType.instance;
        }
        try {
            // The parse call must be inside the try: it is the only statement that can raise
            // SyntaxException, which the catch below converts.
            return TypeParser.parse(str);
        } catch (SyntaxException e) {
            throw new ConfigurationException(e.getMessage(), e);
        }
    }

    /**
     * Reads the metadata row of the configured table and derives from it the key validator type
     * and the partition-key column names.
     *
     * @throws ConfigurationException if the {@code key_validator} value cannot be parsed
     * @throws DeepIOException        if no metadata row exists for the configured keyspace and
     *                                column family
     */
    protected void retrievePartitionKeyValidator() throws ConfigurationException {
        Pair<Session, String> sessionPair = CassandraClientProvider.trySessionForLocation(this.localhost.getHostAddress(), this.writeConfig, false);
        String keyspace = this.writeConfig.getKeyspace();
        String table = this.writeConfig.getColumnFamily();

        Row metadata = getRowMetadata(sessionPair, keyspace, table);
        if (metadata == null) {
            throw new DeepIOException(String.format("cannot find metadata for %s.%s", keyspace, table));
        }

        this.keyValidator = parseType(metadata.getString("key_validator"));

        String keyAliasesJson = metadata.getString("key_aliases");
        LOG.debug("partition keys: " + keyAliasesJson);

        // key_aliases holds a JSON list; each entry becomes one partition-key column name.
        List<?> aliases = FBUtilities.fromJsonList(keyAliasesJson);
        this.partitionKeyColumns = new String[aliases.size()];
        int slot = 0;
        for (Object alias : aliases) {
            this.partitionKeyColumns[slot] = (String) alias;
            slot++;
        }

        LOG.debug("cluster columns: " + metadata.getString("column_aliases"));
    }

    /**
     * Queries {@code system.schema_columnfamilies} for the key validator, key aliases and column
     * aliases of one table.
     *
     * @param pair pair whose left element is the session used to run the query
     * @param str  keyspace name
     * @param str2 column family name
     * @return the first row of the result; may be {@code null}, which the caller treats as
     *         missing metadata
     */
    private static Row getRowMetadata(Pair<Session, String> pair, String str, String str2) {
        Session session = (Session) pair.left;
        String query = String.format("SELECT key_validator,key_aliases,column_aliases FROM system.schema_columnfamilies WHERE keyspace_name='%s' and columnfamily_name='%s' ", str, str2);
        return session.execute(query).one();
    }

    /**
     * Queues one row for writing.
     *
     * <p>Generates the update statement for the row, computes the token of its partition key and
     * hands statement and bind values to the client registered for that token, creating the
     * client if necessary. When the client reports that its batch has been triggered it is moved
     * from the active clients to the removed clients, so the next row for the same token gets a
     * fresh client.
     *
     * @param cells  key cells of the row; used to compute the partition key
     * @param cells2 value cells of the row
     */
    public void write(Cells cells, Cells cells2) {
        String statement = Utils.updateQueryGenerator(cells, cells2, this.writeConfig.getKeyspace(), this.writeConfig.getColumnFamily());
        Token token = this.partitioner.getToken(getPartitionKey(cells));

        // Bind values: the value cells first, followed by the key cells.
        List<Object> bindValues = new ArrayList<Object>(cells2.getCellValues());
        bindValues.addAll(cells.getCellValues());

        RangeClient client = this.clients.get(token);
        if (client == null) {
            client = new RangeClient();
            this.clients.put(token, client);
        }

        if (client.put(statement, bindValues)) {
            this.clients.remove(token);
            // removedClients holds a single client per token. If an earlier triggered client is
            // displaced here, close() would never wait for it, so it is closed (joined) now,
            // before its reference is dropped.
            RangeClient superseded = this.removedClients.put(token, client);
            if (superseded != null) {
                IOUtils.closeQuietly(superseded);
            }
        }
    }
}
