/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.execute.BatchCollector;
import com.alibaba.jstorm.task.execute.MsgInfo;
import com.alibaba.jstorm.task.execute.spout.SpoutCollector;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeOutMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SpoutCollector} that buffers emitted messages into batches and ships
 * each batch to its target tasks as a {@link BatchTuple}.
 *
 * <p>Buffering is delegated to an anonymous {@link BatchCollector} subclass built
 * in the constructor. That subclass keeps one pending batch per stream (for
 * ordinary emits) and one per "taskId-streamId" key (for emits with an explicit
 * target task). Delivery of a batch is done by
 * {@link #sendBatch(String, String, List)}.
 */
public class SpoutBatchCollector extends SpoutCollector {
    private static final Logger LOG = LoggerFactory.getLogger(SpoutBatchCollector.class);

    /** Stream id used for the per-root messages accumulated while sending a batch. */
    private static final String ACK_INIT_STREAM_ID = "__ack_init";

    /** Separator between target task id and stream id in keys of the direct-batch map. */
    private static final String DIRECT_KEY_SEPARATOR = "-";

    protected BatchCollector batchCollector;

    /**
     * Creates the collector and its backing {@link BatchCollector}.
     *
     * @param task                the task this collector belongs to (passed to the superclass)
     * @param pending             map of root id to {@link TupleInfo}; entries are added in
     *                            {@link #getMessageId(SpoutMsgInfo, Map)}
     * @param disruptorAckerQueue queue passed through to the superclass
     */
    public SpoutBatchCollector(Task task, TimeOutMap<Long, TupleInfo> pending, DisruptorQueue disruptorAckerQueue) {
        super(task, pending, disruptorAckerQueue);
        String componentId = this.topology_context.getThisComponentId();
        this.batchCollector = new BatchCollector(this.task_id, componentId, this.storm_conf) {

            /**
             * Appends one message to the batch it belongs to.
             *
             * <p>Messages with an explicit {@code outTaskId} are keyed by
             * "taskId-streamId" in {@code directBatches}; all others are keyed by
             * stream id in {@code streamToBatches}. Each map is used as its own lock.
             *
             * @return the batch that became ready to send, or {@code null} if the
             *         message was only buffered
             */
            @Override
            public List<MsgInfo> push(String streamId, List<Object> tuple, Integer outTaskId, Collection<Tuple> anchors, Object messageId, Long rootId, ICollectorCallback callback) {
                if (outTaskId != null) {
                    Map<String, List<MsgInfo>> direct = this.directBatches;
                    synchronized (direct) {
                        String key = outTaskId.toString() + SpoutBatchCollector.DIRECT_KEY_SEPARATOR + streamId;
                        return SpoutBatchCollector.this.addToBatches(key, direct, streamId, tuple, outTaskId, messageId, rootId, this.batchSize, callback);
                    }
                }
                Map<String, List<MsgInfo>> byStream = this.streamToBatches;
                synchronized (byStream) {
                    return SpoutBatchCollector.this.addToBatches(streamId, byStream, streamId, tuple, outTaskId, messageId, rootId, this.batchSize, callback);
                }
            }

            /**
             * Sends every non-empty pending batch, stream batches first and then
             * direct batches, and resets each sent slot to {@code null}.
             *
             * <p>Slots are cleared through {@link Map.Entry#setValue(Object)} because
             * that is the only mutation the {@code Map.Entry} contract defines while
             * the entry set is being iterated.
             */
            @Override
            public void flush() {
                Map<String, List<MsgInfo>> byStream = this.streamToBatches;
                synchronized (byStream) {
                    for (Map.Entry<String, List<MsgInfo>> entry : byStream.entrySet()) {
                        List<MsgInfo> batch = entry.getValue();
                        if (batch == null || batch.isEmpty()) {
                            continue;
                        }
                        SpoutBatchCollector.this.sendBatch(entry.getKey(), null, batch);
                        entry.setValue(null);
                    }
                }
                Map<String, List<MsgInfo>> direct = this.directBatches;
                synchronized (direct) {
                    for (Map.Entry<String, List<MsgInfo>> entry : direct.entrySet()) {
                        List<MsgInfo> batch = entry.getValue();
                        if (batch == null || batch.isEmpty()) {
                            continue;
                        }
                        // Key layout is "taskId-streamId"; limit 2 keeps any further
                        // separators inside the stream id part.
                        String[] taskAndStream = entry.getKey().split(SpoutBatchCollector.DIRECT_KEY_SEPARATOR, 2);
                        SpoutBatchCollector.this.sendBatch(taskAndStream[1], taskAndStream[0], batch);
                        entry.setValue(null);
                    }
                }
            }
        };
    }

    /**
     * Buffers one emitted message and, if that makes a batch ready, sends the batch.
     *
     * @param outStreamId stream the message is emitted on
     * @param values      tuple values
     * @param messageId   caller-supplied message id; also used to derive the root id
     * @param outTaskId   explicit target task, or {@code null} for an ordinary emit
     * @param callback    optional callback run once the containing batch is sent
     * @return {@code null} when the message was only buffered; otherwise whatever
     *         {@link #sendBatch(String, String, List)} returns
     */
    @Override
    protected List<Integer> sendSpoutMsg(String outStreamId, List<Object> values, Object messageId, Integer outTaskId, ICollectorCallback callback) {
        List<MsgInfo> readyBatch = this.batchCollector.push(outStreamId, values, outTaskId, null, messageId, this.getRootId(messageId), callback);
        if (readyBatch == null || readyBatch.isEmpty()) {
            return null;
        }
        String targetTask = outTaskId != null ? outTaskId.toString() : null;
        return this.sendBatch(outStreamId, targetTask, readyBatch);
    }

    /**
     * Resolves the target tasks for a batch, transfers one {@link BatchTuple} per
     * target task, runs the per-message callbacks, and finally sends the
     * accumulated {@code "__ack_init"} messages (one per distinct root id) through
     * a recursive call.
     *
     * <p>The elapsed time is always reported to {@code emitTotalTimer}, including
     * when an exception propagates.
     *
     * @param outStreamId      stream the batch belongs to
     * @param outTaskId        decimal task id for a direct send, or {@code null}
     * @param batchTobeFlushed messages to send; every element must be a
     *                         {@link SpoutMsgInfo} (it is cast as such)
     * @return a new empty list when no target tasks were resolved; otherwise
     *         {@code null}. NOTE(review): the asymmetric return is preserved from
     *         the original code; callers must be prepared for {@code null}.
     */
    public List<Integer> sendBatch(String outStreamId, String outTaskId, List<MsgInfo> batchTobeFlushed) {
        long startTime = this.emitTotalTimer.getTime();
        try {
            Map<List<Integer>, List<MsgInfo>> outTasks;
            if (outTaskId != null) {
                outTasks = this.sendTargets.getBatch(Integer.valueOf(outTaskId), outStreamId, batchTobeFlushed);
            } else {
                outTasks = this.sendTargets.getBatch(outStreamId, batchTobeFlushed);
            }
            if (outTasks == null || outTasks.size() == 0) {
                // Nothing to deliver.
                return new ArrayList<Integer>();
            }

            // One "__ack_init" message per root id, filled in by getMessageId().
            Map<Long, MsgInfo> ackBatch = new HashMap<Long, MsgInfo>();
            for (Map.Entry<List<Integer>, List<MsgInfo>> entry : outTasks.entrySet()) {
                List<Integer> tasks = entry.getKey();
                List<MsgInfo> batch = entry.getValue();

                for (int i = 0; i < tasks.size(); ++i) {
                    Integer targetTask = tasks.get(i);
                    BatchTuple batchTuple = new BatchTuple(targetTask, batch.size());
                    for (MsgInfo msg : batch) {
                        // A fresh message id is generated per (target task, message) pair.
                        MessageId msgId = this.getMessageId((SpoutMsgInfo) msg, ackBatch);
                        TupleImplExt tuple = new TupleImplExt(this.topology_context, msg.values, this.task_id, msg.streamId, msgId);
                        tuple.setTargetTaskId(targetTask);
                        batchTuple.addToBatch(tuple);
                    }
                    this.transfer_fn.transfer(batchTuple);
                }

                // Callbacks run once per message, after all of its target tasks were served.
                for (MsgInfo msg : batch) {
                    if (msg.callback != null) {
                        msg.callback.execute(tasks);
                    }
                }
            }

            if (ackBatch.size() > 0) {
                this.sendBatch(ACK_INIT_STREAM_ID, null, new ArrayList<MsgInfo>(ackBatch.values()));
            }
            // The original code never populated a result list on this path.
            return null;
        } finally {
            this.emitTotalTimer.updateTime(startTime);
        }
    }

    /**
     * Builds the {@link MessageId} for one outgoing tuple.
     *
     * <p>Messages without a root id get an unanchored id. For messages with a root
     * id, a random id is generated and combined with the root id. The first time a
     * root id is seen within {@code ackBatch}, a {@link TupleInfo} is stored in
     * {@code pending} and a new {@code "__ack_init"} message carrying
     * {@code [rootId, xor-value, task_id]} is registered. On later occurrences the
     * generated id is XOR-ed into element 1 of the existing message's values.
     *
     * @param msg      the message being sent
     * @param ackBatch root id to {@code "__ack_init"} message; mutated by this call
     * @return the message id to attach to the outgoing tuple
     */
    protected MessageId getMessageId(SpoutMsgInfo msg, Map<Long, MsgInfo> ackBatch) {
        if (msg.rootId == null) {
            return MessageId.makeUnanchored();
        }

        Long as = MessageId.generateId(this.random);
        MessageId msgId = MessageId.makeRootId(msg.rootId, as);

        MsgInfo ackInit = ackBatch.get(msg.rootId);
        if (ackInit != null) {
            List<Object> ackerTuple = ackInit.values;
            ackerTuple.set(1, JStormUtils.bit_xor_vals(ackerTuple.get(1), as));
            return msgId;
        }

        TupleInfo info = new TupleInfo();
        info.setStream(msg.streamId);
        info.setValues(msg.values);
        info.setMessageId(msg.messageId);
        info.setTimestamp(System.currentTimeMillis());
        this.pending.putHead(msg.rootId, info);

        List<Object> ackerTuple = JStormUtils.mk_list(msg.rootId, JStormUtils.bit_xor_vals(as), this.task_id);
        ackBatch.put(msg.rootId, new SpoutMsgInfo(ACK_INIT_STREAM_ID, ackerTuple, null, null, null, null));
        return msgId;
    }

    /**
     * Appends a message to the batch stored under {@code key}, creating the batch
     * if the slot is missing or {@code null}.
     *
     * <p>Callers are expected to hold the lock on {@code batches} (both call sites
     * in this class do).
     *
     * @return the batch, detached from the map (its slot is set to {@code null}),
     *         once its size exceeds {@code batchSize}; otherwise {@code null}.
     *         NOTE(review): the comparison is strictly greater-than, so a returned
     *         batch holds {@code batchSize + 1} messages - confirm this is intended.
     */
    private List<MsgInfo> addToBatches(String key, Map<String, List<MsgInfo>> batches, String streamId, List<Object> tuple, Integer outTaskId, Object messageId, Long rootId, int batchSize, ICollectorCallback callback) {
        List<MsgInfo> batch = batches.get(key);
        if (batch == null) {
            batch = new ArrayList<MsgInfo>();
            batches.put(key, batch);
        }
        batch.add(new SpoutMsgInfo(streamId, tuple, outTaskId, messageId, rootId, callback));
        if (batch.size() <= batchSize) {
            return null;
        }
        batches.put(key, null);
        return batch;
    }

    /** Sends all currently buffered batches by delegating to the batch collector. */
    @Override
    public void flush() {
        this.batchCollector.flush();
    }

    /** Sends an unanchored tuple by delegating to {@link UnanchoredSend#sendBatch}. */
    @Override
    void unanchoredSend(TopologyContext topologyContext, TaskSendTargets taskTargets, TaskTransfer transfer_fn, String stream, List<Object> values) {
        UnanchoredSend.sendBatch(topologyContext, taskTargets, transfer_fn, stream, values);
    }

    /**
     * Wraps a single tuple in a {@link BatchTuple} addressed to the tuple's target
     * task and hands it to {@code transfer_fn.transferControl}.
     */
    @Override
    void transferCtr(TupleImplExt tupleExt) {
        int taskId = tupleExt.getTargetTaskId();
        BatchTuple batchTuple = new BatchTuple(taskId, 1);
        batchTuple.addToBatch(tupleExt);
        this.transfer_fn.transferControl(batchTuple);
    }

    /**
     * A buffered spout message: a {@link MsgInfo} extended with the caller-supplied
     * message id and the root id used when building the outgoing {@link MessageId}.
     */
    class SpoutMsgInfo extends MsgInfo {
        public Long rootId;
        public Object messageId;

        public SpoutMsgInfo(String streamId, List<Object> values, Integer outTaskId, Object messageId, Long rootId, ICollectorCallback callback) {
            super(streamId, values, outTaskId, callback);
            this.messageId = messageId;
            this.rootId = rootId;
        }
    }
}

