package com.alibaba.jstorm.task.execute;

import backtype.storm.Config;
import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.OutputCollectorCb;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleExt;
import backtype.storm.tuple.TupleImplExt;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/BoltCollector.class */
/**
 * Output collector for a bolt task: routes emitted values to their target tasks,
 * tracks per-input-tuple ack values in {@link #pending_acks}, and reports
 * ack / fail results for input tuples.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its maps or counters;
 * it is presumably driven by a single executor thread — confirm with callers.
 */
public class BoltCollector extends OutputCollectorCb {
    private static final Logger LOG = LoggerFactory.getLogger(BoltCollector.class);

    /** Sink used by {@link #reportError(Throwable)}. */
    protected ITaskReportErr reportError;
    /** Resolves the list of target task ids for an emit. */
    protected TaskSendTargets sendTargets;
    /** Transport used to hand outgoing tuples to other tasks. */
    protected TaskTransfer taskTransfer;
    protected TopologyContext topologyContext;
    protected Integer task_id;
    /** Start times of in-flight input tuples; supplied by the caller and consumed in {@link #ack(Tuple)}. */
    protected final RotatingMap<Tuple, Long> tuple_start_times;
    protected TaskBaseMetric task_stats;
    /** Minimum number of milliseconds between two rotations of {@link #pending_acks}. */
    protected long rotateTime;
    protected Map storm_conf;
    /** Value of {@code Config.TOPOLOGY_ACKER_EXECUTORS}; ack/fail messages are only sent when it is positive. */
    protected Integer ackerNum;
    /** Histogram fed with the duration of every emit; registered disabled in the constructor. */
    protected AsmHistogram emitTimer;
    /** Source of the random ids generated in {@link #getMessageId(Collection)}. */
    protected Random random;
    /** Wall-clock time (ms) of the last {@link #pending_acks} rotation. */
    protected long lastRotate = System.currentTimeMillis();
    /**
     * Accumulated ack value per anchored input tuple.
     * Constructed with argument 3 — presumably the bucket count; confirm against RotatingMap.
     */
    protected final RotatingMap<Tuple, Long> pending_acks = new RotatingMap<>(3);

    /**
     * Creates a collector bound to the given task.
     *
     * @param task        task supplying the error reporter, send targets, configuration,
     *                    transfer, topology context, task id and statistics
     * @param rotatingMap map holding the start time of every in-flight input tuple
     * @param i           presumably the message timeout in seconds (TODO confirm with callers);
     *                    {@link #rotateTime} is set to half of {@code 1000 * i}
     */
    public BoltCollector(Task task, RotatingMap<Tuple, Long> rotatingMap, int i) {
        // Multiply as long: the former int expression (1000 * i) overflowed for large i.
        this.rotateTime = 1000L * i / 2;
        this.reportError = task.getReportErrorDie();
        this.sendTargets = task.getTaskSendTargets();
        this.storm_conf = task.getStormConf();
        this.taskTransfer = task.getTaskTransfer();
        this.topologyContext = task.getTopologyContext();
        this.task_id = task.getTaskId();
        this.task_stats = task.getTaskStats();
        this.tuple_start_times = rotatingMap;
        this.ackerNum = JStormUtils.parseInt(this.storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));

        String metricName = MetricUtils.taskMetricName(
                this.topologyContext.getTopologyId(),
                this.topologyContext.getThisComponentId(),
                this.task_id.intValue(),
                MetricDef.COLLECTOR_EMIT_TIME,
                MetricType.HISTOGRAM);
        this.emitTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(metricName, new AsmHistogram());
        this.emitTimer.setEnabled(false);

        this.random = new Random();
        this.random.setSeed(System.currentTimeMillis());
    }

    /**
     * Emits {@code list} on stream {@code str}, anchored to {@code collection}.
     *
     * @return the task ids the tuple was sent to (never {@code null})
     */
    @Override // backtype.storm.task.IOutputCollector
    public List<Integer> emit(String str, Collection<Tuple> collection, List<Object> list) {
        return sendBoltMsg(str, collection, list, null, null);
    }

    /** Emits {@code list} on stream {@code str} directly to task {@code i}, anchored to {@code collection}. */
    @Override // backtype.storm.task.IOutputCollector
    public void emitDirect(int i, String str, Collection<Tuple> collection, List<Object> list) {
        sendBoltMsg(str, collection, list, Integer.valueOf(i), null);
    }

    /**
     * Same as {@link #emit(String, Collection, List)}, additionally passing the resolved
     * target task ids to {@code iCollectorCallback} once the emit has been attempted.
     */
    @Override // backtype.storm.task.OutputCollectorCb
    public List<Integer> emit(String str, Collection<Tuple> collection, List<Object> list, ICollectorCallback iCollectorCallback) {
        return sendBoltMsg(str, collection, list, null, iCollectorCallback);
    }

    /**
     * Same as {@link #emitDirect(int, String, Collection, List)}, additionally passing the
     * resolved target task ids to {@code iCollectorCallback} once the emit has been attempted.
     */
    @Override // backtype.storm.task.OutputCollectorCb
    public void emitDirect(int i, String str, Collection<Tuple> collection, List<Object> list, ICollectorCallback iCollectorCallback) {
        sendBoltMsg(str, collection, list, Integer.valueOf(i), iCollectorCallback);
    }

    /**
     * Emits {@code list} on stream {@code str} through the control path
     * ({@link #sendCtrlMsg(String, List, Collection, Integer)}).
     *
     * @return the target task ids, or {@code null} when they could not be resolved
     */
    public List<Integer> emitCtrl(String str, Collection<Tuple> collection, List<Object> list) {
        return sendCtrlMsg(str, list, collection, null);
    }

    /** Emits {@code list} on stream {@code str} through the control path, directly to task {@code i}. */
    public void emitDirectCtrl(int i, String str, Collection<Tuple> collection, List<Object> list) {
        sendCtrlMsg(str, list, collection, Integer.valueOf(i));
    }

    /**
     * Hook used by all {@code emit}/{@code emitDirect} variants; this implementation
     * simply delegates to {@link #sendMsg(String, List, Collection, Integer, ICollectorCallback)}.
     */
    protected List<Integer> sendBoltMsg(String str, Collection<Tuple> collection, List<Object> list, Integer num, ICollectorCallback iCollectorCallback) {
        return sendMsg(str, list, collection, num, iCollectorCallback);
    }

    /**
     * Builds the message id for one outgoing tuple.
     *
     * <p>For every anchor a fresh random id is generated; it is folded into the anchor's
     * entry in {@link #pending_acks} and into the entry of each of the anchor's root ids
     * in the returned message id (both via {@code JStormUtils.bit_xor}).
     * {@link #pending_acks} is rotated first when more than {@link #rotateTime}
     * milliseconds have passed since the last rotation.
     *
     * @param collection anchors of the outgoing tuple; {@code null} yields an id built from an empty map
     * @return the message id for the outgoing tuple
     */
    protected MessageId getMessageId(Collection<Tuple> collection) {
        Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
        if (collection != null) {
            long now = System.currentTimeMillis();
            if (now - this.lastRotate > this.rotateTime) {
                this.pending_acks.rotate();
                this.lastRotate = now;
            }
            for (Tuple anchor : collection) {
                Long edgeId = Long.valueOf(MessageId.generateId(this.random));
                put_xor(this.pending_acks, anchor, edgeId);
                for (Long rootId : anchor.getMessageId().getAnchorsToIds().keySet()) {
                    put_xor(anchorsToIds, rootId, edgeId);
                }
            }
        }
        return MessageId.makeId(anchorsToIds);
    }

    /**
     * Resolves the target tasks for an emit and transfers one tuple to each of them.
     *
     * <p>Exceptions raised while resolving targets or transferring are logged and
     * swallowed. Whatever happens, the callback (if any) is invoked exactly once with the
     * resolved task ids (empty when resolution failed) and the emit timer is updated
     * exactly once.
     *
     * @param str                stream id to emit on
     * @param list               values of the outgoing tuple
     * @param collection         anchors of the outgoing tuple, may be {@code null}
     * @param num                target task id for a direct emit, or {@code null} for a grouped emit
     * @param iCollectorCallback optional callback receiving the target task ids, may be {@code null}
     * @return the resolved target task ids, never {@code null}
     */
    public List<Integer> sendMsg(String str, List<Object> list, Collection<Tuple> collection, Integer num, ICollectorCallback iCollectorCallback) {
        final long start = this.emitTimer.getTime();
        List<Integer> outTasks = null;
        try {
            outTasks = num != null
                    ? this.sendTargets.get(num, str, list, collection, null)
                    : this.sendTargets.get(str, list, collection, null);
            for (Integer target : outTasks) {
                // A new message id is generated per target, so every copy gets its own edge ids.
                TupleImplExt outgoing = new TupleImplExt(this.topologyContext, list, this.task_id.intValue(), str, getMessageId(collection));
                outgoing.setTargetTaskId(target.intValue());
                this.taskTransfer.transfer(outgoing);
            }
        } catch (Exception e) {
            LOG.error("bolt emit", e);
        } finally {
            if (outTasks == null) {
                outTasks = new ArrayList<Integer>();
            }
            if (iCollectorCallback != null) {
                iCollectorCallback.execute(outTasks);
            }
            this.emitTimer.updateTime(start);
        }
        return outTasks;
    }

    /** Sends an unanchored tuple by delegating to {@code UnanchoredSend.send}. */
    void unanchoredSend(TopologyContext topologyContext, TaskSendTargets taskSendTargets, TaskTransfer taskTransfer, String str, List<Object> list) {
        UnanchoredSend.send(topologyContext, taskSendTargets, taskTransfer, str, list);
    }

    /** Hands a tuple to the control channel of {@link #taskTransfer}. */
    void transferCtr(TupleImplExt tupleImplExt) {
        this.taskTransfer.transferControl(tupleImplExt);
    }

    /**
     * Control-path counterpart of
     * {@link #sendMsg(String, List, Collection, Integer, ICollectorCallback)}: resolves the
     * target tasks and passes one tuple per target to {@link #transferCtr(TupleImplExt)}.
     *
     * <p>Exceptions are logged and swallowed; the emit timer is updated exactly once per call.
     *
     * @param str        stream id to emit on
     * @param list       values of the outgoing tuple
     * @param collection anchors of the outgoing tuple, may be {@code null}
     * @param num        target task id for a direct emit, or {@code null} for a grouped emit
     * @return the resolved target task ids, or {@code null} when resolving them failed
     */
    protected List<Integer> sendCtrlMsg(String str, List<Object> list, Collection<Tuple> collection, Integer num) {
        final long start = this.emitTimer.getTime();
        List<Integer> outTasks = null;
        try {
            outTasks = num != null
                    ? this.sendTargets.get(num, str, list, collection, null)
                    : this.sendTargets.get(str, list, collection, null);
            for (Integer target : outTasks) {
                TupleImplExt outgoing = new TupleImplExt(this.topologyContext, list, this.task_id.intValue(), str, getMessageId(collection));
                outgoing.setTargetTaskId(target.intValue());
                transferCtr(outgoing);
            }
        } catch (Exception e) {
            // The timer is updated only in finally; updating it here too would record the emit twice.
            LOG.error("bolt emit", e);
        } finally {
            this.emitTimer.updateTime(start);
        }
        return outTasks;
    }

    /**
     * Acknowledges an input tuple.
     *
     * <p>When ackers are configured, the tuple's accumulated value is removed from
     * {@link #pending_acks} (0 if absent) and, for every root id of the tuple, an
     * {@code __ack_ack} message carrying the root id and the combination of the root's
     * value with the accumulated value is sent unanchored. Afterwards the tuple's start
     * time is removed and, if it was present and metrics are enabled, reported to the
     * task statistics.
     */
    @Override // backtype.storm.task.IOutputCollector
    public void ack(Tuple tuple) {
        if (this.ackerNum.intValue() > 0) {
            Object pending = this.pending_acks.remove(tuple);
            Long ackValue = pending != null ? (Long) pending : 0L;
            for (Map.Entry<Long, Long> root : tuple.getMessageId().getAnchorsToIds().entrySet()) {
                List<Object> ackMessage = JStormUtils.mk_list(root.getKey(), Long.valueOf(JStormUtils.bit_xor(root.getValue(), ackValue)));
                unanchoredSend(this.topologyContext, this.sendTargets, this.taskTransfer, "__ack_ack", ackMessage);
            }
        }

        // Always drop the start time, even when metrics are disabled, so the map does not retain the tuple.
        Long startTime = (Long) this.tuple_start_times.remove(tuple);
        if (startTime != null && JStormMetrics.enabled) {
            this.task_stats.bolt_acked_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), startTime.longValue(), ((TupleExt) tuple).getCreationTimeStamp(), System.currentTimeMillis());
        }
    }

    /**
     * Fails an input tuple.
     *
     * <p>When ackers are configured, the tuple is dropped from {@link #pending_acks} and an
     * {@code __ack_fail} message is sent unanchored for every root id of the tuple. The
     * failure is always recorded in the task statistics.
     */
    @Override // backtype.storm.task.IOutputCollector
    public void fail(Tuple tuple) {
        if (this.ackerNum.intValue() > 0) {
            this.pending_acks.remove(tuple);
            for (Map.Entry<Long, Long> root : tuple.getMessageId().getAnchorsToIds().entrySet()) {
                unanchoredSend(this.topologyContext, this.sendTargets, this.taskTransfer, "__ack_fail", JStormUtils.mk_list(root.getKey()));
            }
        }
        this.task_stats.bolt_failed_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
    }

    /** Forwards the error to the task's error reporter. */
    @Override // backtype.storm.task.IErrorReporter
    public void reportError(Throwable th) {
        this.reportError.report(th);
    }

    /**
     * Combines {@code l} into the value stored for {@code tuple} using
     * {@code JStormUtils.bit_xor}; a missing entry is treated as 0.
     */
    public static void put_xor(RotatingMap<Tuple, Long> rotatingMap, Tuple tuple, Long l) {
        Long current = rotatingMap.get(tuple);
        if (current == null) {
            current = 0L;
        }
        rotatingMap.put(tuple, Long.valueOf(JStormUtils.bit_xor(current, l)));
    }

    /**
     * Combines {@code l2} into the value stored under key {@code l} using
     * {@code JStormUtils.bit_xor}; a missing entry is treated as 0.
     */
    public static void put_xor(Map<Long, Long> map, Long l, Long l2) {
        Long current = map.get(l);
        if (current == null) {
            current = 0L;
        }
        map.put(l, Long.valueOf(JStormUtils.bit_xor(current, l2)));
    }
}
