/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.spout.ISpout;
import backtype.storm.spout.SpoutOutputCollectorCb;
import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.TupleImplExt;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.task.execute.spout.AckSpoutMsg;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeOutMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output collector handed to a spout. Every emit resolves the target tasks for
 * the stream, wraps the values in one {@link TupleImplExt} per target, hands the
 * tuples to the task's {@link TaskTransfer}, and, when the emit carries a message
 * id and ackers are configured, registers the message in {@link #pending} and
 * sends an {@code __ack_init} tuple.
 */
public class SpoutCollector
extends SpoutOutputCollectorCb {
    private static final Logger LOG = LoggerFactory.getLogger(SpoutCollector.class);
    protected TaskSendTargets sendTargets;
    protected Map storm_conf;
    protected TaskTransfer transfer_fn;
    /** Messages that were emitted with acking enabled, keyed by root id. */
    protected TimeOutMap<Long, TupleInfo> pending;
    protected TopologyContext topology_context;
    protected DisruptorQueue disruptorAckerQueue;
    protected TaskBaseMetric task_stats;
    protected ISpout spout;
    protected ITaskReportErr report_error;
    protected Integer task_id;
    /** Value of the {@code topology.acker.executors} setting. */
    protected Integer ackerNum;
    /** Histogram updated with the duration of every send; registered as "EmitTime". */
    protected AsmHistogram emitTotalTimer;
    /** Source of the root ids and per-target ids generated by this collector. */
    protected Random random;

    /**
     * Builds a collector from the collaborators exposed by {@code task}.
     *
     * @param task                owning task; supplies send targets, configuration,
     *                            transfer function, context, stats, spout object,
     *                            task id and error reporter
     * @param pending             map in which acked emits are recorded by root id
     * @param disruptorAckerQueue queue stored on the collector (not used by this class itself)
     */
    public SpoutCollector(Task task, TimeOutMap<Long, TupleInfo> pending, DisruptorQueue disruptorAckerQueue) {
        this.sendTargets = task.getTaskSendTargets();
        this.storm_conf = task.getStormConf();
        this.transfer_fn = task.getTaskTransfer();
        this.pending = pending;
        this.topology_context = task.getTopologyContext();
        this.disruptorAckerQueue = disruptorAckerQueue;
        this.task_stats = task.getTaskStats();
        this.spout = (ISpout)task.getTaskObj();
        this.task_id = task.getTaskId();
        this.report_error = task.getReportErrorDie();
        this.ackerNum = JStormUtils.parseInt(this.storm_conf.get("topology.acker.executors"));
        this.random = new Random();
        this.random.setSeed(System.currentTimeMillis());
        String componentId = this.topology_context.getThisComponentId();
        this.emitTotalTimer = (AsmHistogram)JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topology_context.getTopologyId(), componentId, this.task_id, "EmitTime", MetricType.HISTOGRAM), new AsmHistogram());
        this.emitTotalTimer.setEnabled(false);
    }

    /** Emits {@code tuple} on {@code streamId}; returns the task ids it was sent to. */
    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return this.sendSpoutMsg(streamId, tuple, messageId, null, null);
    }

    /** Emits {@code tuple} on {@code streamId}, passing {@code taskId} to target resolution. */
    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        this.sendSpoutMsg(streamId, tuple, messageId, taskId, null);
    }

    /** Same as the three-argument emit, additionally invoking {@code callback} with the target tasks. */
    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId, ICollectorCallback callback) {
        return this.sendSpoutMsg(streamId, tuple, messageId, null, callback);
    }

    /** Same as the four-argument emitDirect, additionally invoking {@code callback} with the target tasks. */
    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId, ICollectorCallback callback) {
        this.sendSpoutMsg(streamId, tuple, messageId, taskId, callback);
    }

    /** Direct emit that goes through the control transfer path ({@link #transferCtr}). */
    public void emitDirectCtrl(int taskId, String streamId, List<Object> tuple, Object messageId) {
        this.sendCtrlMsg(streamId, tuple, messageId, taskId);
    }

    /** Emit that goes through the control transfer path ({@link #transferCtr}). */
    public List<Integer> emitCtrl(String streamId, List<Object> tuple, Object messageId) {
        return this.sendCtrlMsg(streamId, tuple, messageId, null);
    }

    /**
     * Common entry point of the regular emit methods; delegates to
     * {@link #sendMsg(String, List, Object, Integer, ICollectorCallback)}.
     *
     * @return the task ids the tuple was sent to
     */
    protected List<Integer> sendSpoutMsg(String outStreamId, List<Object> values, Object messageId, Integer outTaskId, ICollectorCallback callback) {
        return this.sendMsg(outStreamId, values, messageId, outTaskId, callback);
    }

    /** Sends {@code values} on {@code stream} through {@link UnanchoredSend#send}. */
    void unanchoredSend(TopologyContext topologyContext, TaskSendTargets taskTargets, TaskTransfer transfer_fn, String stream, List<Object> values) {
        UnanchoredSend.send(topologyContext, taskTargets, transfer_fn, stream, values);
    }

    /** Hands {@code tupleExt} to the transfer function's control path. */
    void transferCtr(TupleImplExt tupleExt) {
        this.transfer_fn.transferControl(tupleExt);
    }

    /**
     * Performs the ack bookkeeping that follows an emit.
     * <ul>
     *   <li>{@code needAck == true}: records the emit in {@link #pending} under
     *       {@code root_id} and sends {@code (root_id, xor(ackSeq), task_id)} on the
     *       {@code __ack_init} stream.</li>
     *   <li>{@code needAck == false} but a message id was supplied: runs an
     *       {@link AckSpoutMsg} for the message right away on the calling thread.</li>
     *   <li>otherwise: does nothing.</li>
     * </ul>
     *
     * @param out_stream_id stream the tuple was emitted on
     * @param values        emitted values
     * @param message_id    message id supplied by the spout, may be {@code null}
     * @param root_id       root id generated for the emit, {@code null} when acking is off
     * @param ackSeq        ids attached to the tuples sent to each target; only read when {@code needAck}
     * @param needAck       whether the emit is tracked by ackers
     */
    protected void sendMsgToAck(String out_stream_id, List<Object> values, Object message_id, Long root_id, List<Long> ackSeq, boolean needAck) {
        if (needAck) {
            TupleInfo info = new TupleInfo();
            info.setStream(out_stream_id);
            info.setValues(values);
            info.setMessageId(message_id);
            info.setTimestamp(System.currentTimeMillis());
            this.pending.putHead(root_id, info);
            List<Object> ackerTuple = JStormUtils.mk_list(root_id, JStormUtils.bit_xor_vals(ackSeq), this.task_id);
            this.unanchoredSend(this.topology_context, this.sendTargets, this.transfer_fn, "__ack_init", ackerTuple);
        } else if (message_id != null) {
            TupleInfo info = new TupleInfo();
            info.setStream(out_stream_id);
            info.setValues(values);
            info.setMessageId(message_id);
            info.setTimestamp(0L);
            AckSpoutMsg ack = new AckSpoutMsg(root_id, this.spout, null, info, this.task_stats);
            ack.run();
        }
    }

    /**
     * Sends {@code values} to every target task resolved for the stream and then
     * performs the ack bookkeeping via {@link #sendMsgToAck}.
     *
     * @param out_stream_id stream to emit on
     * @param values        tuple values
     * @param message_id    spout message id; {@code null} means the emit is not tracked
     * @param out_task_id   when non-null, passed to target resolution (direct emit)
     * @param callback      when non-null, invoked with the target tasks after sending
     * @return the resolved target task ids (possibly empty)
     */
    public List<Integer> sendMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id, ICollectorCallback callback) {
        long startTime = this.emitTotalTimer.getTime();
        try {
            boolean needAck = this.isAckRequired(message_id);
            Long root_id = this.getRootId(message_id);
            List<Integer> out_tasks = out_task_id != null
                    ? this.sendTargets.get(out_task_id, out_stream_id, values, null, root_id)
                    : this.sendTargets.get(out_stream_id, values, null, root_id);
            // NOTE(review): with no target tasks we return before sendMsgToAck, so a
            // supplied message id is neither put into pending nor acked — confirm intended.
            if (out_tasks.size() == 0) {
                return out_tasks;
            }
            // One list for the whole emit: it must accumulate the id attached to the
            // tuple of EVERY target. Creating it inside the loop kept only the id of
            // the last target, so the __ack_init value did not cover the other tuples.
            List<Long> ackSeq = needAck ? new ArrayList<Long>() : null;
            for (Integer t : out_tasks) {
                MessageId msgid = this.nextMessageId(needAck, root_id, ackSeq);
                TupleImplExt tp = new TupleImplExt(this.topology_context, values, this.task_id, out_stream_id, msgid);
                tp.setTargetTaskId(t);
                this.transfer_fn.transfer(tp);
            }
            this.sendMsgToAck(out_stream_id, values, message_id, root_id, ackSeq, needAck);
            if (callback != null) {
                callback.execute(out_tasks);
            }
            return out_tasks;
        }
        finally {
            // Record the emit duration on every exit path, including exceptions.
            this.emitTotalTimer.updateTime(startTime);
        }
    }

    /**
     * Control-message variant of {@link #sendMsg}: identical flow, except that the
     * tuples are handed to {@link #transferCtr} and no callback is involved.
     *
     * @param out_stream_id stream to emit on
     * @param values        tuple values
     * @param message_id    spout message id; {@code null} means the emit is not tracked
     * @param out_task_id   when non-null, passed to target resolution (direct emit)
     * @return the resolved target task ids (possibly empty)
     */
    protected List<Integer> sendCtrlMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id) {
        long startTime = this.emitTotalTimer.getTime();
        try {
            boolean needAck = this.isAckRequired(message_id);
            Long root_id = this.getRootId(message_id);
            List<Integer> out_tasks = out_task_id != null
                    ? this.sendTargets.get(out_task_id, out_stream_id, values, null, root_id)
                    : this.sendTargets.get(out_stream_id, values, null, root_id);
            // NOTE(review): same early return as sendMsg — no ack bookkeeping happens
            // when there are no target tasks; confirm intended.
            if (out_tasks.size() == 0) {
                return out_tasks;
            }
            // Shared across all targets so every generated id reaches sendMsgToAck.
            List<Long> ackSeq = needAck ? new ArrayList<Long>() : null;
            for (Integer t : out_tasks) {
                MessageId msgid = this.nextMessageId(needAck, root_id, ackSeq);
                TupleImplExt tp = new TupleImplExt(this.topology_context, values, this.task_id, out_stream_id, msgid);
                tp.setTargetTaskId(t);
                this.transferCtr(tp);
            }
            this.sendMsgToAck(out_stream_id, values, message_id, root_id, ackSeq, needAck);
            return out_tasks;
        }
        finally {
            // Record the emit duration on every exit path, including exceptions.
            this.emitTotalTimer.updateTime(startTime);
        }
    }

    /** Forwards {@code error} to the task's error reporter. */
    @Override
    public void reportError(Throwable error) {
        this.report_error.report(error);
    }

    /**
     * Generates a root id for a tracked emit.
     *
     * @param messageId spout message id, may be {@code null}
     * @return a freshly generated id that is not currently a key of {@link #pending},
     *         or {@code null} when the emit is not tracked (no message id or no ackers)
     */
    protected Long getRootId(Object messageId) {
        if (!this.isAckRequired(messageId)) {
            return null;
        }
        Long rootId = MessageId.generateId(this.random);
        // Regenerate on collision with a message that is still pending.
        while (this.pending.containsKey(rootId)) {
            rootId = MessageId.generateId(this.random);
        }
        return rootId;
    }

    /** An emit is tracked only when it carries a message id and the acker count is positive. */
    private boolean isAckRequired(Object messageId) {
        return messageId != null && this.ackerNum > 0;
    }

    /**
     * Builds the message id for one outgoing tuple. For tracked emits a new id is
     * generated, combined with {@code rootId} and appended to {@code ackSeq};
     * otherwise an unanchored message id is returned and {@code ackSeq} is untouched.
     */
    private MessageId nextMessageId(boolean needAck, Long rootId, List<Long> ackSeq) {
        if (!needAck) {
            return MessageId.makeUnanchored();
        }
        Long as = MessageId.generateId(this.random);
        MessageId msgid = MessageId.makeRootId(rootId, as);
        ackSeq.add(as);
        return msgid;
    }
}

