package com.alibaba.jstorm.task.execute;

import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleExt;
import backtype.storm.tuple.TupleImplExt;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/BoltBatchCollector.class */
/**
 * Bolt output collector that buffers emitted messages and ships them as {@link BatchTuple}s.
 *
 * <p>Messages are grouped per stream, or per "targetTask-stream" key for direct emits. A group is
 * sent as soon as it grows past the configured batch size, or when {@link #flush()} is called.
 *
 * <p>For every anchor tuple the collector counts how many emitted messages are still sitting in a
 * buffer ({@code pendingTuples}). An ack for an anchor is only sent once that count has dropped
 * to zero; until then the anchor waits in {@code pendingAcks} and is retried on each flush.
 */
public class BoltBatchCollector extends BoltCollector {
    private static final Logger LOG = LoggerFactory.getLogger(BoltBatchCollector.class);

    /** Buffers messages per stream / direct target; created once in the constructor. */
    private final BatchCollector batchCollector;

    /** Anchor tuple -> number of emitted messages anchored on it that are still buffered. Guarded by its own monitor. */
    private final RotatingMap<Tuple, Integer> pendingTuples;

    /** Acked input tuples whose ack could not be sent yet because anchored messages are still buffered. */
    private final BlockingQueue<Tuple> pendingAcks;

    /**
     * A buffered message together with the tuples it is anchored on.
     *
     * <p>Kept public and non-static for compatibility; the decompiler notes that the original
     * declaration was private.
     */
    public class BoltMsgInfo extends MsgInfo {
        /** Anchor tuples of this message; may be {@code null} for unanchored emits. */
        public Collection<Tuple> anchors;

        public BoltMsgInfo(String str, List<Object> list, Collection<Tuple> collection, Integer num, ICollectorCallback iCollectorCallback) {
            super(str, list, num, iCollectorCallback);
            this.anchors = collection;
        }
    }

    /** A batch detached from the buffers during flush, waiting to be sent outside the buffer locks. */
    private static final class ReadyBatch {
        final String streamId;
        /** Target task id as a string for direct emits, {@code null} otherwise. */
        final String targetTaskId;
        final List<MsgInfo> messages;

        ReadyBatch(String streamId, String targetTaskId, List<MsgInfo> messages) {
            this.streamId = streamId;
            this.targetTaskId = targetTaskId;
            this.messages = messages;
        }
    }

    /**
     * Creates the collector and its internal {@link BatchCollector}.
     *
     * @param task        owning task, forwarded to {@link BoltCollector}
     * @param rotatingMap forwarded to {@link BoltCollector}
     * @param i           forwarded to {@link BoltCollector}
     */
    public BoltBatchCollector(Task task, RotatingMap<Tuple, Long> rotatingMap, int i) {
        super(task, rotatingMap, i);
        this.pendingTuples = new RotatingMap<>(3, true);
        this.pendingAcks = new LinkedBlockingQueue<>();
        this.batchCollector = new BatchCollector(this.task_id, this.topologyContext.getThisComponentId(), this.storm_conf) {
            /**
             * Registers the anchors as "in flight" and appends the message to its buffer.
             *
             * @return the detached batch if the buffer grew past the batch size, otherwise {@code null}
             */
            @Override
            public List<MsgInfo> push(String str, List<Object> list, Integer num, Collection<Tuple> collection, Object obj, Long l, ICollectorCallback iCollectorCallback) {
                if (collection != null) {
                    synchronized (BoltBatchCollector.this.pendingTuples) {
                        for (Tuple anchor : collection) {
                            Integer buffered = BoltBatchCollector.this.pendingTuples.get(anchor);
                            int updated = (buffered == null) ? 1 : buffered.intValue() + 1;
                            BoltBatchCollector.this.pendingTuples.put(anchor, Integer.valueOf(updated));
                        }
                    }
                }
                if (num != null) {
                    // Direct emit: buffer under "<taskId>-<streamId>"; flush() splits the key again.
                    synchronized (this.directBatches) {
                        return BoltBatchCollector.this.addToBatches(num.toString() + "-" + str, this.directBatches, str, list, num, collection, this.batchSize, iCollectorCallback);
                    }
                }
                synchronized (this.streamToBatches) {
                    return BoltBatchCollector.this.addToBatches(str, this.streamToBatches, str, list, num, collection, this.batchSize, iCollectorCallback);
                }
            }

            /**
             * Sends every non-empty buffer, then retries the acks that were deferred while their
             * anchored messages were still buffered.
             */
            @Override
            public void flush() {
                List<ReadyBatch> ready = new ArrayList<>();
                synchronized (this.streamToBatches) {
                    for (Map.Entry<String, List<MsgInfo>> entry : this.streamToBatches.entrySet()) {
                        List<MsgInfo> messages = entry.getValue();
                        if (messages != null && !messages.isEmpty()) {
                            ready.add(new ReadyBatch(entry.getKey(), null, messages));
                            // Detach the batch; the key stays so the map is not structurally modified.
                            entry.setValue(null);
                        }
                    }
                }
                synchronized (this.directBatches) {
                    for (Map.Entry<String, List<MsgInfo>> entry : this.directBatches.entrySet()) {
                        List<MsgInfo> messages = entry.getValue();
                        if (messages != null && !messages.isEmpty()) {
                            // Key layout is "<taskId>-<streamId>"; limit 2 keeps dashes inside the stream id.
                            String[] taskAndStream = entry.getKey().split("-", 2);
                            ready.add(new ReadyBatch(taskAndStream[1], taskAndStream[0], messages));
                            entry.setValue(null);
                        }
                    }
                }
                // Send outside the buffer locks.
                for (ReadyBatch batch : ready) {
                    BoltBatchCollector.this.sendBatch(batch.streamId, batch.targetTaskId, batch.messages);
                }
                // Visit each currently queued ack once; those still blocked go back to the tail.
                for (int remaining = BoltBatchCollector.this.pendingAcks.size(); remaining > 0; remaining--) {
                    Tuple tuple = BoltBatchCollector.this.pendingAcks.poll();
                    if (tuple != null && !BoltBatchCollector.this.sendAckTuple(tuple)) {
                        // offer() on the unbounded queue cannot block or be interrupted, so the
                        // deferred ack can no longer be lost the way it could with put().
                        BoltBatchCollector.this.pendingAcks.offer(tuple);
                    }
                }
            }
        };
    }

    /**
     * Buffers one outgoing message and, if that fills its batch, sends the batch immediately.
     *
     * @param str                stream id
     * @param collection         anchor tuples, or {@code null}
     * @param list               tuple values
     * @param num                target task id for a direct emit, or {@code null}
     * @param iCollectorCallback optional callback invoked when the message's batch is sent
     * @return the result of {@link #sendBatch} if a batch was sent, otherwise a new empty list
     */
    @Override // com.alibaba.jstorm.task.execute.BoltCollector
    protected List<Integer> sendBoltMsg(String str, Collection<Tuple> collection, List<Object> list, Integer num, ICollectorCallback iCollectorCallback) {
        List<MsgInfo> fullBatch = this.batchCollector.push(str, list, num, collection, null, null, iCollectorCallback);
        if (fullBatch == null || fullBatch.isEmpty()) {
            return new ArrayList<Integer>();
        }
        return sendBatch(str, num != null ? num.toString() : null, fullBatch);
    }

    /**
     * Appends a message to the buffer stored under {@code str}, creating the buffer if needed.
     *
     * <p>The caller is expected to hold the lock protecting {@code map}. Public only because the
     * decompiler widened the original private access.
     *
     * @param str  buffer key
     * @param map  buffer map to update
     * @param str2 stream id recorded in the message
     * @param i    batch size threshold
     * @return the buffer, detached from {@code map}, once it holds more than {@code i} messages;
     *         {@code null} while it holds {@code i} or fewer
     */
    public List<MsgInfo> addToBatches(String str, Map<String, List<MsgInfo>> map, String str2, List<Object> list, Integer num, Collection<Tuple> collection, int i, ICollectorCallback iCollectorCallback) {
        List<MsgInfo> batch = map.get(str);
        if (batch == null) {
            batch = new ArrayList<>();
            map.put(str, batch);
        }
        batch.add(new BoltMsgInfo(str2, list, collection, num, iCollectorCallback));
        if (batch.size() > i) {
            map.put(str, null);
            return batch;
        }
        return null;
    }

    /**
     * Routes a batch to its target tasks, transfers one {@link BatchTuple} per target task, runs
     * the message callbacks and finally releases the anchors of all messages in the batch.
     *
     * <p>Any exception is logged and swallowed. The emit timer is updated exactly once per call,
     * on both the normal and the failure path.
     *
     * @param str  stream id
     * @param str2 target task id as a string for direct emits, or {@code null}
     * @param list messages to send; every element must be a {@link BoltMsgInfo}
     * @return always a new empty list (the list is never populated)
     */
    public List<Integer> sendBatch(String str, String str2, List<MsgInfo> list) {
        long start = this.emitTimer.getTime();
        try {
            Map<List<Integer>, List<MsgInfo>> routed = (str2 != null)
                    ? this.sendTargets.getBatch(Integer.valueOf(str2), str, list)
                    : this.sendTargets.getBatch(str, list);
            if (routed != null && !routed.isEmpty()) {
                for (Map.Entry<List<Integer>, List<MsgInfo>> entry : routed.entrySet()) {
                    List<Integer> targetTasks = entry.getKey();
                    List<MsgInfo> messages = entry.getValue();
                    for (Integer targetTask : targetTasks) {
                        BatchTuple batchTuple = new BatchTuple(targetTask.intValue(), messages.size());
                        for (MsgInfo msgInfo : messages) {
                            // A fresh message id is generated for every (message, target task) pair.
                            TupleImplExt outTuple = new TupleImplExt(this.topologyContext, msgInfo.values, this.task_id.intValue(), msgInfo.streamId, getMessageId(((BoltMsgInfo) msgInfo).anchors));
                            outTuple.setTargetTaskId(targetTask.intValue());
                            batchTuple.addToBatch(outTuple);
                        }
                        this.taskTransfer.transfer(batchTuple);
                    }
                    for (MsgInfo msgInfo : messages) {
                        if (msgInfo.callback != null) {
                            msgInfo.callback.execute(targetTasks);
                        }
                    }
                }
            }
            releaseAnchors(list);
            return new ArrayList<Integer>();
        } catch (Exception e) {
            LOG.error("bolt emit", e);
            return new ArrayList<Integer>();
        } finally {
            // Single place where the timer is updated; covers both the normal and the error path.
            this.emitTimer.updateTime(start);
        }
    }

    /** Decrements the buffered-message count of every anchor referenced by the given messages. */
    private void releaseAnchors(List<MsgInfo> messages) {
        for (MsgInfo msgInfo : messages) {
            Collection<Tuple> anchors = ((BoltMsgInfo) msgInfo).anchors;
            if (anchors == null || anchors.isEmpty()) {
                continue;
            }
            for (Tuple anchor : anchors) {
                synchronized (this.pendingTuples) {
                    Integer buffered = this.pendingTuples.get(anchor);
                    if (buffered != null) {
                        int left = buffered.intValue() - 1;
                        if (left <= 0) {
                            this.pendingTuples.remove(anchor);
                        } else {
                            this.pendingTuples.put(anchor, Integer.valueOf(left));
                        }
                    }
                }
            }
        }
    }

    /**
     * Acks an input tuple. With ackers enabled the ack is sent right away if no message anchored
     * on the tuple is still buffered; otherwise it is queued and retried on the next flush.
     * Ack statistics are recorded when a start time is known and metrics are enabled.
     */
    @Override // com.alibaba.jstorm.task.execute.BoltCollector, backtype.storm.task.IOutputCollector
    public void ack(Tuple tuple) {
        if (this.ackerNum.intValue() > 0 && !sendAckTuple(tuple)) {
            this.pendingAcks.add(tuple);
        }
        Long startTime = (Long) this.tuple_start_times.remove(tuple);
        if (startTime == null || !JStormMetrics.enabled) {
            return;
        }
        long now = System.currentTimeMillis();
        this.task_stats.bolt_acked_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), startTime.longValue(), ((TupleExt) tuple).getCreationTimeStamp(), now);
    }

    /**
     * Fails an input tuple: with ackers enabled, drops its pending ack value and emits one
     * message on the "__ack_fail" stream per root id; always records the failure in the stats.
     */
    @Override // com.alibaba.jstorm.task.execute.BoltCollector, backtype.storm.task.IOutputCollector
    public void fail(Tuple tuple) {
        if (this.ackerNum.intValue() > 0) {
            this.pending_acks.remove(tuple);
            for (Map.Entry<Long, Long> entry : tuple.getMessageId().getAnchorsToIds().entrySet()) {
                sendBoltMsg("__ack_fail", null, JStormUtils.mk_list((Object) entry.getKey()), null, null);
            }
        }
        this.task_stats.bolt_failed_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
    }

    /**
     * Builds the message id for an outgoing tuple anchored on {@code collection}.
     *
     * <p>For each anchor a new edge id is generated, XOR-ed into the anchor's entry in
     * {@code pending_acks}, and XOR-ed into the result under each of the anchor's root ids.
     * Before that, both rotating maps are rotated if more than {@code rotateTime} has passed
     * since the last rotation.
     *
     * @param collection anchor tuples, or {@code null} for an unanchored tuple
     * @return the message id built from the collected root-id to edge-id map (empty if unanchored)
     */
    @Override // com.alibaba.jstorm.task.execute.BoltCollector
    protected MessageId getMessageId(Collection<Tuple> collection) {
        Map<Long, Long> anchorsToIds = new HashMap<>();
        if (collection != null) {
            long now = System.currentTimeMillis();
            if (now - this.lastRotate > this.rotateTime) {
                this.pending_acks.rotate();
                synchronized (this.pendingTuples) {
                    this.pendingTuples.rotate();
                }
                this.lastRotate = now;
            }
            for (Tuple anchor : collection) {
                Long edgeId = Long.valueOf(MessageId.generateId(this.random));
                // NOTE(review): put_xor is presumably a read-modify-write, hence the monitor.
                synchronized (this.pending_acks) {
                    put_xor(this.pending_acks, anchor, edgeId);
                }
                for (Long rootId : anchor.getMessageId().getAnchorsToIds().keySet()) {
                    put_xor(anchorsToIds, rootId, edgeId);
                }
            }
        }
        return MessageId.makeId(anchorsToIds);
    }

    /**
     * Tries to send the ack for {@code tuple}.
     *
     * <p>Public only because the decompiler widened the original private access.
     *
     * @return {@code false} if messages anchored on the tuple are still buffered (nothing is
     *         sent); {@code true} after emitting one "__ack_ack" message per root id
     */
    public boolean sendAckTuple(Tuple tuple) {
        Integer buffered;
        synchronized (this.pendingTuples) {
            buffered = this.pendingTuples.get(tuple);
        }
        if (buffered != null && buffered.intValue() > 0) {
            return false;
        }
        Object pendingValue = this.pending_acks.remove(tuple);
        long ackValue = pendingValue != null ? ((Long) pendingValue).longValue() : 0L;
        for (Map.Entry<Long, Long> entry : tuple.getMessageId().getAnchorsToIds().entrySet()) {
            sendBoltMsg("__ack_ack", null, JStormUtils.mk_list((Object) entry.getKey(), Long.valueOf(JStormUtils.bit_xor(entry.getValue(), Long.valueOf(ackValue)))), null, null);
        }
        return true;
    }

    /** Sends all buffered batches and retries deferred acks by delegating to the batch collector. */
    @Override // backtype.storm.task.OutputCollectorCb
    public void flush() {
        this.batchCollector.flush();
    }

    /** Wraps a single tuple in a one-element {@link BatchTuple} and sends it via {@code transferControl}. */
    @Override // com.alibaba.jstorm.task.execute.BoltCollector
    void transferCtr(TupleImplExt tupleImplExt) {
        BatchTuple batchTuple = new BatchTuple(tupleImplExt.getTargetTaskId(), 1);
        batchTuple.addToBatch(tupleImplExt);
        this.taskTransfer.transferControl(batchTuple);
    }

    /** Delegates to {@link UnanchoredSend#sendBatch}. */
    @Override // com.alibaba.jstorm.task.execute.BoltCollector
    void unanchoredSend(TopologyContext topologyContext, TaskSendTargets taskSendTargets, TaskTransfer taskTransfer, String str, List<Object> list) {
        UnanchoredSend.sendBatch(topologyContext, taskSendTargets, taskTransfer, str, list);
    }
}
