package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.Config;
import backtype.storm.spout.ISpout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollectorCb;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.Tuple;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.TimerRatio;
import com.alibaba.jstorm.daemon.worker.JStormDebugger;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.task.master.TopologyMaster;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
import com.codahale.metrics.Gauge;
import com.lmax.disruptor.EventHandler;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/spout/SpoutExecutors.class */
public class SpoutExecutors extends BaseExecutors implements EventHandler {
    private static Logger LOG = LoggerFactory.getLogger(SpoutExecutors.class);
    protected final Integer max_spout_pending;
    protected ISpout spout;
    protected RotatingMap<Long, TupleInfo> pending;
    protected SpoutOutputCollector outputCollector;
    protected AsmHistogram nextTupleTimer;
    protected TimerRatio emptyCpuGauge;
    private String topologyId;
    private String componentId;
    private int taskId;
    protected AsyncLoopThread ackerRunnableThread;
    protected boolean isSpoutFullSleep;

    public SpoutExecutors(Task task) {
        super(task);
        this.spout = (ISpout) task.getTaskObj();
        this.max_spout_pending = JStormUtils.parseInt(this.storm_conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
        this.topologyId = this.sysTopologyCtx.getTopologyId();
        this.componentId = this.sysTopologyCtx.getThisComponentId();
        this.taskId = task.getTaskId().intValue();
        this.nextTupleTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
        this.emptyCpuGauge = new TimerRatio();
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, MetricDef.EMPTY_CPU_RATIO, MetricType.GAUGE), new AsmGauge(this.emptyCpuGauge));
        this.isSpoutFullSleep = ConfigExtension.isSpoutPendFullSleep(this.storm_conf);
        LOG.info("isSpoutFullSleep:" + this.isSpoutFullSleep);
        mkPending();
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, MetricDef.PENDING_MAP, MetricType.GAUGE), new AsmGauge(new Gauge<Double>() { // from class: com.alibaba.jstorm.task.execute.spout.SpoutExecutors.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Double m1220getValue() {
                return Double.valueOf(SpoutExecutors.this.pending.size());
            }
        }));
        SpoutCollector spoutBatchCollector = ConfigExtension.isTaskBatchTuple(this.storm_conf).booleanValue() ? new SpoutBatchCollector(task, this.pending, this.exeQueue) : new SpoutCollector(task, this.pending, this.exeQueue);
        this.outputCollector = new SpoutOutputCollector((SpoutOutputCollectorCb) spoutBatchCollector);
        this.taskTransfer.getBackpressureController().setSpoutCollector(spoutBatchCollector);
        this.taskHbTrigger.setSpoutOutputCollector(spoutBatchCollector);
        LOG.info("Successfully create SpoutExecutors " + this.idStr);
    }

    public void mkPending() {
        throw new RuntimeException("Should override this function");
    }

    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public void init() throws Exception {
        this.spout.open(this.storm_conf, this.userTopologyCtx, this.outputCollector);
        LOG.info("Successfully open SpoutExecutors " + this.idStr);
        this.taskHbTrigger.register();
        JStormUtils.sleepMs(ConfigExtension.getSpoutDelayRunSeconds(this.storm_conf) * ConfigExtension.DEFAULT_ZMQ_MAX_QUEUE_MSG);
        if (this.taskStatus.isRun()) {
            this.spout.activate();
        } else {
            this.spout.deactivate();
        }
        LOG.info(this.idStr + " is ready ");
    }

    public void nextTuple() {
        if (!this.taskStatus.isRun()) {
            JStormUtils.sleepMs(1L);
            return;
        }
        if (this.max_spout_pending != null && this.pending.size() >= this.max_spout_pending.intValue()) {
            if (this.isSpoutFullSleep) {
                JStormUtils.sleepMs(1L);
            }
            this.emptyCpuGauge.start();
            return;
        }
        this.emptyCpuGauge.stop();
        long time = this.nextTupleTimer.getTime();
        try {
            try {
                this.spout.nextTuple();
                this.nextTupleTimer.updateTime(time);
            } catch (Throwable th) {
                this.error = th;
                LOG.error("spout execute error ", th);
                this.report_error.report(th);
                this.nextTupleTimer.updateTime(time);
            }
        } catch (Throwable th2) {
            this.nextTupleTimer.updateTime(time);
            throw th2;
        }
    }

    @Override // com.alibaba.jstorm.task.execute.BaseExecutors, com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
    public void run() {
        throw new RuntimeException("Should implement this function");
    }

    public void onEvent(Object obj, long j, boolean z) throws Exception {
        if (obj == null) {
            return;
        }
        try {
            Runnable runnable = null;
            if (obj instanceof Tuple) {
                processControlEvent();
                runnable = processTupleEvent((Tuple) obj);
            } else if (obj instanceof BatchTuple) {
                for (Tuple tuple : ((BatchTuple) obj).getTuples()) {
                    processControlEvent();
                    runnable = processTupleEvent(tuple);
                    if (runnable != null) {
                        runnable.run();
                        runnable = null;
                    }
                }
            } else if (obj instanceof TimerTrigger.TimerEvent) {
                processTimerEvent((TimerTrigger.TimerEvent) obj);
                return;
            } else if (obj instanceof IAckMsg) {
                runnable = (Runnable) obj;
            } else {
                if (!(obj instanceof Runnable)) {
                    LOG.warn("Receive one unknow event-" + obj.toString() + " " + this.idStr);
                    return;
                }
                runnable = (Runnable) obj;
            }
            if (runnable != null) {
                runnable.run();
            }
        } catch (Throwable th) {
            if (this.taskStatus.isShutdown()) {
                return;
            }
            LOG.info("Unknow excpetion ", th);
            this.report_error.report(th);
        }
    }

    private Runnable processTupleEvent(Tuple tuple) {
        IAckMsg iAckMsg = null;
        if (tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
            this.taskTransfer.getBackpressureController().control((TopoMasterCtrlEvent) tuple.getValueByField(TopologyMaster.FILED_CTRL_EVENT));
        } else {
            Object value = tuple.getValue(0);
            Object remove = this.pending.remove((Long) value);
            if (remove == null) {
                if (JStormDebugger.isDebug(value)) {
                    LOG.info("Pending map no entry:" + value);
                }
                iAckMsg = null;
            } else {
                TupleInfo tupleInfo = (TupleInfo) remove;
                String sourceStreamId = tuple.getSourceStreamId();
                if (sourceStreamId.equals("__ack_ack")) {
                    iAckMsg = new AckSpoutMsg(value, this.spout, tuple, tupleInfo, this.task_stats);
                } else if (sourceStreamId.equals("__ack_fail")) {
                    iAckMsg = new FailSpoutMsg(value, this.spout, tupleInfo, this.task_stats);
                } else {
                    LOG.warn("Receive one unknow source Tuple " + this.idStr);
                    iAckMsg = null;
                }
            }
            this.task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
        }
        return iAckMsg;
    }

    public AsyncLoopThread getAckerRunnableThread() {
        return this.ackerRunnableThread;
    }

    private void processTimerEvent(TimerTrigger.TimerEvent timerEvent) {
        switch (timerEvent.getOpCode()) {
            case 1:
                for (Map.Entry<Long, TupleInfo> entry : this.pending.rotate().entrySet()) {
                    new FailSpoutMsg(entry.getKey(), this.spout, entry.getValue(), this.task_stats).run();
                }
                return;
            case 3:
                this.taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
                return;
            default:
                LOG.warn("Receive unsupported timer event, opcode=" + timerEvent.getOpCode());
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processControlEvent() {
        Object poll = this.controlQueue.poll();
        if (poll != null) {
            if (poll instanceof TimerTrigger.TimerEvent) {
                processTimerEvent((TimerTrigger.TimerEvent) poll);
                return;
            }
            if (poll instanceof Tuple) {
                processTupleEvent((Tuple) poll);
            } else {
                if (!(poll instanceof BatchTuple)) {
                    LOG.warn("Received unknown control event, " + poll.getClass().getName());
                    return;
                }
                Iterator<Tuple> it = ((BatchTuple) poll).getTuples().iterator();
                while (it.hasNext()) {
                    processTupleEvent(it.next());
                }
            }
        }
    }

    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public Object getOutputCollector() {
        return this.outputCollector;
    }
}
