package com.alibaba.jstorm.task.execute;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.OutputCollectorCb;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleExt;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.daemon.worker.timer.BackpressureCheckTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TickTupleTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.backpressure.BackpressureTrigger;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
import com.lmax.disruptor.EventHandler;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/BoltExecutors.class */
public class BoltExecutors extends BaseExecutors implements EventHandler {
    private static Logger LOG = LoggerFactory.getLogger(BoltExecutors.class);
    protected IBolt bolt;
    protected RotatingMap<Tuple, Long> tuple_start_times;
    private int ackerNum;
    private OutputCollector outputCollector;
    private volatile double exeTime;
    private BackpressureTrigger backpressureTrigger;
    private boolean isSystemBolt;

    public BoltExecutors(Task task) {
        super(task);
        this.ackerNum = 0;
        this.bolt = (IBolt) task.getTaskObj();
        this.tuple_start_times = new RotatingMap<>(3);
        this.ackerNum = JStormUtils.parseInt(this.storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS)).intValue();
        BoltCollector boltBatchCollector = ConfigExtension.isTaskBatchTuple(this.storm_conf).booleanValue() ? new BoltBatchCollector(task, this.tuple_start_times, this.message_timeout_secs) : new BoltCollector(task, this.tuple_start_times, this.message_timeout_secs);
        this.outputCollector = new OutputCollector((OutputCollectorCb) boltBatchCollector);
        this.taskHbTrigger.setBoltOutputCollector(boltBatchCollector);
        Object obj = this.storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
        this.isSystemBolt = Common.isSystemComponent(this.componentId);
        if (obj != null && !this.isSystemBolt) {
            new TickTupleTrigger(this.sysTopologyCtx, JStormUtils.parseInt(obj).intValue(), this.idStr + Constants.SYSTEM_TICK_STREAM_ID, this.controlQueue).register();
        }
        if (!this.isSystemBolt) {
            this.backpressureTrigger = new BackpressureTrigger(task, this, this.storm_conf, boltBatchCollector);
            new BackpressureCheckTrigger(30, ConfigExtension.getBackpressureCheckIntervl(this.storm_conf), this.idStr + " backpressure check trigger", this.backpressureTrigger).register(TimeUnit.MILLISECONDS);
        }
        LOG.info("Successfully create BoltExecutors " + this.idStr);
    }

    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public void init() {
        this.bolt.prepare(this.storm_conf, this.userTopologyCtx, this.outputCollector);
        LOG.info("Succeesfully do Bolt.prepare");
        this.taskHbTrigger.register();
    }

    @Override // com.alibaba.jstorm.callback.RunnableCallback
    public String getThreadName() {
        return this.idStr + "-" + BoltExecutors.class.getSimpleName();
    }

    @Override // com.alibaba.jstorm.task.execute.BaseExecutors, com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
    public void run() {
        if (!this.isFinishInit) {
            initWrapper();
        }
        while (!this.taskStatus.isShutdown()) {
            try {
                this.exeQueue.consumeBatchWhenAvailable(this);
                processControlEvent();
            } catch (Throwable th) {
                if (!this.taskStatus.isShutdown()) {
                    LOG.error(this.idStr + " bolt exeutor  error", th);
                }
            }
        }
    }

    public void onEvent(Object obj, long j, boolean z) throws Exception {
        if (obj == null) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        try {
            if (obj instanceof Tuple) {
                processControlEvent();
                processTupleEvent((Tuple) obj);
            } else if (obj instanceof BatchTuple) {
                for (Tuple tuple : ((BatchTuple) obj).getTuples()) {
                    processControlEvent();
                    processTupleEvent(tuple);
                }
            } else if (obj instanceof TimerTrigger.TimerEvent) {
                processTimerEvent((TimerTrigger.TimerEvent) obj);
            } else {
                LOG.warn("Bolt executor received unknown message");
            }
            if (JStormMetrics.enabled) {
                this.exeTime = System.currentTimeMillis() - currentTimeMillis;
            }
        } catch (Throwable th) {
            if (JStormMetrics.enabled) {
                this.exeTime = System.currentTimeMillis() - currentTimeMillis;
            }
            throw th;
        }
    }

    private void processTupleEvent(Tuple tuple) {
        Long l;
        this.task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
        this.tuple_start_times.put(tuple, Long.valueOf(System.currentTimeMillis()));
        try {
            if (this.isSystemBolt || !tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
                this.bolt.execute(tuple);
            } else {
                this.backpressureTrigger.handle(tuple);
            }
        } catch (Throwable th) {
            this.error = th;
            LOG.error("bolt execute error ", th);
            this.report_error.report(th);
        }
        if (this.ackerNum == 0 && (l = (Long) this.tuple_start_times.remove(tuple)) != null && JStormMetrics.enabled) {
            this.task_stats.bolt_acked_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), l.longValue(), ((TupleExt) tuple).getCreationTimeStamp(), System.currentTimeMillis());
        }
    }

    private void processTimerEvent(TimerTrigger.TimerEvent timerEvent) {
        switch (timerEvent.getOpCode()) {
            case 1:
                Map<Tuple, Long> rotate = this.tuple_start_times.rotate();
                if (this.ackerNum > 0) {
                    Iterator<Map.Entry<Tuple, Long>> it = rotate.entrySet().iterator();
                    while (it.hasNext()) {
                        Tuple key = it.next().getKey();
                        this.task_stats.bolt_failed_tuple(key.getSourceComponent(), key.getSourceStreamId());
                    }
                    return;
                }
                return;
            case 2:
                try {
                    this.bolt.execute((Tuple) timerEvent.getMsg());
                    return;
                } catch (Throwable th) {
                    this.error = th;
                    LOG.error("bolt execute error ", th);
                    this.report_error.report(th);
                    return;
                }
            case 3:
                this.taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
                return;
            default:
                LOG.warn("Receive unsupported timer event, opcode=" + timerEvent.getOpCode());
                return;
        }
    }

    protected void processControlEvent() {
        Object poll = this.controlQueue.poll();
        if (poll != null) {
            if (poll instanceof TimerTrigger.TimerEvent) {
                processTimerEvent((TimerTrigger.TimerEvent) poll);
                LOG.debug("Received one event from control queue");
            } else if (poll instanceof Tuple) {
                processTupleEvent((Tuple) poll);
            } else {
                if (!(poll instanceof BatchTuple)) {
                    LOG.warn("Received unknown control event, " + poll.getClass().getName());
                    return;
                }
                Iterator<Tuple> it = ((BatchTuple) poll).getTuples().iterator();
                while (it.hasNext()) {
                    processTupleEvent(it.next());
                }
            }
        }
    }

    public double getExecuteTime() {
        return this.exeTime;
    }

    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public Object getOutputCollector() {
        return this.outputCollector;
    }
}
