/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute;

import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleExt;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.daemon.worker.timer.BackpressureCheckTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TickTupleTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.backpressure.BackpressureTrigger;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.task.execute.BoltBatchCollector;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
import com.lmax.disruptor.EventHandler;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executor that drives a single {@link IBolt} instance.
 *
 * <p>The run loop consumes events from the execute queue (delivered to
 * {@link #onEvent(Object, long, boolean)}) and polls the control queue. Tuples
 * are handed to the bolt, and timer events are dispatched by opcode in
 * {@code processTimerEvent}.
 */
public class BoltExecutors extends BaseExecutors implements EventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(BoltExecutors.class);

    /** The user (or system) bolt executed by this task. */
    protected IBolt bolt;
    /** Receive timestamp (ms) of every tuple that has been received but not yet removed or rotated out. */
    protected RotatingMap<Tuple, Long> tuple_start_times;
    /** Value of "topology.acker.executors"; 0 when the setting is absent. */
    private final int ackerNum;
    private final OutputCollector outputCollector;
    /** Duration (ms) of the most recent {@link #onEvent} call; written only while metrics are enabled. */
    private volatile double exeTime;
    /** Only created for non-system components; stays {@code null} otherwise. */
    private BackpressureTrigger backpressureTrigger;
    private final boolean isSystemBolt;

    /**
     * Builds the executor for {@code task}: creates the output collector, and registers the
     * tick-tuple and backpressure-check triggers where applicable.
     *
     * @param task the task whose task object is the {@link IBolt} to run
     */
    public BoltExecutors(Task task) {
        super(task);
        this.bolt = (IBolt) task.getTaskObj();
        this.tuple_start_times = new RotatingMap<Tuple, Long>(3);

        // parseInt yields a boxed Integer (see the tick frequency below); unboxing it directly
        // would throw a NullPointerException when the acker setting is missing, so fall back to 0.
        Integer configuredAckers = JStormUtils.parseInt(this.storm_conf.get("topology.acker.executors"));
        this.ackerNum = configuredAckers == null ? 0 : configuredAckers;

        BoltCollector output_collector;
        if (ConfigExtension.isTaskBatchTuple(this.storm_conf)) {
            output_collector = new BoltBatchCollector(task, this.tuple_start_times, this.message_timeout_secs);
        } else {
            output_collector = new BoltCollector(task, this.tuple_start_times, this.message_timeout_secs);
        }
        this.outputCollector = new OutputCollector(output_collector);
        this.taskHbTrigger.setBoltOutputCollector(output_collector);

        Object tickFrequence = this.storm_conf.get("topology.tick.tuple.freq.secs");
        this.isSystemBolt = Common.isSystemComponent(this.componentId);

        // Tick tuples are only scheduled for non-system components that configured a frequency.
        if (tickFrequence != null && !this.isSystemBolt) {
            Integer frequence = JStormUtils.parseInt(tickFrequence);
            TickTupleTrigger tickTupleTrigger =
                    new TickTupleTrigger(this.sysTopologyCtx, frequence, this.idStr + "__tick", this.controlQueue);
            tickTupleTrigger.register();
        }

        if (!this.isSystemBolt) {
            this.backpressureTrigger = new BackpressureTrigger(task, this, this.storm_conf, output_collector);
            int backpressureCheckFrequence = ConfigExtension.getBackpressureCheckIntervl(this.storm_conf);
            BackpressureCheckTrigger backpressureCheckTrigger = new BackpressureCheckTrigger(
                    30, backpressureCheckFrequence, this.idStr + " backpressure check trigger", this.backpressureTrigger);
            backpressureCheckTrigger.register(TimeUnit.MILLISECONDS);
        }

        LOG.info("Successfully create BoltExecutors {}", this.idStr);
    }

    /**
     * Calls {@link IBolt#prepare} with the topology configuration, the user topology context and
     * this executor's output collector, then registers the task heartbeat trigger.
     */
    @Override
    public void init() {
        this.bolt.prepare(this.storm_conf, this.userTopologyCtx, this.outputCollector);
        LOG.info("Succeesfully do Bolt.prepare");
        this.taskHbTrigger.register();
    }

    /**
     * @return the executor id string followed by {@code "-BoltExecutors"}
     */
    @Override
    public String getThreadName() {
        return this.idStr + "-" + BoltExecutors.class.getSimpleName();
    }

    /**
     * Main loop: runs the init wrapper if initialization has not finished, then keeps consuming
     * the execute queue and polling the control queue until the task status reports shutdown.
     * Any {@link Throwable} raised by an iteration is logged (unless the task is already shutting
     * down) and the loop continues.
     */
    @Override
    public void run() {
        if (!this.isFinishInit) {
            this.initWrapper();
        }
        while (!this.taskStatus.isShutdown()) {
            try {
                this.exeQueue.consumeBatchWhenAvailable(this);
                this.processControlEvent();
            } catch (Throwable e) {
                // Failures observed during shutdown are not worth reporting.
                if (!this.taskStatus.isShutdown()) {
                    LOG.error(this.idStr + " bolt exeutor  error", e);
                }
            }
        }
    }

    /**
     * Handles one event taken from the execute queue.
     *
     * <p>A {@link Tuple} or each tuple of a {@link BatchTuple} is processed after first polling
     * the control queue once; a {@link TimerTrigger.TimerEvent} is dispatched as a timer event;
     * anything else is logged as unknown. {@code null} events are ignored.
     *
     * @param event      the queued event, may be {@code null}
     * @param sequence   unused by this implementation
     * @param endOfBatch unused by this implementation
     * @throws Exception declared by the handler interface
     */
    @Override
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        if (event == null) {
            return;
        }
        long start = System.currentTimeMillis();
        try {
            if (event instanceof Tuple) {
                this.processControlEvent();
                this.processTupleEvent((Tuple) event);
            } else if (event instanceof BatchTuple) {
                for (Tuple tuple : ((BatchTuple) event).getTuples()) {
                    // Poll the control queue between tuples as well.
                    this.processControlEvent();
                    this.processTupleEvent(tuple);
                }
            } else if (event instanceof TimerTrigger.TimerEvent) {
                this.processTimerEvent((TimerTrigger.TimerEvent) event);
            } else {
                LOG.warn("Bolt executor received unknown message");
            }
        } finally {
            if (JStormMetrics.enabled) {
                this.exeTime = System.currentTimeMillis() - start;
            }
        }
    }

    /**
     * Records receipt of {@code tuple}, then hands it either to the backpressure trigger (tuples
     * on the "__master_control_stream" stream, non-system components only) or to the bolt.
     * Execution failures are recorded and reported rather than propagated.
     *
     * <p>When no ackers are configured the tuple's start time is removed right away and, if
     * metrics are enabled, the tuple is accounted as acked.
     */
    private void processTupleEvent(Tuple tuple) {
        this.task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
        this.tuple_start_times.put(tuple, System.currentTimeMillis());

        try {
            // Constant-first comparison so a null stream id cannot throw here.
            if (!this.isSystemBolt && "__master_control_stream".equals(tuple.getSourceStreamId())) {
                this.backpressureTrigger.handle(tuple);
            } else {
                this.bolt.execute(tuple);
            }
        } catch (Throwable e) {
            this.handleExecuteError(e);
        }

        if (this.ackerNum != 0) {
            return;
        }
        Long latencyStart = (Long) this.tuple_start_times.remove(tuple);
        if (latencyStart == null || !JStormMetrics.enabled) {
            return;
        }
        long endTime = System.currentTimeMillis();
        long lifeCycleStart = ((TupleExt) tuple).getCreationTimeStamp();
        this.task_stats.bolt_acked_tuple(
                tuple.getSourceComponent(), tuple.getSourceStreamId(), latencyStart, lifeCycleStart, endTime);
    }

    /**
     * Dispatches a timer event by opcode:
     * <ul>
     *   <li>1 - rotates {@link #tuple_start_times}; when ackers are configured, every tuple that
     *       rotated out is counted as failed;</li>
     *   <li>2 - executes the tuple carried by the event on the bolt;</li>
     *   <li>3 - updates the execute-thread heartbeat time;</li>
     *   <li>anything else - logged as unsupported.</li>
     * </ul>
     */
    private void processTimerEvent(TimerTrigger.TimerEvent event) {
        switch (event.getOpCode()) {
            case 1: {
                // The rotation itself must always happen; only the failure accounting is conditional.
                Map<Tuple, Long> timeoutMap = this.tuple_start_times.rotate();
                if (this.ackerNum > 0) {
                    for (Map.Entry<Tuple, Long> entry : timeoutMap.entrySet()) {
                        Tuple input = entry.getKey();
                        this.task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId());
                    }
                }
                break;
            }
            case 2: {
                try {
                    Tuple tuple = (Tuple) event.getMsg();
                    this.bolt.execute(tuple);
                } catch (Throwable e) {
                    this.handleExecuteError(e);
                }
                break;
            }
            case 3: {
                this.taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
                break;
            }
            default: {
                LOG.warn("Receive unsupported timer event, opcode={}", event.getOpCode());
            }
        }
    }

    /** Stores, logs and reports a failure raised while executing the bolt. */
    private void handleExecuteError(Throwable e) {
        this.error = e;
        LOG.error("bolt execute error ", e);
        this.report_error.report(e);
    }

    /**
     * Polls at most one event from the control queue and processes it: timer events, single
     * tuples and batch tuples are supported; other event types are logged as unknown.
     */
    protected void processControlEvent() {
        Object event = this.controlQueue.poll();
        if (event == null) {
            return;
        }
        if (event instanceof TimerTrigger.TimerEvent) {
            this.processTimerEvent((TimerTrigger.TimerEvent) event);
            LOG.debug("Received one event from control queue");
        } else if (event instanceof Tuple) {
            this.processTupleEvent((Tuple) event);
        } else if (event instanceof BatchTuple) {
            for (Tuple tuple : ((BatchTuple) event).getTuples()) {
                this.processTupleEvent(tuple);
            }
        } else {
            LOG.warn("Received unknown control event, {}", event.getClass().getName());
        }
    }

    /**
     * @return the duration in milliseconds of the most recent {@link #onEvent} call that ran
     *         while metrics were enabled
     */
    public double getExecuteTime() {
        return this.exeTime;
    }

    /**
     * @return the {@link OutputCollector} passed to the bolt in {@link #init()}
     */
    @Override
    public Object getOutputCollector() {
        return this.outputCollector;
    }
}

