/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.spout.ISpout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.Tuple;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.TimerRatio;
import com.alibaba.jstorm.daemon.worker.JStormDebugger;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.task.execute.spout.AckSpoutMsg;
import com.alibaba.jstorm.task.execute.spout.FailSpoutMsg;
import com.alibaba.jstorm.task.execute.spout.IAckMsg;
import com.alibaba.jstorm.task.execute.spout.SpoutBatchCollector;
import com.alibaba.jstorm.task.execute.spout.SpoutCollector;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
import com.codahale.metrics.Gauge;
import com.lmax.disruptor.EventHandler;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpoutExecutors
extends BaseExecutors
implements EventHandler {
    private static Logger LOG = LoggerFactory.getLogger(SpoutExecutors.class);
    protected final Integer max_spout_pending;
    protected ISpout spout;
    protected RotatingMap<Long, TupleInfo> pending;
    protected SpoutOutputCollector outputCollector;
    protected AsmHistogram nextTupleTimer;
    protected TimerRatio emptyCpuGauge;
    private String topologyId;
    private String componentId;
    private int taskId;
    protected AsyncLoopThread ackerRunnableThread;
    protected boolean isSpoutFullSleep;

    public SpoutExecutors(Task task) {
        super(task);
        this.spout = (ISpout)task.getTaskObj();
        this.max_spout_pending = JStormUtils.parseInt(this.storm_conf.get("topology.max.spout.pending"));
        this.topologyId = this.sysTopologyCtx.getTopologyId();
        this.componentId = this.sysTopologyCtx.getThisComponentId();
        this.taskId = task.getTaskId();
        this.nextTupleTimer = (AsmHistogram)JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, "ExecutorTime", MetricType.HISTOGRAM), new AsmHistogram());
        this.emptyCpuGauge = new TimerRatio();
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, "EmptyCpuRatio", MetricType.GAUGE), new AsmGauge(this.emptyCpuGauge));
        this.isSpoutFullSleep = ConfigExtension.isSpoutPendFullSleep(this.storm_conf);
        LOG.info("isSpoutFullSleep:" + this.isSpoutFullSleep);
        this.mkPending();
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, "PendingNum", MetricType.GAUGE), new AsmGauge(new Gauge<Double>(){

            public Double getValue() {
                return SpoutExecutors.this.pending.size();
            }
        }));
        SpoutCollector collector = null;
        collector = ConfigExtension.isTaskBatchTuple(this.storm_conf) != false ? new SpoutBatchCollector(task, this.pending, this.exeQueue) : new SpoutCollector(task, this.pending, this.exeQueue);
        this.outputCollector = new SpoutOutputCollector(collector);
        this.taskTransfer.getBackpressureController().setSpoutCollector(collector);
        this.taskHbTrigger.setSpoutOutputCollector(collector);
        LOG.info("Successfully create SpoutExecutors " + this.idStr);
    }

    public void mkPending() {
        throw new RuntimeException("Should override this function");
    }

    @Override
    public void init() throws Exception {
        this.spout.open(this.storm_conf, this.userTopologyCtx, this.outputCollector);
        LOG.info("Successfully open SpoutExecutors " + this.idStr);
        this.taskHbTrigger.register();
        int delayRun = ConfigExtension.getSpoutDelayRunSeconds(this.storm_conf);
        JStormUtils.sleepMs(delayRun * 1000);
        if (this.taskStatus.isRun()) {
            this.spout.activate();
        } else {
            this.spout.deactivate();
        }
        LOG.info(this.idStr + " is ready ");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void nextTuple() {
        if (!this.taskStatus.isRun()) {
            JStormUtils.sleepMs(1L);
            return;
        }
        if (this.max_spout_pending == null || this.pending.size() < this.max_spout_pending) {
            this.emptyCpuGauge.stop();
            long start = this.nextTupleTimer.getTime();
            try {
                this.spout.nextTuple();
            }
            catch (Throwable e) {
                this.error = e;
                LOG.error("spout execute error ", e);
                this.report_error.report(e);
            }
            finally {
                this.nextTupleTimer.updateTime(start);
            }
        } else {
            if (this.isSpoutFullSleep) {
                JStormUtils.sleepMs(1L);
            }
            this.emptyCpuGauge.start();
        }
    }

    @Override
    public void run() {
        throw new RuntimeException("Should implement this function");
    }

    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        block14: {
            try {
                if (event == null) {
                    return;
                }
                Runnable runnable = null;
                if (event instanceof Tuple) {
                    this.processControlEvent();
                    runnable = this.processTupleEvent((Tuple)event);
                } else if (event instanceof BatchTuple) {
                    for (Tuple tuple : ((BatchTuple)event).getTuples()) {
                        this.processControlEvent();
                        runnable = this.processTupleEvent(tuple);
                        if (runnable == null) continue;
                        runnable.run();
                        runnable = null;
                    }
                } else {
                    if (event instanceof TimerTrigger.TimerEvent) {
                        this.processTimerEvent((TimerTrigger.TimerEvent)event);
                        return;
                    }
                    if (event instanceof IAckMsg) {
                        runnable = (Runnable)event;
                    } else if (event instanceof Runnable) {
                        runnable = (Runnable)event;
                    } else {
                        LOG.warn("Receive one unknow event-" + event.toString() + " " + this.idStr);
                        return;
                    }
                }
                if (runnable != null) {
                    runnable.run();
                }
            }
            catch (Throwable e) {
                if (this.taskStatus.isShutdown()) break block14;
                LOG.info("Unknow excpetion ", e);
                this.report_error.report(e);
            }
        }
    }

    private Runnable processTupleEvent(Tuple event) {
        IAckMsg runnable = null;
        Tuple tuple = event;
        if (event.getSourceStreamId().equals("__master_control_stream")) {
            TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent)tuple.getValueByField("ctrlEvent");
            this.taskTransfer.getBackpressureController().control(ctrlEvent);
        } else {
            Object id = tuple.getValue(0);
            Object obj = this.pending.remove((Long)id);
            if (obj == null) {
                if (JStormDebugger.isDebug(id)) {
                    LOG.info("Pending map no entry:" + id);
                }
                runnable = null;
            } else {
                TupleInfo tupleInfo = (TupleInfo)obj;
                String stream_id = tuple.getSourceStreamId();
                if (stream_id.equals("__ack_ack")) {
                    runnable = new AckSpoutMsg(id, this.spout, tuple, tupleInfo, this.task_stats);
                } else if (stream_id.equals("__ack_fail")) {
                    runnable = new FailSpoutMsg(id, this.spout, tupleInfo, this.task_stats);
                } else {
                    LOG.warn("Receive one unknow source Tuple " + this.idStr);
                    runnable = null;
                }
            }
            this.task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
        }
        return runnable;
    }

    public AsyncLoopThread getAckerRunnableThread() {
        return this.ackerRunnableThread;
    }

    private void processTimerEvent(TimerTrigger.TimerEvent event) {
        switch (event.getOpCode()) {
            case 1: {
                Map<Long, TupleInfo> timeoutMap = this.pending.rotate();
                for (Map.Entry<Long, TupleInfo> entry : timeoutMap.entrySet()) {
                    TupleInfo tupleInfo = entry.getValue();
                    FailSpoutMsg fail = new FailSpoutMsg(entry.getKey(), this.spout, tupleInfo, this.task_stats);
                    fail.run();
                }
                break;
            }
            case 3: {
                this.taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
                break;
            }
            default: {
                LOG.warn("Receive unsupported timer event, opcode=" + event.getOpCode());
            }
        }
    }

    protected void processControlEvent() {
        Object event = this.controlQueue.poll();
        if (event != null) {
            if (event instanceof TimerTrigger.TimerEvent) {
                this.processTimerEvent((TimerTrigger.TimerEvent)event);
            } else if (event instanceof Tuple) {
                this.processTupleEvent((Tuple)event);
            } else if (event instanceof BatchTuple) {
                for (Tuple tuple : ((BatchTuple)event).getTuples()) {
                    this.processTupleEvent(tuple);
                }
            } else {
                LOG.warn("Received unknown control event, " + event.getClass().getName());
            }
        }
    }

    @Override
    public Object getOutputCollector() {
        return this.outputCollector;
    }
}

