package com.alibaba.jstorm.task.execute.spout;

import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.commons.cli.HelpFormatter;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.class */
/**
 * Spout executor whose {@link #run()} method drains the execution queue, drains the
 * control queue and then calls {@code nextTuple()} of the parent class, all from the
 * calling thread.
 *
 * <p>On the first invocation of {@link #run()} the executor keeps consuming both queues
 * until either {@code checkTopologyFinishInit} becomes {@code true} or the configured
 * spout delay has elapsed; it then activates or deactivates the spout according to the
 * current task status.
 */
public class SingleThreadSpoutExecutors extends SpoutExecutors {
    private static final Logger LOG = LoggerFactory.getLogger(SingleThreadSpoutExecutors.class);

    /**
     * Separator placed between the task id string and the class name in the thread name.
     * A plain literal is used instead of an unrelated CLI-library constant of equal value.
     */
    private static final String THREAD_NAME_SEPARATOR = "-";

    /** Milliseconds per second; a {@code long} so the delay arithmetic is done in 64 bits. */
    private static final long MILLIS_PER_SECOND = 1000L;

    /** Pause, in milliseconds, between two polls of the task status while waiting to start. */
    private static final long STATUS_POLL_INTERVAL_MS = 10L;

    /**
     * Creates the executor for the given task; all initialisation is done by the parent
     * constructor.
     *
     * @param task the task handed to {@link SpoutExecutors}
     */
    public SingleThreadSpoutExecutors(Task task) {
        super(task);
    }

    /**
     * Creates the {@code pending} map.
     *
     * <p>The last constructor argument is {@code false} when task batch tuple mode is
     * enabled in the configuration and {@code true} otherwise.
     * NOTE(review): the meaning of the three {@code RotatingMap} constructor arguments is
     * defined by that class and is not visible here — confirm before changing them.
     */
    @Override
    public void mkPending() {
        boolean batchTupleEnabled = ConfigExtension.isTaskBatchTuple(this.storm_conf).booleanValue();
        this.pending = new RotatingMap<>(3, null, !batchTupleEnabled);
    }

    /**
     * Builds the thread name as {@code <idStr>-SingleThreadSpoutExecutors}.
     *
     * @return the name to use for the thread running this executor
     */
    @Override
    public String getThreadName() {
        return this.idStr + THREAD_NAME_SEPARATOR + SingleThreadSpoutExecutors.class.getSimpleName();
    }

    /**
     * Executes one iteration of the spout: consumes the execution queue, consumes the
     * control queue and calls the parent's {@code nextTuple()}.
     *
     * <p>While {@code checkTopologyFinishInit} is still {@code false} this method first
     * runs the start-up sequence described on the class.
     */
    @Override
    public void run() {
        if (!this.checkTopologyFinishInit) {
            initWrapper();

            int spoutDelayRunSeconds = ConfigExtension.getSpoutDelayRunSeconds(this.storm_conf);
            // Computed once, in long arithmetic: an int product would overflow for delays
            // above ~2.1 million seconds and end the wait immediately.
            long timeoutMs = spoutDelayRunSeconds * MILLIS_PER_SECOND;
            long startMs = System.currentTimeMillis();

            // Keep handling queued events until the flag is set (by code outside this
            // loop) or until the configured delay has elapsed.
            while (!this.checkTopologyFinishInit) {
                executeEvent();
                this.controlQueue.consumeBatch(this);
                if (System.currentTimeMillis() - startMs > timeoutMs) {
                    // NOTE(review): status code 0 presumably denotes the "run" state —
                    // confirm against the task status constants.
                    this.executorStatus.setStatus((byte) 0);
                    this.checkTopologyFinishInit = true;
                    LOG.info("wait {} timeout, begin operate nextTuple", spoutDelayRunSeconds);
                    break;
                }
            }

            // Poll the task status until it is either running or paused and put the
            // spout into the matching state.
            // NOTE(review): this loop has no exit for any other status (e.g. shutdown).
            while (true) {
                JStormUtils.sleepMs(STATUS_POLL_INTERVAL_MS);
                if (this.taskStatus.isRun()) {
                    this.spout.activate();
                    break;
                }
                if (this.taskStatus.isPause()) {
                    this.spout.deactivate();
                    break;
                }
            }
            LOG.info("{} is ready, due to the topology finish init. ", this.idStr);
        }

        executeEvent();
        this.controlQueue.consumeBatch(this);
        super.nextTuple();
    }

    /**
     * Consumes the pending batch of the execution queue with this executor as handler.
     *
     * <p>Any exception is logged, except when the task is already shut down, in which
     * case it is dropped silently.
     */
    private void executeEvent() {
        try {
            this.exeQueue.consumeBatch(this);
        } catch (Exception e) {
            // Failures raised once the task is shut down are not reported.
            if (!this.taskStatus.isShutdown()) {
                LOG.error("Actor occur unknow exception ", e);
            }
        }
    }
}
