package com.alibaba.jstorm.task.execute;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.OutputCollectorCb;
import backtype.storm.topology.IRichBatchBolt;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleExt;
import backtype.storm.tuple.TupleImplExt;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.daemon.worker.timer.TickTupleTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.master.ctrlevent.TopoMasterCtrlEvent;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.Pair;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.lmax.disruptor.EventHandler;
import shade.storm.org.apache.commons.cli.HelpFormatter;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/BoltExecutors.class */
/**
 * Executor that drives a single {@link IBolt}: it drains the control queue and the
 * execute queue and dispatches every event it receives to the bolt, to the timer
 * handling, or to the metrics reporter.
 *
 * <p>Events arrive through {@link #onEvent(Object, long, boolean)}. Errors thrown by
 * the bolt itself are recorded and reported rather than propagated, so one failing
 * tuple does not stop the executor loop.
 */
public class BoltExecutors extends BaseExecutors implements EventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(BoltExecutors.class);

    /** The user (or system) bolt this executor feeds. */
    protected IBolt bolt;
    /** Receive time of anchored tuples that are still in flight, rotated by timer events. */
    protected RotatingMap<Tuple, Long> tuple_start_times;
    /** Value of {@code Config.TOPOLOGY_ACKER_EXECUTORS} for this topology. */
    private final int ackerNum;
    private final OutputCollector outputCollector;
    /** Wall-clock duration in milliseconds of the most recent {@code onEvent} call. */
    private volatile double exeTime;
    /** Whether this executor's component is a system component. */
    private final boolean isSystemBolt;

    /**
     * Builds the executor for {@code task}: creates the output collector, waits for a
     * connection to the topology master when it lives in another worker, registers the
     * task heartbeat trigger and, if configured, a tick tuple trigger.
     *
     * @param task the task whose task object is the {@link IBolt} to execute
     */
    public BoltExecutors(Task task) {
        super(task);
        this.bolt = (IBolt) task.getTaskObj();
        this.tuple_start_times = new RotatingMap<>(3);
        this.ackerNum = JStormUtils.parseInt(this.storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS)).intValue();

        BoltCollector collector;
        if (ConfigExtension.isTaskBatchTuple(this.storm_conf).booleanValue()) {
            collector = new BoltBatchCollector(task, this.tuple_start_times, this.message_timeout_secs);
        } else {
            collector = new BoltCollector(task, this.tuple_start_times, this.message_timeout_secs);
        }
        this.outputCollector = new OutputCollector((OutputCollectorCb) collector);

        // Block until a connection to the topology master exists, unless there is no
        // master (id 0) or the master runs inside this very worker.
        int topologyMasterId = this.sysTopologyCtx.getTopologyMasterId();
        List<Integer> localTasks = this.sysTopologyCtx.getThisWorkerTasks();
        if (topologyMasterId != 0 && !localTasks.contains(Integer.valueOf(topologyMasterId))) {
            while (getConnection(topologyMasterId) == null) {
                JStormUtils.sleepMs(10L);
                // NOTE(review): this logs at info level on every 10 ms poll; consider throttling.
                LOG.info("this task still is building connection with topology Master");
            }
        }

        this.taskHbTrigger.setBoltOutputCollector(collector);
        this.taskHbTrigger.register();

        // The millisecond setting wins; otherwise fall back to the seconds setting.
        Object tickFrequencyMs = this.storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_MS);
        if (tickFrequencyMs == null) {
            Object tickFrequencySecs = this.storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
            if (tickFrequencySecs != null) {
                tickFrequencyMs = Integer.valueOf(JStormUtils.parseInt(tickFrequencySecs).intValue() * 1000);
            }
        }

        this.isSystemBolt = Common.isSystemComponent(this.componentId);
        // System components never get a tick tuple trigger.
        if (tickFrequencyMs != null && !this.isSystemBolt) {
            new TickTupleTrigger(
                    this.sysTopologyCtx,
                    JStormUtils.parseInt(tickFrequencyMs).intValue(),
                    this.idStr + Constants.SYSTEM_TICK_STREAM_ID,
                    this.controlQueue).register();
        }
        LOG.info("Successfully create BoltExecutors {}", this.idStr);
    }

    /**
     * Prepares the bolt with the topology configuration, the user topology context and
     * this executor's output collector, then updates the heartbeat trigger's executor
     * status.
     */
    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public void init() {
        this.bolt.prepare(this.storm_conf, this.userTopologyCtx, this.outputCollector);
        // NOTE(review): status byte 0 is presumably the "running" state - confirm against TaskStatus.
        this.taskHbTrigger.updateExecutorStatus((byte) 0);
        LOG.info("Succeesfully do Bolt.prepare");
    }

    /**
     * @return the executor thread name, {@code <idStr>-BoltExecutors}
     */
    @Override // com.alibaba.jstorm.callback.RunnableCallback
    public String getThreadName() {
        return this.idStr + "-" + BoltExecutors.class.getSimpleName();
    }

    /**
     * Main loop: initializes the executor if needed, then keeps draining the control
     * queue and the execute queue until the task is shut down. Failures are logged
     * (unless they happen during shutdown) and the loop continues.
     */
    @Override // com.alibaba.jstorm.task.execute.BaseExecutors, com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
    public void run() {
        if (!this.isFinishInit) {
            initWrapper();
        }
        while (!this.taskStatus.isShutdown()) {
            try {
                this.controlQueue.consumeBatch(this);
                this.exeQueue.consumeBatchWhenAvailable(this);
            } catch (Throwable th) {
                // Errors raised while shutting down are expected noise and are dropped.
                if (!this.taskStatus.isShutdown()) {
                    LOG.error(this.idStr + " bolt exeutor  error", th);
                }
            }
        }
    }

    /**
     * Dispatches one queue event: tuples go to the bolt, timer events to
     * {@link #processTimerEvent}, anything else is logged and ignored. When metrics are
     * enabled the elapsed time is recorded whether or not the handling failed.
     *
     * @param event the queued event; {@code null} is ignored
     * @param sequence sequence number supplied by the queue (unused)
     * @param endOfBatch end-of-batch flag supplied by the queue (unused)
     * @throws Exception if handling the event fails outside of the bolt's own execution
     */
    @Override // shade.storm.com.lmax.disruptor.EventHandler
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        if (event == null) {
            return;
        }
        long eventStart = System.currentTimeMillis();
        try {
            if (event instanceof Tuple) {
                handleTuple((Tuple) event);
            } else if (event instanceof TimerTrigger.TimerEvent) {
                processTimerEvent((TimerTrigger.TimerEvent) event);
            } else {
                LOG.warn("Bolt executor received unknown message");
            }
        } finally {
            if (JStormMetrics.enabled) {
                this.exeTime = System.currentTimeMillis() - eventStart;
            }
        }
    }

    /**
     * Processes one incoming tuple (plain or batch) and updates the task statistics.
     * A batch tuple is handed to the bolt whole when the bolt is an
     * {@link IRichBatchBolt}; otherwise it is unpacked and executed entry by entry.
     * One pending control event is polled before each unit of batch work.
     */
    private void handleTuple(Tuple tuple) {
        int tupleCount = 1;
        long processStart = System.currentTimeMillis();
        this.task_stats.tupleLifeCycle(
                tuple.getSourceComponent(),
                tuple.getSourceStreamId(),
                ((TupleExt) tuple).getCreationTimeStamp(),
                processStart);

        if (((TupleExt) tuple).isBatchTuple()) {
            List<Object> entries = tuple.getValues();
            tupleCount = entries.size();
            if (this.bolt instanceof IRichBatchBolt) {
                processControlEvent();
                processTupleBatchEvent(tuple);
            } else {
                for (Object entry : entries) {
                    TupleImplExt unpacked = unpackBatchEntry(entry, tuple);
                    processControlEvent();
                    processTupleEvent(unpacked);
                }
            }
        } else {
            processTupleEvent(tuple);
        }

        this.task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), tupleCount);
        // With no acker executors configured, the latency is recorded here instead.
        if (this.ackerNum == 0 && JStormMetrics.enabled) {
            this.task_stats.update_bolt_acked_latency(
                    tuple.getSourceComponent(),
                    tuple.getSourceStreamId(),
                    processStart,
                    System.currentTimeMillis(),
                    tupleCount);
        }
    }

    /**
     * Rebuilds a single tuple from one entry of a batch tuple. Each entry is a
     * {@link Pair} whose first element is the message id and whose second element is
     * the value list; the remaining tuple data is taken from {@code batch}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // entries are untyped Pair objects
    private TupleImplExt unpackBatchEntry(Object entry, Tuple batch) {
        Pair pair = (Pair) entry;
        return new TupleImplExt(
                this.sysTopologyCtx,
                (List<Object>) pair.getSecond(),
                (MessageId) pair.getFirst(),
                (TupleImplExt) batch);
    }

    /**
     * Handles a batch tuple for a batch-capable bolt. Topology master control tuples
     * (for non-system bolts) and metrics-registration responses are unpacked and routed
     * through {@link #processTupleEvent}; every other batch goes to the bolt as is.
     */
    private void processTupleBatchEvent(Tuple tuple) {
        try {
            String streamId = tuple.getSourceStreamId();
            boolean masterControl = !this.isSystemBolt
                    && streamId.equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID);
            boolean metricsResponse = streamId.equals(Common.TOPOLOGY_MASTER_REGISTER_METRICS_RESP_STREAM_ID);

            if (masterControl || metricsResponse) {
                // Only batches made of Pair entries can be unpacked; others are dropped.
                if (tuple.getValues().get(0) instanceof Pair) {
                    for (Object entry : tuple.getValues()) {
                        processTupleEvent(unpackBatchEntry(entry, tuple));
                    }
                }
            } else {
                this.bolt.execute(tuple);
            }
        } catch (Throwable th) {
            reportExecuteError(th);
        }
    }

    /**
     * Handles a single tuple. Anchored tuples are first recorded in
     * {@link #tuple_start_times}. For non-system bolts, topology master control tuples
     * reach the bolt only when they carry a transaction event; metrics-registration
     * responses update the metrics reporter; everything else is executed by the bolt.
     */
    private void processTupleEvent(Tuple tuple) {
        if (tuple.getMessageId() != null && tuple.getMessageId().isAnchored()) {
            this.tuple_start_times.put(tuple, Long.valueOf(System.currentTimeMillis()));
        }
        try {
            if (!this.isSystemBolt && tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
                TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) tuple.getValue(0);
                if (ctrlEvent.isTransactionEvent()) {
                    this.bolt.execute(tuple);
                } else {
                    LOG.warn("Received unexpected control event, {}", ctrlEvent);
                }
            } else if (tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_REGISTER_METRICS_RESP_STREAM_ID)) {
                this.metricsReporter.updateMetricMeta((Map) tuple.getValue(0));
            } else {
                this.bolt.execute(tuple);
            }
        } catch (Throwable th) {
            reportExecuteError(th);
        }
    }

    /**
     * Handles a timer event according to its op code.
     * NOTE(review): the numeric op codes presumably mirror constants declared
     * elsewhere in the project - confirm and reference them by name.
     */
    private void processTimerEvent(TimerTrigger.TimerEvent timerEvent) {
        switch (timerEvent.getOpCode()) {
            case 1:
                // Rotate the in-flight map; with ackers enabled every expired tuple counts as failed.
                Map<Tuple, Long> expired = this.tuple_start_times.rotate();
                if (this.ackerNum > 0) {
                    for (Tuple timedOut : expired.keySet()) {
                        this.task_stats.bolt_failed_tuple(timedOut.getSourceComponent(), timedOut.getSourceStreamId());
                    }
                }
                break;
            case 2:
                // The event message is itself a tuple that the bolt must execute.
                try {
                    this.bolt.execute((Tuple) timerEvent.getMsg());
                } catch (Throwable th) {
                    reportExecuteError(th);
                }
                break;
            case 3:
                // Record that the execute thread is still alive.
                this.taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
                break;
            default:
                LOG.warn("Receive unsupported timer event, opcode={}", timerEvent.getOpCode());
                break;
        }
    }

    /**
     * Polls at most one event from the control queue and handles it, so control
     * events are not starved while a large batch is being processed.
     */
    protected void processControlEvent() {
        Object controlEvent = this.controlQueue.poll();
        if (controlEvent == null) {
            return;
        }
        if (controlEvent instanceof TimerTrigger.TimerEvent) {
            processTimerEvent((TimerTrigger.TimerEvent) controlEvent);
            LOG.debug("Received one event from control queue");
        } else if (controlEvent instanceof Tuple) {
            processTupleEvent((Tuple) controlEvent);
        } else {
            LOG.warn("Received unknown control event, {}", controlEvent.getClass().getName());
        }
    }

    /** Records, logs and reports a failure raised while executing the bolt. */
    private void reportExecuteError(Throwable cause) {
        this.error = cause;
        LOG.error("bolt execute error ", cause);
        this.report_error.report(cause);
    }

    /**
     * @return duration in milliseconds of the most recent {@code onEvent} call; only
     *         updated while metrics are enabled
     */
    public double getExecuteTime() {
        return this.exeTime;
    }

    /**
     * @return the {@link OutputCollector} handed to the bolt
     */
    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public Object getOutputCollector() {
        return this.outputCollector;
    }
}
