package org.kairosdb.datastore.cassandra;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import me.prettyprint.cassandra.model.HColumnImpl;
import me.prettyprint.cassandra.model.MutatorImpl;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.mutation.Mutator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kairosdb/datastore/cassandra/WriteBuffer.class */
public class WriteBuffer<RowKeyType, ColumnKeyType, ValueType> implements Runnable {
    public static final Logger logger = LoggerFactory.getLogger(WriteBuffer.class);
    private Keyspace m_keyspace;
    private String m_cfName;
    private Mutator<RowKeyType> m_mutator;
    private ReentrantLock m_mutatorLock;
    private Condition m_lockCondition;
    private int m_writeDelay;
    private Serializer<RowKeyType> m_rowKeySerializer;
    private Serializer<ColumnKeyType> m_columnKeySerializer;
    private Serializer<ValueType> m_valueSerializer;
    private WriteBufferStats m_writeStats;
    private int m_maxBufferSize;
    private int m_initialMaxBufferSize;
    private volatile int m_bufferCount = 0;
    private boolean m_exit = false;
    private Thread m_writeThread = new Thread(this);

    public WriteBuffer(Keyspace keyspace, String str, int i, int i2, Serializer<RowKeyType> serializer, Serializer<ColumnKeyType> serializer2, Serializer<ValueType> serializer3, WriteBufferStats writeBufferStats, ReentrantLock reentrantLock, Condition condition) {
        this.m_keyspace = keyspace;
        this.m_cfName = str;
        this.m_writeDelay = i;
        this.m_maxBufferSize = i2;
        this.m_initialMaxBufferSize = i2;
        this.m_rowKeySerializer = serializer;
        this.m_columnKeySerializer = serializer2;
        this.m_valueSerializer = serializer3;
        this.m_writeStats = writeBufferStats;
        this.m_mutatorLock = reentrantLock;
        this.m_lockCondition = condition;
        this.m_mutator = new MutatorImpl(keyspace, serializer);
        this.m_writeThread.start();
    }

    public void addData(RowKeyType rowkeytype, ColumnKeyType columnkeytype, ValueType valuetype, long j) {
        addData(rowkeytype, columnkeytype, valuetype, j, 0);
    }

    public void addData(RowKeyType rowkeytype, ColumnKeyType columnkeytype, ValueType valuetype, long j, int i) {
        this.m_mutatorLock.lock();
        try {
            waitOnBufferFull();
            this.m_bufferCount++;
            if (columnkeytype.toString().length() > 0) {
                HColumnImpl hColumnImpl = new HColumnImpl(columnkeytype, valuetype, j, this.m_columnKeySerializer, this.m_valueSerializer);
                if (i != 0) {
                    hColumnImpl.setTtl(i);
                }
                this.m_mutator.addInsertion(rowkeytype, this.m_cfName, hColumnImpl);
            } else {
                logger.info("Discarded " + this.m_cfName + " row with empty column name. This should never happen.");
            }
        } finally {
            this.m_mutatorLock.unlock();
        }
    }

    public void deleteRow(RowKeyType rowkeytype, long j) {
        this.m_mutatorLock.lock();
        try {
            waitOnBufferFull();
            this.m_bufferCount++;
            this.m_mutator.addDeletion(rowkeytype, this.m_cfName, j);
            this.m_mutatorLock.unlock();
        } catch (Throwable th) {
            this.m_mutatorLock.unlock();
            throw th;
        }
    }

    public void deleteColumn(RowKeyType rowkeytype, ColumnKeyType columnkeytype, long j) {
        this.m_mutatorLock.lock();
        try {
            waitOnBufferFull();
            this.m_bufferCount++;
            this.m_mutator.addDeletion(rowkeytype, this.m_cfName, columnkeytype, this.m_columnKeySerializer, j);
            this.m_mutatorLock.unlock();
        } catch (Throwable th) {
            this.m_mutatorLock.unlock();
            throw th;
        }
    }

    private void waitOnBufferFull() {
        if (this.m_bufferCount <= this.m_maxBufferSize || this.m_mutatorLock.getHoldCount() != 1) {
            return;
        }
        try {
            this.m_lockCondition.await();
        } catch (InterruptedException e) {
        }
    }

    public void close() throws InterruptedException {
        this.m_exit = true;
        this.m_writeThread.interrupt();
        this.m_writeThread.join();
    }

    public void increaseMaxBufferSize() {
        if (this.m_maxBufferSize < this.m_initialMaxBufferSize) {
            this.m_maxBufferSize += 1000;
            logger.info("Increasing write buffer " + this.m_cfName + " size to " + this.m_maxBufferSize);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.m_exit) {
            try {
                Thread.sleep(this.m_writeDelay);
            } catch (InterruptedException e) {
            }
            Mutator<RowKeyType> mutator = null;
            if (this.m_bufferCount != 0) {
                this.m_mutatorLock.lock();
                try {
                    this.m_writeStats.saveWriteSize(this.m_bufferCount);
                    mutator = this.m_mutator;
                    this.m_mutator = new MutatorImpl(this.m_keyspace, this.m_rowKeySerializer);
                    this.m_bufferCount = 0;
                    this.m_lockCondition.signalAll();
                    this.m_mutatorLock.unlock();
                } catch (Throwable th) {
                    this.m_mutatorLock.unlock();
                    throw th;
                }
            }
            if (mutator != null) {
                try {
                    mutator.execute();
                } catch (Exception e2) {
                    logger.error("Error sending data to Cassandra (" + this.m_cfName + ")", e2);
                    this.m_maxBufferSize = (this.m_maxBufferSize * 3) / 4;
                    logger.error("Reducing write buffer size to " + this.m_maxBufferSize + ".  You need to increase your cassandra capacity or change the kairosdb.datastore.cassandra.write_buffer_max_size property.");
                }
            }
            mutator = null;
            while (mutator != null) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e3) {
                }
                try {
                    mutator.execute();
                    mutator = null;
                } catch (Exception e4) {
                    logger.error("Error resending data", e4);
                }
            }
        }
    }
}
