package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.task.ICollectorCallback;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.metric.CallIntervalGauge;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.execute.BatchCollector;
import com.alibaba.jstorm.task.execute.MsgInfo;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.Pair;
import com.alibaba.jstorm.utils.TimeOutMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.commons.cli.HelpFormatter;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/spout/SpoutBatchCollector.class */
public class SpoutBatchCollector extends SpoutCollector {
    /** Class logger; declared {@code final} because it is never reassigned. */
    private static final Logger LOG = LoggerFactory.getLogger(SpoutBatchCollector.class);

    /** Batching delegate created in the constructor; {@code sendSpoutMsg} and {@code flush} forward to it. */
    protected BatchCollector batchCollector;

    /** Batch size read from {@code batchCollector.getConfigBatchSize()} at the end of the constructor. */
    protected int batchSize;

    /** Gauge incremented every time a batch returned by {@code addToBatches} is sent; assigned once in the constructor. */
    private final CallIntervalGauge timeIntervalGauge;

    /** Values accumulated by {@code addToPendingSendBatch}, keyed first by an int id and then by a string key. */
    private final Map<Integer, Map<String, List<Object>>> pendingSendMsgs;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/jstorm/task/execute/spout/SpoutBatchCollector$SpoutMsgInfo.class */
    /**
     * A {@link MsgInfo} that additionally carries the root id and the message id object handed to
     * its constructor. {@code getMessageId} reads both fields when a batch is sent.
     */
    public class SpoutMsgInfo extends MsgInfo {
        /** Root id given at construction; may be {@code null}. */
        public Long rootId;
        /** Message id object given at construction; may be {@code null}. */
        public Object messageId;

        /**
         * @param str                forwarded to the {@link MsgInfo} constructor (first argument)
         * @param list               forwarded to the {@link MsgInfo} constructor (second argument)
         * @param num                forwarded to the {@link MsgInfo} constructor (third argument)
         * @param obj                stored in {@link #messageId}
         * @param l                  stored in {@link #rootId}
         * @param iCollectorCallback forwarded to the {@link MsgInfo} constructor (fourth argument)
         */
        public SpoutMsgInfo(String str, List<Object> list, Integer num, Object obj, Long l, ICollectorCallback iCollectorCallback) {
            super(str, list, num, iCollectorCallback);
            rootId = l;
            messageId = obj;
        }
    }

    /**
     * Creates the collector, registers its batch-interval gauge as a task metric and installs the
     * batching logic.
     *
     * <p>The anonymous {@link BatchCollector} keeps two maps of open batches: {@code streamToBatches},
     * keyed by stream id, and {@code directBatches}, keyed by {@code "<taskId>-<streamId>"} for emits
     * that name a target task. Whenever {@code addToBatches} hands back a non-empty batch, the gauge
     * is incremented and the batch is forwarded through {@link #sendBatch(String, String, List)}.
     *
     * @param task           task this collector belongs to; supplies the topology id and task id
     *                       used in the metric name
     * @param timeOutMap     passed through to the {@code SpoutCollector} constructor
     * @param disruptorQueue passed through to the {@code SpoutCollector} constructor
     */
    public SpoutBatchCollector(Task task, TimeOutMap<Long, TupleInfo> timeOutMap, DisruptorQueue disruptorQueue) {
        super(task, timeOutMap, disruptorQueue);
        this.pendingSendMsgs = new HashMap<Integer, Map<String, List<Object>>>();
        String thisComponentId = this.topology_context.getThisComponentId();
        this.timeIntervalGauge = new CallIntervalGauge();
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(task.getTopologyId(), thisComponentId, task.getTaskId().intValue(), MetricDef.TASK_BATCH_INTERVAL_TIME, MetricType.GAUGE), new AsmGauge(this.timeIntervalGauge));
        // Separator between task id and stream id in directBatches keys. flush() splits on the first
        // occurrence only, so a stream id may itself contain the separator.
        final String directKeySeparator = "-";
        this.batchCollector = new BatchCollector(this.task_id, thisComponentId, this.storm_conf) {
            /**
             * Adds one message to the matching open batch and sends that batch when
             * {@code addToBatches} returns it. The {@code collection} argument is not used here.
             */
            @Override
            public void pushAndSend(String str, List<Object> list, Integer num, Collection<Tuple> collection, Object obj, Long l, ICollectorCallback iCollectorCallback) {
                if (num != null) {
                    // Direct emit: batch per (target task, stream).
                    String targetTask = num.toString();
                    synchronized (this.directBatches) {
                        List<MsgInfo> ready = SpoutBatchCollector.this.addToBatches(targetTask + directKeySeparator + str, this.directBatches, str, list, num, obj, l, this.batchSize, iCollectorCallback);
                        if (ready != null && !ready.isEmpty()) {
                            SpoutBatchCollector.this.timeIntervalGauge.incrementAndGet();
                            SpoutBatchCollector.this.sendBatch(str, targetTask, ready);
                        }
                    }
                    return;
                }
                // No target task: batch per stream only.
                synchronized (this.streamToBatches) {
                    List<MsgInfo> ready = SpoutBatchCollector.this.addToBatches(str, this.streamToBatches, str, list, num, obj, l, this.batchSize, iCollectorCallback);
                    if (ready != null && !ready.isEmpty()) {
                        SpoutBatchCollector.this.timeIntervalGauge.incrementAndGet();
                        SpoutBatchCollector.this.sendBatch(str, null, ready);
                    }
                }
            }

            /**
             * Sends every non-empty open batch, first the per-stream ones and then the direct ones,
             * leaving {@code null} in the map slot of each key that was visited.
             */
            @Override
            public synchronized void flush() {
                synchronized (this.streamToBatches) {
                    for (Map.Entry<String, List<MsgInfo>> entry : this.streamToBatches.entrySet()) {
                        // put() on a key that is already present swaps the value and returns the old batch.
                        List<MsgInfo> open = this.streamToBatches.put(entry.getKey(), null);
                        if (open != null && !open.isEmpty()) {
                            SpoutBatchCollector.this.sendBatch(entry.getKey(), null, open);
                        }
                    }
                }
                synchronized (this.directBatches) {
                    for (Map.Entry<String, List<MsgInfo>> entry : this.directBatches.entrySet()) {
                        List<MsgInfo> open = this.directBatches.put(entry.getKey(), null);
                        if (open != null && !open.isEmpty()) {
                            // Key layout is "<taskId>-<streamId>"; limit 2 keeps any later separator in the stream id.
                            String[] taskAndStream = entry.getKey().split(directKeySeparator, 2);
                            SpoutBatchCollector.this.sendBatch(taskAndStream[1], taskAndStream[0], open);
                        }
                    }
                }
            }
        };
        this.batchSize = this.batchCollector.getConfigBatchSize();
    }

    /**
     * Appends {@code list} to the values accumulated in {@code pendingSendMsgs} under
     * ({@code i}, {@code str}), creating the intermediate map and list on first use.
     *
     * <p>The accumulated list is not removed from {@code pendingSendMsgs} when it is returned.
     *
     * @param i    outer key into {@code pendingSendMsgs}
     * @param str  inner key
     * @param list values to append
     * @return the accumulated list once it holds at least {@code batchSize} elements, otherwise {@code null}
     */
    private List<Object> addToPendingSendBatch(int i, String str, List<Object> list) {
        final Integer outerKey = Integer.valueOf(i);
        Map<String, List<Object>> perKey = this.pendingSendMsgs.get(outerKey);
        if (perKey == null) {
            perKey = new HashMap<String, List<Object>>();
            this.pendingSendMsgs.put(outerKey, perKey);
        }
        List<Object> accumulated = perKey.get(str);
        if (accumulated == null) {
            accumulated = new ArrayList<Object>();
            perKey.put(str, accumulated);
        }
        accumulated.addAll(list);
        return accumulated.size() < this.batchSize ? null : accumulated;
    }

    /**
     * Queues one message in the batch collector instead of sending it immediately.
     *
     * @param str                stream id handed to {@code pushAndSend}
     * @param list               tuple values
     * @param obj                message id object; also used to derive the root id via {@code getRootId}
     * @param num                target task id, or {@code null} when no task is named
     * @param iCollectorCallback callback stored with the queued message
     * @return always {@code null}
     */
    @Override
    protected List<Integer> sendSpoutMsg(String str, List<Object> list, Object obj, Integer num, ICollectorCallback iCollectorCallback) {
        final Long rootId = getRootId(obj);
        batchCollector.pushAndSend(str, list, num, null, obj, rootId, iCollectorCallback);
        return null;
    }

    /**
     * Routes a batch of messages to its target tasks and transfers one batch tuple per target.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Ask {@code sendTargets} how the batch is split across targets (by explicit task id when
     *       {@code str2} is given, otherwise by stream).</li>
     *   <li>For every target task, wrap each message as a {@code Pair} of message id and values,
     *       put them in a single {@link TupleImplExt} flagged as a batch tuple, and transfer it.</li>
     *   <li>Run the callback of every message that has one.</li>
     *   <li>If {@link #getMessageId} collected any {@code "__ack_init"} messages, send them as a
     *       further batch on the {@code "__ack_init"} stream.</li>
     * </ol>
     *
     * <p>The elapsed time is reported to {@code emitTotalTimer} exactly once per call, from the
     * {@code finally} block, on every exit path.
     *
     * @param str  stream id of the batch
     * @param str2 decimal target task id for a direct send, or {@code null}
     * @param list messages to send; every element is cast to {@link SpoutMsgInfo}
     * @return an empty list when there is nothing to route, otherwise {@code null}
     *         (the {@code null} result is kept for compatibility with existing callers)
     * @throws NumberFormatException if {@code str2} is non-null and not a parsable integer
     * @throws ClassCastException    if a routed message is not a {@link SpoutMsgInfo}
     */
    public List<Integer> sendBatch(String str, String str2, List<MsgInfo> list) {
        long time = this.emitTotalTimer.getTime();
        try {
            Map<Object, List<MsgInfo>> routed;
            if (str2 != null) {
                routed = this.sendTargets.getBatch(Integer.valueOf(str2), str, list);
            } else {
                routed = this.sendTargets.getBatch(str, list);
            }
            if (routed == null || routed.isEmpty()) {
                // Nothing to send. The timer is updated by the finally block; updating it here as
                // well would record this call twice.
                return new ArrayList<Integer>();
            }

            // Ack-init messages gathered by getMessageId, one per root id seen in this batch.
            Map<Long, MsgInfo> ackInits = new HashMap<Long, MsgInfo>();
            for (Map.Entry<Object, List<MsgInfo>> entry : routed.entrySet()) {
                Object key = entry.getKey();
                // The routing key is handled as a single task id when it is an Integer, and cast to
                // a list of task ids otherwise.
                @SuppressWarnings("unchecked")
                List<Integer> targetTasks = key instanceof Integer ? JStormUtils.mk_list((Integer) key) : (List<Integer>) key;
                List<MsgInfo> messages = entry.getValue();

                for (Integer targetTask : targetTasks) {
                    // A fresh payload (and fresh message ids) is built for every target task.
                    List<Object> payload = new ArrayList<Object>();
                    for (MsgInfo message : messages) {
                        SpoutMsgInfo spoutMsgInfo = (SpoutMsgInfo) message;
                        payload.add(new Pair<MessageId, List<Object>>(getMessageId(spoutMsgInfo, ackInits), spoutMsgInfo.values));
                    }
                    TupleImplExt batchTuple = new TupleImplExt(this.topology_context, payload, this.task_id.intValue(), str, null);
                    batchTuple.setTargetTaskId(targetTask.intValue());
                    batchTuple.setBatchTuple(true);
                    this.transfer_fn.transfer(batchTuple);
                }

                for (MsgInfo message : messages) {
                    if (message.callback != null) {
                        message.callback.execute(str, targetTasks, message.values);
                    }
                }
            }

            if (!ackInits.isEmpty()) {
                sendBatch("__ack_init", null, new ArrayList<MsgInfo>(ackInits.values()));
            }
            return null;
        } finally {
            this.emitTotalTimer.updateTime(time);
        }
    }

    /**
     * Builds the {@link MessageId} for one outgoing message and keeps the per-batch
     * {@code "__ack_init"} bookkeeping in {@code map} up to date.
     *
     * <p>When the message has a root id, a new id is generated and combined with the root id into
     * the returned {@link MessageId}. The first time a root id is seen in {@code map}, a
     * {@link TupleInfo} describing the message is put at the head of {@code pending} and an
     * {@code "__ack_init"} message with values (root id, xor value, task id) is stored in
     * {@code map}. On later occurrences the new id is XOR-ed into element 1 of that stored message.
     *
     * @param spoutMsgInfo message being sent
     * @param map          ack-init messages collected so far, keyed by root id; updated in place
     * @return the message id, or {@code null} when {@code spoutMsgInfo.rootId} is {@code null}
     */
    protected MessageId getMessageId(SpoutMsgInfo spoutMsgInfo, Map<Long, MsgInfo> map) {
        final Long root = spoutMsgInfo.rootId;
        if (root == null) {
            return null;
        }

        final Long generated = Long.valueOf(MessageId.generateId(this.random));
        final MessageId result = MessageId.makeRootId(root.longValue(), generated.longValue());

        final MsgInfo known = map.get(root);
        if (known != null) {
            // Root id already has an ack-init entry in this batch: fold the new id into its xor slot.
            List<Object> ackValues = known.values;
            ackValues.set(1, Long.valueOf(JStormUtils.bit_xor_vals(ackValues.get(1), generated)));
            return result;
        }

        // First occurrence of this root id: record the pending tuple and create its ack-init entry.
        TupleInfo pendingInfo = new TupleInfo();
        pendingInfo.setStream(spoutMsgInfo.streamId);
        pendingInfo.setValues(spoutMsgInfo.values);
        pendingInfo.setMessageId(spoutMsgInfo.messageId);
        pendingInfo.setTimestamp(System.currentTimeMillis());
        this.pending.putHead(root, pendingInfo);
        map.put(root, new SpoutMsgInfo("__ack_init", JStormUtils.mk_list(root, Long.valueOf(JStormUtils.bit_xor_vals(generated)), this.task_id), null, null, null, null));
        return result;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Appends a new {@link SpoutMsgInfo} to the open batch stored under {@code str} in {@code map},
     * creating the batch when the key has no list yet.
     *
     * <p>The batch is handed back only once it holds <em>more than</em> {@code i} messages, so a
     * returned batch contains {@code i + 1} entries when messages arrive one at a time. When it is
     * handed back, the map slot for {@code str} is set to {@code null}.
     *
     * @param str                key of the batch inside {@code map}
     * @param map                open batches to update
     * @param str2               stream id stored in the new message
     * @param list               tuple values stored in the new message
     * @param num                target task id stored in the new message; may be {@code null}
     * @param obj                message id object stored in the new message
     * @param l                  root id stored in the new message
     * @param i                  size threshold the batch must exceed before it is returned
     * @param iCollectorCallback callback stored in the new message
     * @return the detached batch when the threshold was exceeded, otherwise {@code null}
     */
    public List<MsgInfo> addToBatches(String str, Map<String, List<MsgInfo>> map, String str2, List<Object> list, Integer num, Object obj, Long l, int i, ICollectorCallback iCollectorCallback) {
        List<MsgInfo> open = map.get(str);
        if (open == null) {
            open = new ArrayList<MsgInfo>();
            map.put(str, open);
        }
        open.add(new SpoutMsgInfo(str2, list, num, obj, l, iCollectorCallback));
        if (open.size() > i) {
            // Detach the batch so the next message for this key starts a new list.
            map.put(str, null);
            return open;
        }
        return null;
    }

    /**
     * Delegates to the batch collector's {@code flush()}; the implementation installed by the
     * constructor sends every non-empty open batch.
     */
    @Override
    public void flush() {
        batchCollector.flush();
    }
}
