package org.kairosdb.datastore.cassandra;

import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.UnavailableException;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.UnmodifiableIterator;
import com.google.inject.assistedinject.Assisted;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import org.json.JSONWriter;
import org.kairosdb.core.DataPoint;
import org.kairosdb.core.queue.EventCompletionCallBack;
import org.kairosdb.datastore.cassandra.CassandraModule;
import org.kairosdb.datastore.h2.orm.DataPoint_base;
import org.kairosdb.eventbus.FilterEventBus;
import org.kairosdb.eventbus.Publisher;
import org.kairosdb.events.BatchReductionEvent;
import org.kairosdb.events.DataPointEvent;
import org.kairosdb.events.RowKeyEvent;
import org.kairosdb.util.RetryCallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kairosdb/datastore/cassandra/BatchHandler.class */
/**
 * Writes a list of {@link DataPointEvent}s through {@link CQLBatch} instances.
 *
 * <p>{@link #retryCall()} first tries to submit all events as one batch. When a
 * submission fails with a generic exception it divides the per-batch event limit
 * by an increasing divisor and starts over, until the limit is 10 or less; at
 * that point the events are dumped to the {@code failed_logger} (when trace is
 * enabled) and the call completes anyway.
 */
public class BatchHandler extends RetryCallable {
    public static final Logger logger = LoggerFactory.getLogger(BatchHandler.class);
    public static final Logger failedLogger = LoggerFactory.getLogger("failed_logger");
    private final List<DataPointEvent> m_events;
    private final EventCompletionCallBack m_callBack;
    private final int m_defaultTtl;
    private final boolean m_allignDatapointTtl;
    private final boolean m_forceDefaultDatapointTtl;
    private final DataCache<DataPointsRowKey> m_rowKeyCache;
    private final DataCache<TimedString> m_metricNameCache;
    private final CassandraModule.CQLBatchFactory m_cqlBatchFactory;
    private final Publisher<RowKeyEvent> m_rowKeyPublisher;
    private final Publisher<BatchReductionEvent> m_batchReductionPublisher;
    private final String m_clusterName;
    private final RowSpec m_rowSpec;

    /**
     * Creates a handler for one list of events.
     *
     * @param list the events to write
     * @param eventCompletionCallBack invoked once {@link #retryCall()} finishes without rethrowing
     * @param cassandraConfiguration source of the default TTL, the TTL flags and the write cluster name
     * @param dataCache cache of row keys already written
     * @param dataCache2 cache of metric names already written
     * @param filterEventBus used to create the row-key and batch-reduction publishers
     * @param cQLBatchFactory creates a fresh {@link CQLBatch} for every submission
     * @param rowSpec computes row times, column names and the row width
     */
    @Inject
    public BatchHandler(@Assisted List<DataPointEvent> list, @Assisted EventCompletionCallBack eventCompletionCallBack, CassandraConfiguration cassandraConfiguration, DataCache<DataPointsRowKey> dataCache, DataCache<TimedString> dataCache2, FilterEventBus filterEventBus, CassandraModule.CQLBatchFactory cQLBatchFactory, @Assisted RowSpec rowSpec) {
        m_events = list;
        m_callBack = eventCompletionCallBack;
        m_defaultTtl = cassandraConfiguration.getDatapointTtl();
        m_clusterName = cassandraConfiguration.getWriteCluster().getClusterName();
        m_allignDatapointTtl = cassandraConfiguration.isAlignDatapointTtlWithTimestamp();
        m_forceDefaultDatapointTtl = cassandraConfiguration.isForceDefaultDatapointTtl();
        m_rowKeyCache = dataCache;
        m_metricNameCache = dataCache2;
        m_cqlBatchFactory = cQLBatchFactory;
        m_rowSpec = rowSpec;
        m_rowKeyPublisher = filterEventBus.createPublisher(RowKeyEvent.class);
        m_batchReductionPublisher = filterEventBus.createPublisher(BatchReductionEvent.class);
    }

    /**
     * Moves up to {@code limit} events from {@code events} into {@code batch}.
     *
     * <p>Events whose aligned TTL is zero or negative are consumed from the
     * iterator (and count toward {@code limit}) but are not added to the batch.
     *
     * @param limit maximum number of events to consume from the iterator
     * @param batch batch receiving row keys, metric names, time indexes and data points
     * @param events shared iterator; left positioned after the last consumed event
     * @throws Exception whatever the batch or cache calls throw
     */
    private void loadBatch(int limit, CQLBatch batch, Iterator<DataPointEvent> events) throws Exception {
        int consumed = 0;
        while (events.hasNext() && consumed < limit) {
            DataPointEvent event = events.next();
            consumed++;

            String metricName = event.getMetricName();
            if (metricName.length() == 0) {
                logger.warn("Attempted to add empty metric name to string index. Row looks like: {}", event.getDataPoint());
            }
            ImmutableSortedMap<String, String> tags = event.getTags();
            DataPoint dataPoint = event.getDataPoint();

            int ttl = m_forceDefaultDatapointTtl ? m_defaultTtl : event.getTtl();
            logger.trace("ttl (seconds): {}", ttl);
            if (ttl == 0) {
                ttl = m_defaultTtl;
            }

            if (m_allignDatapointTtl) {
                // Shorten the TTL by the age of the data point so it expires
                // relative to its own timestamp rather than to the write time.
                int datapointAgeInSeconds = (int) ((System.currentTimeMillis() - dataPoint.getTimestamp()) / 1000);
                logger.trace("datapointAgeInSeconds: {}", datapointAgeInSeconds);
                ttl -= datapointAgeInSeconds;
                logger.trace("alligned ttl (seconds): {}", ttl);
                if (ttl <= 0) {
                    logger.warn("alligned ttl for {} with tags {} is negative, so the datapoint is already dead, no need to store it", metricName, tags);
                    // Skip the event as the message says; without this the
                    // non-positive TTL was still handed to addDataPoint below.
                    continue;
                }
            }

            long rowTime = m_rowSpec.calculateRowTime(dataPoint.getTimestamp());
            DataPointsRowKey rowKey = new DataPointsRowKey(metricName, m_clusterName, rowTime, dataPoint.getDataStoreDataType(), tags);

            // A null result from cacheItem is treated as "not cached before",
            // so the row key (and possibly the metric name) is written once.
            if (m_rowKeyCache.cacheItem(rowKey) == null) {
                // Row-key TTL: data point TTL plus the row width converted from
                // milliseconds to seconds; stays 0 when the data point TTL is 0.
                int rowKeyTtl = ttl == 0 ? 0 : ttl + ((int) (m_rowSpec.getRowWidthInMillis() / 1000));
                batch.addRowKey(rowKey, rowKeyTtl);

                String rowKeyMetricName = rowKey.getMetricName();
                m_rowKeyPublisher.post(new RowKeyEvent(rowKeyMetricName, rowKey, rowKeyTtl));

                TimedString timedMetricName = new TimedString(rowKeyMetricName, rowTime);
                if (m_metricNameCache.cacheItem(timedMetricName) == null) {
                    batch.addMetricName(timedMetricName);
                    batch.addTimeIndex(timedMetricName.getString(), rowKey.getTimestamp(), rowKeyTtl);
                }
            }

            batch.addDataPoint(rowKey, m_rowSpec.getColumnName(rowTime, dataPoint.getTimestamp()), dataPoint, ttl);
        }
    }

    /**
     * Removes from both caches every metric name and row key the given batch
     * reports as new, so a later attempt does not find them cached after the
     * batch failed.
     *
     * @param batch the batch that failed; may be {@code null} when no batch was created yet
     */
    private void clearCacheOfFailedBatch(CQLBatch batch) {
        if (batch == null) {
            return;
        }
        for (TimedString metric : batch.getNewMetrics()) {
            m_metricNameCache.removeKey(metric);
        }
        for (DataPointsRowKey rowKey : batch.getNewRowKeys()) {
            m_rowKeyCache.removeKey(rowKey);
        }
    }

    /**
     * Writes every event to {@link #failedLogger} at trace level as one JSON
     * object per event (name, timestamp, value, tags, ttl).
     *
     * @throws Exception whatever the JSON writer or the data point serialisation throws
     */
    private void logFailedEvents() throws Exception {
        for (DataPointEvent event : m_events) {
            StringWriter buffer = new StringWriter();
            JSONWriter json = new JSONWriter(buffer);
            json.object();
            json.key("name").value(event.getMetricName());
            json.key(DataPoint_base.COL_TIMESTAMP).value(event.getDataPoint().getTimestamp());
            json.key("value");
            event.getDataPoint().writeValueToJson(json);
            json.key("tags").object();
            for (Map.Entry<String, String> tag : event.getTags().entrySet()) {
                json.key(tag.getKey()).value(tag.getValue());
            }
            json.endObject();
            json.key("ttl").value(event.getTtl());
            json.endObject();
            failedLogger.trace(buffer.toString());
        }
    }

    /**
     * Submits all events, shrinking the batch size after each failed attempt.
     *
     * <p>Attempt {@code k} (starting at 1) uses a per-batch limit of
     * {@code m_events.size() / k} and always restarts from the first event.
     * {@link UnavailableException} and {@link NoHostAvailableException} are
     * logged and rethrown. Any other exception triggers another attempt while
     * the current limit is above 10; otherwise the failure is logged, the
     * events are dumped via {@link #failedLogger} if trace is enabled, and the
     * method completes normally. A {@link BatchReductionEvent} is posted when
     * the final limit is smaller than the event count, and the completion
     * callback is invoked last.
     *
     * @throws Exception the driver availability exceptions mentioned above, or
     *         anything thrown while dumping failed events
     */
    @Override // org.kairosdb.util.RetryCallable
    public void retryCall() throws Exception {
        int divisor = 1;
        int limit;
        boolean retry;
        do {
            retry = false;
            limit = m_events.size() / divisor;
            CQLBatch batch = null;
            try {
                Iterator<DataPointEvent> events = m_events.iterator();
                while (events.hasNext()) {
                    batch = m_cqlBatchFactory.create();
                    loadBatch(limit, batch, events);
                    batch.submitBatch();
                }
            } catch (UnavailableException | NoHostAvailableException e) {
                // Cluster availability problems are not solved by a smaller
                // batch, so these are passed on to the caller unchanged.
                clearCacheOfFailedBatch(batch);
                logger.error(e.getMessage());
                throw e;
            } catch (Exception e) {
                clearCacheOfFailedBatch(batch);
                if ("Batch too large".equals(e.getMessage())) {
                    logger.warn("Batch size is too large");
                } else {
                    logger.error("Error sending data points", e);
                }
                if (limit > 10) {
                    retry = true;
                    logger.info("Retrying batch with smaller limit");
                } else {
                    logger.error("Failed to send data points", e);
                    if (failedLogger.isTraceEnabled()) {
                        logFailedEvents();
                    }
                }
            }
            divisor++;
        } while (retry);

        if (limit < m_events.size()) {
            m_batchReductionPublisher.post(new BatchReductionEvent(limit));
        }
        m_callBack.complete();
    }
}
