/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute;

import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.OutputCollectorCb;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleExt;
import backtype.storm.tuple.TupleImplExt;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BoltCollector
extends OutputCollectorCb {
    private static Logger LOG = LoggerFactory.getLogger(BoltCollector.class);
    protected ITaskReportErr reportError;
    protected TaskSendTargets sendTargets;
    protected TaskTransfer taskTransfer;
    protected TopologyContext topologyContext;
    protected Integer task_id;
    protected final RotatingMap<Tuple, Long> tuple_start_times;
    protected TaskBaseMetric task_stats;
    protected final RotatingMap<Tuple, Long> pending_acks;
    protected long lastRotate = System.currentTimeMillis();
    protected long rotateTime;
    protected Map storm_conf;
    protected Integer ackerNum;
    protected AsmHistogram emitTimer;
    protected Random random;

    public BoltCollector(Task task, RotatingMap<Tuple, Long> tuple_start_times, int message_timeout_secs) {
        this.rotateTime = 1000L * (long)message_timeout_secs / 2L;
        this.reportError = task.getReportErrorDie();
        this.sendTargets = task.getTaskSendTargets();
        this.storm_conf = task.getStormConf();
        this.taskTransfer = task.getTaskTransfer();
        this.topologyContext = task.getTopologyContext();
        this.task_id = task.getTaskId();
        this.task_stats = task.getTaskStats();
        this.pending_acks = new RotatingMap(3);
        this.tuple_start_times = tuple_start_times;
        this.ackerNum = JStormUtils.parseInt(this.storm_conf.get("topology.acker.executors"));
        String componentId = this.topologyContext.getThisComponentId();
        this.emitTimer = (AsmHistogram)JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyContext.getTopologyId(), componentId, this.task_id, "EmitTime", MetricType.HISTOGRAM), new AsmHistogram());
        this.emitTimer.setEnabled(false);
        this.random = new Random();
        this.random.setSeed(System.currentTimeMillis());
    }

    @Override
    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        return this.sendBoltMsg(streamId, anchors, tuple, null, null);
    }

    @Override
    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        this.sendBoltMsg(streamId, anchors, tuple, taskId, null);
    }

    @Override
    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple, ICollectorCallback callback) {
        return this.sendBoltMsg(streamId, anchors, tuple, null, callback);
    }

    @Override
    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple, ICollectorCallback callback) {
        this.sendBoltMsg(streamId, anchors, tuple, taskId, callback);
    }

    public List<Integer> emitCtrl(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        return this.sendCtrlMsg(streamId, tuple, anchors, null);
    }

    public void emitDirectCtrl(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        this.sendCtrlMsg(streamId, tuple, anchors, taskId);
    }

    protected List<Integer> sendBoltMsg(String outStreamId, Collection<Tuple> anchors, List<Object> values, Integer outTaskId, ICollectorCallback callback) {
        List<Integer> outTasks = null;
        outTasks = this.sendMsg(outStreamId, values, anchors, outTaskId, callback);
        return outTasks;
    }

    protected MessageId getMessageId(Collection<Tuple> anchors) {
        HashMap<Long, Long> anchors_to_ids = new HashMap<Long, Long>();
        if (anchors != null) {
            long now = System.currentTimeMillis();
            if (now - this.lastRotate > this.rotateTime) {
                this.pending_acks.rotate();
                this.lastRotate = now;
            }
            for (Tuple a : anchors) {
                Long edge_id = MessageId.generateId(this.random);
                BoltCollector.put_xor(this.pending_acks, a, edge_id);
                for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) {
                    BoltCollector.put_xor(anchors_to_ids, root_id, edge_id);
                }
            }
        }
        return MessageId.makeId(anchors_to_ids);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<Integer> sendMsg(String out_stream_id, List<Object> values, Collection<Tuple> anchors, Integer out_task_id, ICollectorCallback callback) {
        long start = this.emitTimer.getTime();
        List<Integer> out_tasks = null;
        try {
            out_tasks = out_task_id != null ? this.sendTargets.get(out_task_id, out_stream_id, values, anchors, null) : this.sendTargets.get(out_stream_id, values, anchors, null);
            for (Integer t : out_tasks) {
                MessageId msgid = this.getMessageId(anchors);
                TupleImplExt tp = new TupleImplExt(this.topologyContext, values, this.task_id, out_stream_id, msgid);
                tp.setTargetTaskId(t);
                this.taskTransfer.transfer(tp);
            }
            if (out_tasks == null) {
                out_tasks = new ArrayList();
            }
            if (callback != null) {
                callback.execute(out_tasks);
            }
            this.emitTimer.updateTime(start);
        }
        catch (Exception e) {
            try {
                LOG.error("bolt emit", (Throwable)e);
                if (out_tasks == null) {
                    out_tasks = new ArrayList();
                }
                if (callback != null) {
                    callback.execute(out_tasks);
                }
                this.emitTimer.updateTime(start);
            }
            catch (Throwable throwable) {
                if (out_tasks == null) {
                    out_tasks = new ArrayList<Integer>();
                }
                if (callback != null) {
                    callback.execute(out_tasks);
                }
                this.emitTimer.updateTime(start);
                throw throwable;
            }
        }
        return out_tasks;
    }

    void unanchoredSend(TopologyContext topologyContext, TaskSendTargets taskTargets, TaskTransfer transfer_fn, String stream, List<Object> values) {
        UnanchoredSend.send(topologyContext, taskTargets, transfer_fn, stream, values);
    }

    void transferCtr(TupleImplExt tupleExt) {
        this.taskTransfer.transferControl(tupleExt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected List<Integer> sendCtrlMsg(String out_stream_id, List<Object> values, Collection<Tuple> anchors, Integer out_task_id) {
        long start = this.emitTimer.getTime();
        List<Integer> out_tasks = null;
        try {
            out_tasks = out_task_id != null ? this.sendTargets.get(out_task_id, out_stream_id, values, anchors, null) : this.sendTargets.get(out_stream_id, values, anchors, null);
            for (Integer t : out_tasks) {
                MessageId msgid = this.getMessageId(anchors);
                TupleImplExt tp = new TupleImplExt(this.topologyContext, values, this.task_id, out_stream_id, msgid);
                tp.setTargetTaskId(t);
                this.transferCtr(tp);
            }
        }
        catch (Exception e) {
            LOG.error("bolt emit", (Throwable)e);
        }
        finally {
            this.emitTimer.updateTime(start);
        }
        return out_tasks;
    }

    @Override
    public void ack(Tuple input) {
        Long latencyStart;
        if (this.ackerNum > 0) {
            Long ack_val = 0L;
            Object pend_val = this.pending_acks.remove(input);
            if (pend_val != null) {
                ack_val = (Long)pend_val;
            }
            for (Map.Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
                this.unanchoredSend(this.topologyContext, this.sendTargets, this.taskTransfer, "__ack_ack", JStormUtils.mk_list(e.getKey(), JStormUtils.bit_xor(e.getValue(), ack_val)));
            }
        }
        if ((latencyStart = (Long)this.tuple_start_times.remove(input)) != null && JStormMetrics.enabled) {
            long lifeCycleStart = ((TupleExt)input).getCreationTimeStamp();
            long endTime = System.currentTimeMillis();
            this.task_stats.bolt_acked_tuple(input.getSourceComponent(), input.getSourceStreamId(), latencyStart, lifeCycleStart, endTime);
        }
    }

    @Override
    public void fail(Tuple input) {
        if (this.ackerNum > 0) {
            this.pending_acks.remove(input);
            for (Map.Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
                this.unanchoredSend(this.topologyContext, this.sendTargets, this.taskTransfer, "__ack_fail", JStormUtils.mk_list(e.getKey()));
            }
        }
        this.task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId());
    }

    @Override
    public void reportError(Throwable error) {
        this.reportError.report(error);
    }

    public static void put_xor(RotatingMap<Tuple, Long> pending, Tuple key, Long id) {
        Long curr = pending.get(key);
        if (curr == null) {
            curr = 0L;
        }
        pending.put(key, JStormUtils.bit_xor(curr, id));
    }

    public static void put_xor(Map<Long, Long> pending, Long key, Long id) {
        Long curr = pending.get(key);
        if (curr == null) {
            curr = 0L;
        }
        pending.put(key, JStormUtils.bit_xor(curr, id));
    }
}

