package com.alibaba.jstorm.transactional;

import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import com.alibaba.jstorm.cache.RocksDBCache;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.rocksdb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.commons.io.FileUtils;

/* loaded from: input_file:com/alibaba/jstorm/transactional/BatchCache.class */
public class BatchCache extends RocksDBCache {
    /** Logger for this class; also used by the nested {@link PendingBatch}. */
    public static Logger LOG = LoggerFactory.getLogger(BatchCache.class);
    /** Configuration map obtained from the topology context in the constructor. */
    protected Map stormConf;
    /** Worker directory reported by the topology context. */
    protected String workerDir;
    /** Cache root: {@code workerDir + "/transactionCache/task-" + taskId}; passed to initDir as "rocksdb.root.dir". */
    protected String cacheDir;
    /** Id of the task this cache was created for. */
    protected int taskId;
    /** Parsed from "transaction.exactly.once.mode" (defaults to true); batches are only held back when this is set. */
    protected boolean isExactlyOnceMode;
    /** Once a pending batch holds more than this many bytes in memory, its tuples are written to the cache store. */
    protected int maxFlushSize;
    /** Serializer used to turn tuples into byte arrays before they are cached. */
    protected KryoTupleSerializer serializer;
    /** Deserializer used to rebuild tuples from cached byte arrays. */
    protected KryoTupleDeserializer deserializer;
    /** Position in {@link #pendingBatchGroups} at which the next getNextPendingBatch scan starts. */
    protected int pendingBatchGroupIndex = 0;
    /** Group index -> (batch id -> pending batch); one inner map is created per group in the constructor. */
    protected Map<Integer, Map<Long, PendingBatch>> pendingBatches = new HashMap();
    /** Group indices in the order they were registered by the constructor. */
    protected List<Integer> pendingBatchGroups = new ArrayList();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/jstorm/transactional/BatchCache$PendingBatch.class */
    /**
     * Holds the serialized tuples of a single batch until they are read back or discarded.
     *
     * <p>Tuples accumulate in {@link #tuples}. When their total size exceeds
     * {@code maxFlushSize} they are moved to the enclosing cache, one entry per tuple,
     * under the key {@code cacheKeyPrefix + index}. After {@link #getTuples()} or
     * {@link #removeTuples()} has run, both {@code addTuples} overloads reject new data.
     */
    public class PendingBatch {
        /** Prefix of every cache key written by this batch. */
        public String cacheKeyPrefix;
        /** Number of entries written to the cache so far; doubles as the index of the next key. */
        public volatile int cacheNum = 0;
        /** Index of the next cache entry to read back or delete. */
        public int cacheReadIndex = 0;
        /** Serialized tuples that are still held in memory. */
        public List<byte[]> tuples = new ArrayList<byte[]>();
        /** Total length in bytes of the arrays currently in {@link #tuples}. */
        private int cacheSize = 0;
        /** Monitor taken by addTuples, getTuples and removeTuples. */
        private final Object lock = new Object();
        /** False once the batch has been read or removed. */
        private boolean isActive = true;

        protected PendingBatch() {
        }

        /**
         * Buffers one serialized tuple and spills the whole buffer to the cache when it
         * has grown beyond {@code maxFlushSize} bytes.
         *
         * <p>This method does not lock or check the active flag itself; the
         * {@code addTuples} overloads do both before calling it.
         *
         * @param bArr serialized tuple to buffer
         */
        public void addData(byte[] bArr) {
            tuples.add(bArr);
            cacheSize += bArr.length;
            if (cacheSize <= BatchCache.this.maxFlushSize) {
                return;
            }
            // Threshold exceeded: write every buffered tuple as its own cache entry.
            for (byte[] buffered : tuples) {
                BatchCache.this.put(cacheKeyPrefix + cacheNum, buffered);
                cacheNum++;
            }
            tuples = new ArrayList<byte[]>();
            cacheSize = 0;
        }

        /**
         * Adds an already serialized tuple if the batch is still active.
         *
         * @param bArr serialized tuple
         * @return {@code true} if the data was accepted, {@code false} if the batch was
         *         already read or removed
         */
        public boolean addTuples(byte[] bArr) {
            synchronized (lock) {
                final boolean accepted = isActive;
                if (accepted) {
                    addData(bArr);
                }
                return accepted;
            }
        }

        /**
         * Serializes {@code tuple} (before taking the lock) and adds the result if the
         * batch is still active.
         *
         * @param kryoTupleSerializer serializer used for the tuple
         * @param tuple tuple to cache
         * @return {@code true} if the tuple was accepted, {@code false} otherwise
         */
        public boolean addTuples(KryoTupleSerializer kryoTupleSerializer, Tuple tuple) {
            final byte[] payload = kryoTupleSerializer.serialize(tuple);
            return addTuples(payload);
        }

        /**
         * Drains the batch: cached entries are read and deleted in index order, then the
         * in-memory tuples are appended, and the batch is deactivated.
         *
         * @return all tuples of the batch, or an empty list (with a warning logged) if the
         *         batch was already read or removed
         */
        public List<byte[]> getTuples() {
            final List<byte[]> drained = new ArrayList<byte[]>();
            synchronized (lock) {
                if (!isActive) {
                    BatchCache.LOG.warn("Try to get cache tuples when cache has been read or removed!");
                    return drained;
                }
                for (; cacheReadIndex < cacheNum; cacheReadIndex++) {
                    final String key = cacheKeyPrefix + cacheReadIndex;
                    drained.add((byte[]) BatchCache.this.get(key));
                    BatchCache.this.remove(key);
                }
                drained.addAll(tuples);
                tuples = new ArrayList<byte[]>();
                isActive = false;
            }
            return drained;
        }

        /**
         * Discards the batch: deletes its remaining cache entries, drops the in-memory
         * tuples and deactivates the batch.
         */
        public void removeTuples() {
            synchronized (lock) {
                for (; cacheReadIndex < cacheNum; cacheReadIndex++) {
                    BatchCache.this.remove(cacheKeyPrefix + cacheReadIndex);
                }
                tuples = new ArrayList<byte[]>();
                isActive = false;
            }
        }

        /** Reports the number of cache entries written and the number of tuples held in memory. */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append("cacheNum: ").append(cacheNum);
            text.append(", Pending tuple size:").append(tuples != null ? tuples.size() : 0);
            return text.toString();
        }
    }

    /**
     * Creates the batch cache for the current task and opens its backing store under
     * {@code <workerDir>/transactionCache/task-<taskId>}.
     *
     * <p>Each entry of {@code set} is resolved to a group index through
     * {@code TransactionCommon.groupIndex} against the raw topology; every resolved index
     * gets an empty pending-batch map and is appended to the group scan list.
     *
     * @param topologyContext context supplying the configuration, worker directory, task id
     *                        and raw topology
     * @param set             ids to resolve to group indices (presumably upstream spout or
     *                        component ids; confirm with callers)
     * @param stormTopology   topology handed to the Kryo tuple serializer and deserializer
     * @throws RuntimeException if the cache directory, the database or the serializers
     *                          cannot be initialized; the original exception is the cause
     */
    public BatchCache(TopologyContext topologyContext, Set<String> set, StormTopology stormTopology) {
        this.stormConf = topologyContext.getStormConf();
        this.workerDir = topologyContext.getWorkerIdDir();
        this.taskId = topologyContext.getThisTaskId();
        this.isExactlyOnceMode = JStormUtils.parseBoolean(this.stormConf.get("transaction.exactly.once.mode"), true);
        this.cacheDir = this.workerDir + "/transactionCache/task-" + this.taskId;

        for (String id : set) {
            Integer groupIndex = TransactionCommon.groupIndex(topologyContext.getRawTopology(), id);
            this.pendingBatches.put(groupIndex, new HashMap<Long, PendingBatch>());
            this.pendingBatchGroups.add(groupIndex);
        }

        this.maxFlushSize = ConfigExtension.getTransactionCacheBatchFlushSize(this.stormConf);

        Options options = new Options();
        options.setCreateMissingColumnFamilies(true).setCreateIfMissing(true);
        // Look each nullable setting up once; Number accepts whichever boxed type the getter returns.
        Number blockSize = ConfigExtension.getTransactionCacheBlockSize(this.stormConf);
        Number maxBlockNum = ConfigExtension.getTransactionMaxCacheBlockNum(this.stormConf);
        options.setWriteBufferSize(blockSize != null ? blockSize.longValue() : FileUtils.ONE_GB);
        options.setMaxWriteBufferNumber(maxBlockNum != null ? maxBlockNum.intValue() : 3);

        try {
            HashMap rocksDbConf = new HashMap();
            rocksDbConf.put("rocksdb.root.dir", this.cacheDir);
            rocksDbConf.put("rocksdb.reset", true);
            initDir(rocksDbConf);
            initDb(null, options);
            this.serializer = new KryoTupleSerializer(this.stormConf, stormTopology);
            this.deserializer = new KryoTupleDeserializer(this.stormConf, topologyContext, stormTopology);
        } catch (Exception e) {
            // Keep the cause and say which cache failed so the failure can be located.
            throw new RuntimeException("Failed to initialize batch cache in " + this.cacheDir, e);
        }
    }

    /**
     * Tells whether this cache was configured for exactly-once mode.
     *
     * @return the value parsed from "transaction.exactly.once.mode" at construction time
     */
    public boolean isExactlyOnceMode() {
        return isExactlyOnceMode;
    }

    /**
     * Looks up the pending batch registered for {@code batchGroupId}, optionally creating
     * it when missing or unregistering it when present.
     *
     * <p>The cache key prefix of a new batch is {@code "<groupId>:<batchId>:"}. The
     * delimiters are required because the entry index is appended directly after the
     * prefix: without them different batches could share a key (group 1 / batch 1 /
     * entry 10 and group 1 / batch 11 / entry 0 would both be "1110") and overwrite or
     * delete each other's cached tuples.
     *
     * @param batchGroupId    group id and batch id to look up
     * @param createIfAbsent  create and register a batch when none exists and
     *                        {@link #isPendingBatch} accepts the id
     * @param removeIfPresent unregister the batch that was found (it is still returned)
     * @param map             group id to batch id, forwarded to {@link #isPendingBatch}
     *                        (presumably the last finished batch per group; confirm with callers)
     * @return the existing or newly created batch, or {@code null} if there is none
     */
    private synchronized PendingBatch getPendingBatch(BatchGroupId batchGroupId, boolean createIfAbsent, boolean removeIfPresent, Map<Integer, Long> map) {
        Map<Long, PendingBatch> groupBatches = this.pendingBatches.get(Integer.valueOf(batchGroupId.groupId));
        Long batchKey = Long.valueOf(batchGroupId.batchId);
        PendingBatch pendingBatch = groupBatches.get(batchKey);
        if (pendingBatch == null && createIfAbsent && isPendingBatch(batchGroupId, map)) {
            pendingBatch = new PendingBatch();
            // Delimit every component so that group id, batch id and entry index cannot run together.
            pendingBatch.cacheKeyPrefix = batchGroupId.groupId + ":" + batchGroupId.batchId + ":";
            groupBatches.put(batchKey, pendingBatch);
        } else if (pendingBatch != null && removeIfPresent) {
            groupBatches.remove(batchKey);
        }
        return pendingBatch;
    }

    /**
     * Caches an already serialized tuple for the given batch, creating the pending batch
     * if it does not exist yet and the id qualifies as pending.
     *
     * @param batchGroupId group id and batch id the data belongs to
     * @param bArr         serialized tuple
     * @param map          group id to batch id, used to decide whether the batch is pending
     * @return {@code true} if the data was cached; {@code false} if no pending batch is
     *         available or the batch no longer accepts data
     */
    public boolean cachePendingBatch(BatchGroupId batchGroupId, byte[] bArr, Map<Integer, Long> map) {
        final PendingBatch target = getPendingBatch(batchGroupId, true, false, map);
        return target != null && target.addTuples(bArr);
    }

    /**
     * Serializes a tuple with this cache's serializer and caches it for the given batch,
     * creating the pending batch if it does not exist yet and the id qualifies as pending.
     *
     * @param batchGroupId group id and batch id the tuple belongs to
     * @param tuple        tuple to cache
     * @param map          group id to batch id, used to decide whether the batch is pending
     * @return {@code true} if the tuple was cached; {@code false} if no pending batch is
     *         available or the batch no longer accepts data
     */
    public boolean cachePendingBatch(BatchGroupId batchGroupId, Tuple tuple, Map<Integer, Long> map) {
        final PendingBatch target = getPendingBatch(batchGroupId, true, false, map);
        if (target == null) {
            return false;
        }
        final byte[] payload = serializer.serialize(tuple);
        return target.addTuples(payload);
    }

    /**
     * Decides whether a batch has to be held back in the cache.
     *
     * <p>Batch id 0 is never pending, and nothing is pending outside exactly-once mode.
     * Otherwise a batch is pending when its id is greater than the group's entry in
     * {@code map} plus one.
     *
     * @param batchGroupId group id and batch id to test
     * @param map          group id to batch id; must contain the group whenever the
     *                     comparison is reached
     * @return {@code true} if the batch should be cached
     */
    public boolean isPendingBatch(BatchGroupId batchGroupId, Map<Integer, Long> map) {
        if (batchGroupId.batchId == 0 || !isExactlyOnceMode) {
            return false;
        }
        final long nextExpected = map.get(Integer.valueOf(batchGroupId.groupId)).longValue() + 1;
        return batchGroupId.batchId > nextExpected;
    }

    /**
     * Fetches the next cached batch via {@link #getNextPendingBatch} and deserializes
     * its tuples.
     *
     * @param map group id to batch id, forwarded to {@link #getNextPendingBatch}
     * @return the deserialized tuples of the next batch, or {@code null} if no batch is
     *         available
     */
    public List<Tuple> getNextPendingTuples(Map<Integer, Long> map) {
        final List<byte[]> rawBatch = getNextPendingBatch(map);
        if (rawBatch == null) {
            return null;
        }
        final List<Tuple> decoded = new ArrayList<Tuple>();
        for (byte[] payload : rawBatch) {
            decoded.add(deserializer.deserialize(payload));
        }
        return decoded;
    }

    /**
     * Scans the registered groups, starting at the saved scan position, for a cached
     * batch whose id is the group's entry in {@code map} plus one.
     *
     * <p>Every group is visited at most once per call. The scan position advances past
     * each visited group (wrapping around), so the next call starts after the group
     * examined last. The first batch found is unregistered and drained.
     *
     * @param map group id to batch id; must contain every visited group
     * @return the serialized tuples of the batch found, or {@code null} if no group has
     *         its next batch cached
     */
    public List<byte[]> getNextPendingBatch(Map<Integer, Long> map) {
        for (int visited = 0; visited < pendingBatchGroups.size(); visited++) {
            final int groupId = pendingBatchGroups.get(pendingBatchGroupIndex).intValue();
            pendingBatchGroupIndex = (pendingBatchGroupIndex + 1) % pendingBatchGroups.size();

            final long nextBatchId = map.get(Integer.valueOf(groupId)).longValue() + 1;
            final PendingBatch candidate = getPendingBatch(new BatchGroupId(groupId, nextBatchId), false, true, map);
            if (candidate != null) {
                return candidate.getTuples();
            }
        }
        return null;
    }

    /**
     * Discards every pending batch of one group: each batch drops its cached and
     * in-memory tuples, then the group's batch map is emptied. Unknown groups are ignored.
     *
     * @param i group index to clean up
     */
    public synchronized void cleanup(int i) {
        final Map<Long, PendingBatch> groupBatches = pendingBatches.get(Integer.valueOf(i));
        if (groupBatches == null) {
            return;
        }
        for (PendingBatch batch : groupBatches.values()) {
            batch.removeTuples();
        }
        groupBatches.clear();
    }

    /**
     * Pass-through serialization: values handed to this cache are already byte arrays,
     * so the argument is only cast.
     *
     * @param obj value to store; must be a {@code byte[]}
     * @return the same array
     * @throws ClassCastException if {@code obj} is not a {@code byte[]}
     */
    @Override
    protected byte[] serialize(Object obj) {
        final byte[] raw = (byte[]) obj;
        return raw;
    }

    /**
     * Pass-through deserialization: the stored bytes are returned unchanged.
     *
     * @param bArr bytes read from the store
     * @return the same array
     */
    @Override
    protected Object deserialize(byte[] bArr) {
        final Object unchanged = bArr;
        return unchanged;
    }

    /** Renders the pending-batch map and the list of registered group indices. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("pendingBatches: ");
        text.append(pendingBatches.toString());
        text.append(", pendingBatchGroups: ").append(pendingBatchGroups);
        return text.toString();
    }
}
