package com.alibaba.jstorm.transactional.spout;

import backtype.storm.spout.SpoutOutputCollectorCb;
import backtype.storm.task.ICollectorCallback;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.transactional.BatchGroupId;
import com.alibaba.jstorm.transactional.BatchSnapshot;
import com.alibaba.jstorm.transactional.TransactionCommon;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/transactional/spout/TransactionSpoutOutputCollector.class */
public class TransactionSpoutOutputCollector extends SpoutOutputCollectorCb {
    public static Logger LOG = LoggerFactory.getLogger(TransactionSpoutOutputCollector.class);
    SpoutOutputCollectorCb delegate;
    private TransactionSpout spout;
    private int groupId;
    private long currBatchId;
    private Map<Integer, Integer> msgCount;
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    private BatchInfo currBatchInfo = new BatchInfo();

    /* loaded from: input_file:com/alibaba/jstorm/transactional/spout/TransactionSpoutOutputCollector$BatchInfo.class */
    public static class BatchInfo {
        public long batchId;
        public Object endPos;

        public BatchInfo() {
        }

        public BatchInfo(BatchInfo batchInfo) {
            this.batchId = batchInfo.batchId;
            this.endPos = batchInfo.endPos;
        }

        public void init(long j) {
            this.batchId = j;
            this.endPos = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/jstorm/transactional/spout/TransactionSpoutOutputCollector$CollectorCallback.class */
    public class CollectorCallback implements ICollectorCallback {
        private CollectorCallback() {
        }

        @Override // backtype.storm.task.ICollectorCallback
        public void execute(String str, List<Integer> list, List list2) {
            for (Integer num : list) {
                TransactionSpoutOutputCollector.this.msgCount.put(num, Integer.valueOf(((Integer) TransactionSpoutOutputCollector.this.msgCount.get(num)).intValue() + 1));
            }
        }
    }

    public TransactionSpoutOutputCollector(SpoutOutputCollectorCb spoutOutputCollectorCb, TransactionSpout transactionSpout) {
        this.delegate = spoutOutputCollectorCb;
        this.spout = transactionSpout;
    }

    public void init(BatchGroupId batchGroupId, Set<Integer> set) {
        try {
            this.lock.writeLock().lock();
            setGroupId(batchGroupId.groupId);
            setCurrBatchId(batchGroupId.batchId);
            initMsgCount(set);
            this.currBatchInfo.init(batchGroupId.batchId);
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    public void initMsgCount(Set<Integer> set) {
        this.msgCount = new HashMap();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            this.msgCount.put(it.next(), 0);
        }
    }

    public void setGroupId(int i) {
        this.groupId = i;
    }

    public int getGroupId() {
        return this.groupId;
    }

    public void setCurrBatchId(long j) {
        this.currBatchId = j;
    }

    public long getCurrBatchId() {
        return this.currBatchId;
    }

    public void waitActive() {
        while (!this.spout.isActive()) {
            JStormUtils.sleepMs(1L);
        }
    }

    @Override // backtype.storm.spout.ISpoutOutputCollector
    public List<Integer> emit(String str, List<Object> list, Object obj) {
        return emit(str, list, obj, new CollectorCallback());
    }

    @Override // backtype.storm.spout.ISpoutOutputCollector
    public void emitDirect(int i, String str, List<Object> list, Object obj) {
        emitDirect(i, str, list, obj, new CollectorCallback());
    }

    @Override // backtype.storm.spout.SpoutOutputCollectorCb
    public List<Integer> emit(String str, List<Object> list, Object obj, ICollectorCallback iCollectorCallback) {
        try {
            this.lock.readLock().lock();
            ArrayList arrayList = new ArrayList();
            arrayList.add(new BatchGroupId(this.groupId, this.currBatchId));
            arrayList.addAll(list);
            this.delegate.emit(str, arrayList, null, iCollectorCallback != null ? iCollectorCallback : new CollectorCallback());
            this.lock.readLock().unlock();
            return null;
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            throw th;
        }
    }

    @Override // backtype.storm.spout.SpoutOutputCollectorCb
    public void emitDirect(int i, String str, List<Object> list, Object obj, ICollectorCallback iCollectorCallback) {
        try {
            this.lock.readLock().lock();
            ArrayList arrayList = new ArrayList();
            arrayList.add(new BatchGroupId(this.groupId, this.currBatchId));
            arrayList.addAll(list);
            this.delegate.emitDirect(i, str, arrayList, null, iCollectorCallback != null ? iCollectorCallback : new CollectorCallback());
            this.lock.readLock().unlock();
        } catch (Throwable th) {
            this.lock.readLock().unlock();
            throw th;
        }
    }

    public List<Integer> emitByDelegate(String str, List<Object> list, Object obj) {
        return emitByDelegate(str, list, obj, null);
    }

    public List<Integer> emitByDelegate(String str, List<Object> list, Object obj, ICollectorCallback iCollectorCallback) {
        return this.delegate.emit(str, list, obj, iCollectorCallback);
    }

    public void emitDirectByDelegate(int i, String str, List<Object> list, Object obj) {
        emitDirectByDelegate(i, str, list, obj, null);
    }

    public void emitDirectByDelegate(int i, String str, List<Object> list, Object obj, ICollectorCallback iCollectorCallback) {
        this.delegate.emitDirect(i, str, list, obj, iCollectorCallback);
    }

    @Override // backtype.storm.spout.ISpoutOutputCollector
    public void reportError(Throwable th) {
        this.delegate.reportError(th);
    }

    public BatchInfo flushBarrier() {
        try {
            this.lock.writeLock().lock();
            BatchInfo batchInfo = new BatchInfo(this.currBatchInfo);
            this.delegate.flush();
            BatchGroupId batchGroupId = new BatchGroupId(this.groupId, this.currBatchId);
            for (Map.Entry<Integer, Integer> entry : this.msgCount.entrySet()) {
                emitDirectByDelegate(entry.getKey().intValue(), TransactionCommon.BARRIER_STREAM_ID, new Values(batchGroupId, new BatchSnapshot(batchGroupId, entry.setValue(0).intValue())), null, null);
            }
            this.delegate.flush();
            moveToNextBatch();
            this.lock.writeLock().unlock();
            return batchInfo;
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    public void moveToNextBatch() {
        this.currBatchId++;
        this.currBatchInfo.batchId = this.currBatchId;
    }

    public void flushInitBarrier() {
        try {
            this.lock.writeLock().lock();
            this.delegate.flush();
            BatchGroupId batchGroupId = new BatchGroupId(this.groupId, 0L);
            BatchSnapshot batchSnapshot = new BatchSnapshot(batchGroupId, 0);
            for (Map.Entry<Integer, Integer> entry : this.msgCount.entrySet()) {
                entry.setValue(0);
                emitDirectByDelegate(entry.getKey().intValue(), TransactionCommon.BARRIER_STREAM_ID, new Values(batchGroupId, batchSnapshot), null, null);
            }
            this.delegate.flush();
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }
}
