/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.acker;

import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import com.alibaba.jstorm.task.acker.AckObject;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bolt that tracks the acknowledgement state of tuple trees and reports the outcome
 * of each tracked id back to the spout task that registered it.
 *
 * <p>The first value of every incoming tuple is used as the tracking id. Tuples are
 * dispatched on their source stream id:
 * <ul>
 *   <li>{@link #ACKER_INIT_STREAM_ID}: records the value and the spout task for the id;</li>
 *   <li>{@link #ACKER_ACK_STREAM_ID}: folds the value into the entry via
 *       {@code AckObject.update_ack} (or creates the entry if none exists yet);</li>
 *   <li>{@link #ACKER_FAIL_STREAM_ID}: marks an existing entry as failed.</li>
 * </ul>
 * Once an entry has a spout task and its {@code val} is {@code 0}, the id is emitted
 * directly to that task on the ack stream; if the entry is marked failed it is emitted
 * on the fail stream instead. In both cases the entry is removed from the pending map.
 *
 * <p>Entries live in a {@link RotatingMap} with {@link #TIMEOUT_BUCKET_NUM} buckets that
 * is rotated from {@link #execute(Tuple)} whenever more than {@code rotateTime}
 * milliseconds have passed since the previous rotation.
 */
public class Acker implements IBolt {
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    /** Number of buckets in the pending {@link RotatingMap}. */
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private OutputCollector collector = null;
    /** Tracking id -> ack state for ids that have not been reported to a spout yet. */
    private RotatingMap<Object, AckObject> pending = null;
    /** Wall-clock time (ms) of the last rotation of {@link #pending}. */
    private long lastRotate = System.currentTimeMillis();
    /** Minimum interval (ms) between two rotations of {@link #pending}. */
    private long rotateTime;

    /**
     * Initialises the collector, the pending map and the rotation interval.
     *
     * <p>The rotation interval is the configured {@code topology.message.timeout.secs}
     * (default 30 when absent) converted to milliseconds and divided by
     * {@code TIMEOUT_BUCKET_NUM - 1}.
     *
     * @param stormConf topology configuration; read for {@code topology.message.timeout.secs}
     * @param context   topology context (unused here)
     * @param collector collector used to emit results and ack inputs
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // NOTE(review): the meaning of the boolean flag is defined by RotatingMap, not visible here.
        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM, true);
        long timeoutMs = 1000L * (long)JStormUtils.parseInt(stormConf.get("topology.message.timeout.secs"), 30).intValue();
        this.rotateTime = timeoutMs / (long)(TIMEOUT_BUCKET_NUM - 1);
        // Restart the rotation clock here: the field initialiser runs at construction time,
        // which may be long before the bolt is prepared (NOTE(review): confirm deployment path),
        // and would otherwise force a rotation on the very first tuple.
        this.lastRotate = System.currentTimeMillis();
    }

    /**
     * Processes one acker tuple, then acks it and rotates the pending map if due.
     *
     * <p>The input is acked and the rotation check runs on every path that completes
     * normally, including a fail for an untracked id and a tuple from an unknown stream.
     *
     * @param input tuple whose value 0 is the tracking id; value 1 is the ack value on the
     *              init/ack streams and value 2 is the spout task on the init stream
     */
    @Override
    public void execute(Tuple input) {
        this.track(input);
        this.collector.ack(input);
        this.rotateIfDue();
    }

    /** Logs that the bolt has been cleaned up; no resources are released here. */
    @Override
    public void cleanup() {
        LOG.info("Successfully cleanup");
    }

    /** Updates the pending entry for the tuple's id according to its source stream. */
    private void track(Tuple input) {
        Object id = input.getValue(0);
        AckObject curr = this.pending.get(id);
        String streamId = input.getSourceStreamId();
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                curr.val = input.getLong(1);
                curr.spout_task = input.getInteger(2);
                this.pending.put(id, curr);
            } else {
                // An entry already exists (an ack was seen first): merge and attach the spout task.
                curr.update_ack(input.getValue(1));
                curr.spout_task = input.getInteger(2);
            }
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr != null) {
                curr.update_ack(input.getValue(1));
            } else {
                // Ack seen before init: remember the value; the spout task is filled in by init.
                curr = new AckObject();
                curr.val = input.getLong(1);
                this.pending.put(id, curr);
            }
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                // Nothing is tracked for this id, so there is nothing to mark as failed.
                return;
            }
            curr.failed = true;
        } else {
            LOG.info("Unknow source stream, " + streamId + " from task-" + input.getSourceTask());
            return;
        }
        this.reportIfResolved(id, curr);
    }

    /** Emits the id to the entry's spout task and drops the entry once it is complete or failed. */
    private void reportIfResolved(Object id, AckObject curr) {
        Integer task = curr.spout_task;
        if (task == null) {
            // No spout task known yet; keep the entry pending.
            return;
        }
        if (curr.val == 0L) {
            this.pending.remove(id);
            List<Object> values = JStormUtils.mk_list(id);
            this.collector.emitDirect((int)task, ACKER_ACK_STREAM_ID, values);
        } else if (curr.failed) {
            this.pending.remove(id);
            List<Object> values = JStormUtils.mk_list(id);
            this.collector.emitDirect((int)task, ACKER_FAIL_STREAM_ID, values);
        }
    }

    /** Rotates the pending map when more than {@code rotateTime} ms passed since the last rotation. */
    private void rotateIfDue() {
        long now = System.currentTimeMillis();
        if (now - this.lastRotate > this.rotateTime) {
            this.lastRotate = now;
            Map<Object, AckObject> expired = this.pending.rotate();
            LOG.info("Acker's timeout item size:{}", (Object)expired.size());
        }
    }
}

