package com.alibaba.jstorm.task.acker;

import backtype.storm.Config;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/acker/Acker.class */
public class Acker implements IBolt {
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private OutputCollector collector = null;
    private RotatingMap<Object, AckObject> pending = null;
    private long lastRotate = System.currentTimeMillis();
    private long rotateTime;

    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.pending = new RotatingMap<>(3, true);
        this.rotateTime = (1000 * JStormUtils.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30).intValue()) / 2;
    }

    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        Object value = tuple.getValue(0);
        AckObject ackObject = this.pending.get(value);
        String sourceStreamId = tuple.getSourceStreamId();
        if ("__ack_init".equals(sourceStreamId)) {
            if (ackObject == null) {
                ackObject = new AckObject();
                ackObject.val = tuple.getLong(1).longValue();
                ackObject.spout_task = tuple.getInteger(2);
                this.pending.put(value, ackObject);
            } else {
                ackObject.update_ack(tuple.getValue(1));
                ackObject.spout_task = tuple.getInteger(2);
            }
        } else if ("__ack_ack".equals(sourceStreamId)) {
            if (ackObject != null) {
                ackObject.update_ack(tuple.getValue(1));
            } else {
                ackObject = new AckObject();
                ackObject.val = tuple.getLong(1).longValue();
                this.pending.put(value, ackObject);
            }
        } else if (!"__ack_fail".equals(sourceStreamId)) {
            LOG.info("Unknow source stream, " + sourceStreamId + " from task-" + tuple.getSourceTask());
            return;
        } else if (ackObject == null) {
            return;
        } else {
            ackObject.failed = true;
        }
        Integer num = ackObject.spout_task;
        if (num != null) {
            if (ackObject.val == 0) {
                this.pending.remove(value);
                this.collector.emitDirect(num.intValue(), "__ack_ack", JStormUtils.mk_list(value));
            } else if (ackObject.failed) {
                this.pending.remove(value);
                this.collector.emitDirect(num.intValue(), "__ack_fail", JStormUtils.mk_list(value));
            }
        }
        this.collector.ack(tuple);
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.lastRotate > this.rotateTime) {
            this.lastRotate = currentTimeMillis;
            LOG.info("Acker's timeout item size:{}", Integer.valueOf(this.pending.rotate().size()));
        }
    }

    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        LOG.info("Successfully cleanup");
    }
}
