/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute;

import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleExt;
import backtype.storm.tuple.TupleImplExt;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.execute.BatchCollector;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.execute.MsgInfo;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bolt collector that groups emitted messages into batches before handing
 * them to the task transfer layer.
 *
 * <p>Messages are accumulated by an internal {@link BatchCollector}, keyed by
 * stream id (or by {@code "<taskId>-<streamId>"} for direct emits). A batch is
 * sent as soon as it grows past the configured batch size, or when
 * {@link #flush()} is called.
 *
 * <p>Because an anchored message may still be sitting in an unsent batch when
 * its anchor is acked, the collector counts, per anchor tuple, how many
 * anchored messages are batched but not yet sent ({@code pendingTuples}). An
 * ack whose anchor still has such messages is parked in {@code pendingAcks}
 * and retried on the next {@link #flush()}.
 */
public class BoltBatchCollector
extends BoltCollector {
    private static final Logger LOG = LoggerFactory.getLogger(BoltBatchCollector.class);

    /** Accumulates outgoing messages into batches; created in the constructor. */
    private final BatchCollector batchCollector;

    /**
     * Anchor tuple -> number of anchored messages batched but not yet sent.
     * Every access is guarded by {@code synchronized (pendingTuples)}.
     */
    private final RotatingMap<Tuple, Integer> pendingTuples = new RotatingMap<Tuple, Integer>(3, true);

    /** Acked input tuples whose ack message could not be sent yet. */
    private final BlockingQueue<Tuple> pendingAcks = new LinkedBlockingQueue<Tuple>();

    /**
     * Creates the collector and its internal batching {@link BatchCollector}.
     *
     * @param task                 task this collector belongs to (passed to the superclass)
     * @param tuple_start_times    per-tuple start timestamps (passed to the superclass)
     * @param message_timeout_secs message timeout in seconds (passed to the superclass)
     */
    public BoltBatchCollector(Task task, RotatingMap<Tuple, Long> tuple_start_times, int message_timeout_secs) {
        super(task, tuple_start_times, message_timeout_secs);
        String componentId = this.topologyContext.getThisComponentId();
        this.batchCollector = new BatchCollector(this.task_id, componentId, this.storm_conf){

            /**
             * Registers the message's anchors as pending and appends the
             * message to the matching batch.
             *
             * @return the batch to send now if it just exceeded the batch
             *         size, otherwise {@code null}
             */
            @Override
            public List<MsgInfo> push(String streamId, List<Object> tuple, Integer outTaskId, Collection<Tuple> anchors, Object messageId, Long rootId, ICollectorCallback callback) {
                if (anchors != null) {
                    synchronized (BoltBatchCollector.this.pendingTuples) {
                        for (Tuple anchor : anchors) {
                            Integer pendingCount = BoltBatchCollector.this.pendingTuples.get(anchor);
                            int updated = pendingCount == null ? 1 : pendingCount + 1;
                            BoltBatchCollector.this.pendingTuples.put(anchor, updated);
                        }
                    }
                }
                if (outTaskId != null) {
                    // Direct emit: batches are keyed by "<taskId>-<streamId>".
                    synchronized (this.directBatches) {
                        return BoltBatchCollector.this.addToBatches(outTaskId.toString() + "-" + streamId, this.directBatches, streamId, tuple, outTaskId, anchors, this.batchSize, callback);
                    }
                }
                synchronized (this.streamToBatches) {
                    return BoltBatchCollector.this.addToBatches(streamId, this.streamToBatches, streamId, tuple, outTaskId, anchors, this.batchSize, callback);
                }
            }

            /**
             * Sends every non-empty batch, then retries the acks that were
             * deferred because their anchors still had unsent messages.
             */
            @Override
            public void flush() {
                // Detach the batches under their locks first; the actual send
                // happens afterwards, outside of any batch lock.
                List<ReadyBatch> ready = new ArrayList<ReadyBatch>();
                BoltBatchCollector.drainBatches(this.streamToBatches, false, ready);
                BoltBatchCollector.drainBatches(this.directBatches, true, ready);
                for (ReadyBatch batch : ready) {
                    BoltBatchCollector.this.sendBatch(batch.streamId, batch.outTaskId, batch.msgs);
                }

                // Only look at the acks that were queued when we started, so
                // re-queued ones are not retried in the same pass.
                int remaining = BoltBatchCollector.this.pendingAcks.size();
                while (--remaining >= 0) {
                    Tuple tuple = BoltBatchCollector.this.pendingAcks.poll();
                    if (tuple == null || BoltBatchCollector.this.sendAckTuple(tuple)) {
                        continue;
                    }
                    // The queue is unbounded, so add() cannot fail for lack of
                    // capacity and, unlike put(), cannot drop the tuple because
                    // the thread happens to be interrupted.
                    BoltBatchCollector.this.pendingAcks.add(tuple);
                }
            }
        };
    }

    /**
     * Queues a message for batched sending and sends the batch right away if
     * this message made it exceed the batch size.
     *
     * @return the result of {@link #sendBatch} when a batch was sent,
     *         otherwise a new empty list
     */
    @Override
    protected List<Integer> sendBoltMsg(String outStreamId, Collection<Tuple> anchors, List<Object> values, Integer outTaskId, ICollectorCallback callback) {
        List<MsgInfo> batchTobeFlushed = this.batchCollector.push(outStreamId, values, outTaskId, anchors, null, null, callback);
        if (batchTobeFlushed == null || batchTobeFlushed.isEmpty()) {
            return new ArrayList<Integer>();
        }
        String directTask = outTaskId != null ? outTaskId.toString() : null;
        return this.sendBatch(outStreamId, directTask, batchTobeFlushed);
    }

    /**
     * Appends a message to the batch stored under {@code key}, creating the
     * batch if needed. The caller must hold the lock on {@code batches}.
     *
     * @return the batch, detached from {@code batches}, once it holds more
     *         than {@code batchSize} messages; otherwise {@code null}
     */
    private List<MsgInfo> addToBatches(String key, Map<String, List<MsgInfo>> batches, String streamId, List<Object> tuple, Integer outTaskId, Collection<Tuple> anchors, int batchSize, ICollectorCallback callback) {
        List<MsgInfo> batch = batches.get(key);
        if (batch == null) {
            batch = new ArrayList<MsgInfo>();
            batches.put(key, batch);
        }
        batch.add(new BoltMsgInfo(streamId, tuple, anchors, outTaskId, callback));
        if (batch.size() > batchSize) {
            batches.put(key, null);
            return batch;
        }
        return null;
    }

    /**
     * Moves every non-empty batch out of {@code source} into {@code ready},
     * leaving a {@code null} value behind for its key.
     *
     * @param source batch map to drain; it is locked for the duration of the call
     * @param direct {@code true} if keys have the form {@code "<taskId>-<streamId>"},
     *               {@code false} if keys are plain stream ids
     * @param ready  receives the detached batches
     */
    private static void drainBatches(Map<String, List<MsgInfo>> source, boolean direct, List<ReadyBatch> ready) {
        synchronized (source) {
            for (Map.Entry<String, List<MsgInfo>> entry : source.entrySet()) {
                List<MsgInfo> msgs = entry.getValue();
                if (msgs == null || msgs.isEmpty()) {
                    continue;
                }
                if (direct) {
                    String[] parts = entry.getKey().split("-", 2);
                    ready.add(new ReadyBatch(parts[1], parts[0], msgs));
                } else {
                    ready.add(new ReadyBatch(entry.getKey(), null, msgs));
                }
                // setValue is the write that is permitted while iterating entrySet().
                entry.setValue(null);
            }
        }
    }

    /**
     * Resolves the target tasks for a batch, transfers one {@link BatchTuple}
     * per target task, runs the per-message callbacks, and finally releases
     * the pending count of every anchor in the batch.
     *
     * <p>Any exception is logged and not propagated. Note that in that case
     * the anchors' pending counts are not released by this method.
     *
     * @param outStreamId      stream the batch is emitted on
     * @param outTaskId        target task id as a string for direct emits, or {@code null}
     * @param batchTobeFlushed messages to send
     * @return always an empty list in this implementation; target tasks are
     *         reported only through the message callbacks
     */
    public List<Integer> sendBatch(String outStreamId, String outTaskId, List<MsgInfo> batchTobeFlushed) {
        long start = this.emitTimer.getTime();
        try {
            Map<List<Integer>, List<MsgInfo>> outTasks = outTaskId != null
                    ? this.sendTargets.getBatch(Integer.valueOf(outTaskId), outStreamId, batchTobeFlushed)
                    : this.sendTargets.getBatch(outStreamId, batchTobeFlushed);
            if (outTasks != null && !outTasks.isEmpty()) {
                for (Map.Entry<List<Integer>, List<MsgInfo>> entry : outTasks.entrySet()) {
                    List<Integer> tasks = entry.getKey();
                    List<MsgInfo> batch = entry.getValue();
                    for (Integer t : tasks) {
                        BatchTuple batchTuple = new BatchTuple(t, batch.size());
                        for (MsgInfo msg : batch) {
                            // A fresh message id (new edge ids) is generated per target task.
                            MessageId msgId = this.getMessageId(((BoltMsgInfo)msg).anchors);
                            TupleImplExt tp = new TupleImplExt(this.topologyContext, msg.values, this.task_id, msg.streamId, msgId);
                            tp.setTargetTaskId(t);
                            batchTuple.addToBatch(tp);
                        }
                        this.taskTransfer.transfer(batchTuple);
                    }
                    for (MsgInfo msg : batch) {
                        if (msg.callback != null) {
                            msg.callback.execute(tasks);
                        }
                    }
                }
            }
            for (MsgInfo msgInfo : batchTobeFlushed) {
                this.releasePending(((BoltMsgInfo)msgInfo).anchors);
            }
            return new ArrayList<Integer>();
        }
        catch (Exception e) {
            LOG.error("bolt emit", e);
        }
        finally {
            this.emitTimer.updateTime(start);
        }
        return new ArrayList<Integer>();
    }

    /**
     * Decrements the pending count of each anchor, dropping the entry once
     * the count reaches zero. Anchors without an entry are ignored.
     *
     * @param anchors anchors of a message that has just been sent; may be {@code null}
     */
    private void releasePending(Collection<Tuple> anchors) {
        if (anchors == null || anchors.isEmpty()) {
            return;
        }
        for (Tuple anchor : anchors) {
            synchronized (this.pendingTuples) {
                Integer pendingCount = this.pendingTuples.get(anchor);
                if (pendingCount == null) {
                    continue;
                }
                int remaining = pendingCount - 1;
                if (remaining <= 0) {
                    this.pendingTuples.remove(anchor);
                } else {
                    this.pendingTuples.put(anchor, remaining);
                }
            }
        }
    }

    /**
     * Acks an input tuple. With ackers enabled, the ack message is sent
     * immediately if no message anchored on {@code input} is still batched;
     * otherwise the tuple is queued and retried on the next {@link #flush()}.
     * Ack statistics are recorded when a start time is known and metrics are
     * enabled.
     */
    @Override
    public void ack(Tuple input) {
        if (this.ackerNum > 0 && !this.sendAckTuple(input)) {
            this.pendingAcks.add(input);
        }
        Long latencyStart = (Long)this.tuple_start_times.remove(input);
        if (latencyStart != null && JStormMetrics.enabled) {
            long endTime = System.currentTimeMillis();
            long lifeCycleStart = ((TupleExt)input).getCreationTimeStamp();
            this.task_stats.bolt_acked_tuple(input.getSourceComponent(), input.getSourceStreamId(), latencyStart, lifeCycleStart, endTime);
        }
    }

    /**
     * Fails an input tuple. With ackers enabled, the tuple's pending ack
     * state is discarded and one {@code __ack_fail} message is emitted per
     * root id. The failure is always recorded in the task statistics.
     */
    @Override
    public void fail(Tuple input) {
        if (this.ackerNum > 0) {
            this.pending_acks.remove(input);
            for (Map.Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
                List<Object> ackTuple = JStormUtils.mk_list((Object)e.getKey());
                this.sendBoltMsg("__ack_fail", null, ackTuple, null, null);
            }
        }
        this.task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId());
    }

    /**
     * Builds the message id for an outgoing tuple. For every anchor a new
     * edge id is generated, XOR-ed into the anchor's entry in
     * {@code pending_acks}, and XOR-ed into the entry of each of the anchor's
     * root ids in the returned id. Also rotates {@code pending_acks} and
     * {@code pendingTuples} when more than {@code rotateTime} has elapsed
     * since the last rotation.
     *
     * @param anchors anchors of the outgoing tuple; {@code null} yields an id
     *                built from an empty map
     */
    @Override
    protected MessageId getMessageId(Collection<Tuple> anchors) {
        Map<Long, Long> anchors_to_ids = new HashMap<Long, Long>();
        if (anchors != null) {
            long now = System.currentTimeMillis();
            if (now - this.lastRotate > this.rotateTime) {
                this.pending_acks.rotate();
                synchronized (this.pendingTuples) {
                    this.pendingTuples.rotate();
                }
                this.lastRotate = now;
            }
            for (Tuple a : anchors) {
                Long edge_id = MessageId.generateId(this.random);
                synchronized (this.pending_acks) {
                    BoltBatchCollector.put_xor(this.pending_acks, a, edge_id);
                }
                for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) {
                    BoltBatchCollector.put_xor(anchors_to_ids, root_id, edge_id);
                }
            }
        }
        return MessageId.makeId(anchors_to_ids);
    }

    /**
     * Emits the {@code __ack_ack} messages for {@code input} unless messages
     * anchored on it are still waiting in an unsent batch.
     *
     * @return {@code true} if the ack messages were emitted, {@code false} if
     *         the ack has to be retried later
     */
    private boolean sendAckTuple(Tuple input) {
        Integer pendingCount;
        synchronized (this.pendingTuples) {
            pendingCount = this.pendingTuples.get(input);
        }
        if (pendingCount != null && pendingCount > 0) {
            return false;
        }
        long ack_val = 0L;
        Object pend_val = this.pending_acks.remove(input);
        if (pend_val != null) {
            ack_val = (Long)pend_val;
        }
        for (Map.Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
            List<Object> ackTuple = JStormUtils.mk_list((Object)e.getKey(), JStormUtils.bit_xor(e.getValue(), ack_val));
            this.sendBoltMsg("__ack_ack", null, ackTuple, null, null);
        }
        return true;
    }

    /** Sends all batched messages and retries deferred acks. */
    @Override
    public void flush() {
        this.batchCollector.flush();
    }

    /** Wraps a single control tuple in a one-element batch and transfers it on the control path. */
    @Override
    void transferCtr(TupleImplExt tupleExt) {
        int taskId = tupleExt.getTargetTaskId();
        BatchTuple batchTuple = new BatchTuple(taskId, 1);
        batchTuple.addToBatch(tupleExt);
        this.taskTransfer.transferControl(batchTuple);
    }

    /** Sends unanchored values using the batch variant of {@link UnanchoredSend}. */
    @Override
    void unanchoredSend(TopologyContext topologyContext, TaskSendTargets taskTargets, TaskTransfer transfer_fn, String stream, List<Object> values) {
        UnanchoredSend.sendBatch(topologyContext, taskTargets, transfer_fn, stream, values);
    }

    /** A batch detached from the batch maps and waiting to be sent by {@code flush()}. */
    private static final class ReadyBatch {
        final String streamId;
        /** Target task id for direct emits, otherwise {@code null}. */
        final String outTaskId;
        final List<MsgInfo> msgs;

        ReadyBatch(String streamId, String outTaskId, List<MsgInfo> msgs) {
            this.streamId = streamId;
            this.outTaskId = outTaskId;
            this.msgs = msgs;
        }
    }

    /**
     * A batched message together with the tuples it is anchored on.
     * Static so that queued messages do not each hold a reference to the collector.
     */
    private static class BoltMsgInfo
    extends MsgInfo {
        public final Collection<Tuple> anchors;

        public BoltMsgInfo(String streamId, List<Object> values, Collection<Tuple> anchors, Integer outTaskId, ICollectorCallback callback) {
            super(streamId, values, outTaskId, callback);
            this.anchors = anchors;
        }
    }
}

