/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.spout;

import backtype.storm.generated.ShellComponent;
import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.rpc.IShellMetric;
import backtype.storm.multilang.ShellMsg;
import backtype.storm.multilang.SpoutMsg;
import backtype.storm.spout.ISpout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import clojure.lang.RT;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShellSpout
implements ISpout {
    public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
    private SpoutOutputCollector _collector;
    private String[] _command;
    private ShellProcess _process;
    private TopologyContext _context;
    private SpoutMsg _spoutMsg;
    private int workerTimeoutMills;
    private ScheduledExecutorService heartBeatExecutorService;
    private AtomicLong lastHeartbeatTimestamp = new AtomicLong();

    public ShellSpout(ShellComponent component) {
        this(component.get_execution_command(), component.get_script());
    }

    public ShellSpout(String ... command) {
        this._command = command;
    }

    @Override
    public void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector) {
        this._collector = collector;
        this._context = context;
        this.workerTimeoutMills = 1000 * RT.intCast(stormConf.get("supervisor.worker.timeout.secs"));
        this._process = new ShellProcess(this._command);
        Number subpid = this._process.launch(stormConf, context);
        LOG.info("Launched subprocess with pid " + subpid);
        this.heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService((ScheduledThreadPoolExecutor)new ScheduledThreadPoolExecutor(1));
    }

    @Override
    public void close() {
        this.heartBeatExecutorService.shutdownNow();
        this._process.destroy();
    }

    @Override
    public void nextTuple() {
        if (this._spoutMsg == null) {
            this._spoutMsg = new SpoutMsg();
        }
        this._spoutMsg.setCommand("next");
        this._spoutMsg.setId("");
        this.querySubprocess();
    }

    @Override
    public void ack(Object msgId) {
        if (this._spoutMsg == null) {
            this._spoutMsg = new SpoutMsg();
        }
        this._spoutMsg.setCommand("ack");
        this._spoutMsg.setId(msgId);
        this.querySubprocess();
    }

    @Override
    public void fail(Object msgId) {
        if (this._spoutMsg == null) {
            this._spoutMsg = new SpoutMsg();
        }
        this._spoutMsg.setCommand("fail");
        this._spoutMsg.setId(msgId);
        this.querySubprocess();
    }

    private void handleMetrics(ShellMsg shellMsg) {
        String name = shellMsg.getMetricName();
        if (name.isEmpty()) {
            throw new RuntimeException("Receive Metrics name is empty");
        }
        IMetric iMetric = this._context.getRegisteredMetricByName(name);
        if (iMetric == null) {
            throw new RuntimeException("Could not find metric by name[" + name + "] ");
        }
        if (!(iMetric instanceof IShellMetric)) {
            throw new RuntimeException("Metric[" + name + "] is not IShellMetric, can not call by RPC");
        }
        IShellMetric iShellMetric = (IShellMetric)iMetric;
        Object paramsObj = shellMsg.getMetricParams();
        try {
            iShellMetric.updateMetricFromRPC(paramsObj);
        }
        catch (RuntimeException re) {
            throw re;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void querySubprocess() {
        try {
            String command;
            this._process.writeSpoutMsg(this._spoutMsg);
            while (true) {
                ShellMsg shellMsg;
                if ((command = (shellMsg = this._process.readShellMsg()).getCommand()) == null) {
                    throw new IllegalArgumentException("Command not found in spout message: " + shellMsg);
                }
                this.setHeartbeat();
                if (command.equals("sync")) {
                    return;
                }
                if (command.equals("log")) {
                    this.handleLog(shellMsg);
                    continue;
                }
                if (command.equals("error")) {
                    this.handleError(shellMsg.getMsg());
                    continue;
                }
                if (command.equals("emit")) {
                    String stream = shellMsg.getStream();
                    Long task = shellMsg.getTask();
                    List<Object> tuple = shellMsg.getTuple();
                    Object messageId = shellMsg.getId();
                    if (task == 0L) {
                        List<Integer> outtasks = this._collector.emit(stream, tuple, messageId);
                        if (!shellMsg.areTaskIdsNeeded()) continue;
                        this._process.writeTaskIds(outtasks);
                        continue;
                    }
                    this._collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
                    continue;
                }
                if (!command.equals("metrics")) break;
                this.handleMetrics(shellMsg);
            }
            throw new RuntimeException("Unknown command received: " + command);
        }
        catch (Exception e) {
            String processInfo = this._process.getProcessInfoString() + this._process.getProcessTerminationInfoString();
            throw new RuntimeException(processInfo, e);
        }
    }

    private void handleLog(ShellMsg shellMsg) {
        String msg = shellMsg.getMsg();
        msg = "ShellLog " + this._process.getProcessInfoString() + " " + msg;
        ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
        switch (logLevel) {
            case TRACE: {
                LOG.trace(msg);
                break;
            }
            case DEBUG: {
                LOG.debug(msg);
                break;
            }
            case INFO: {
                LOG.info(msg);
                break;
            }
            case WARN: {
                LOG.warn(msg);
                break;
            }
            case ERROR: {
                LOG.error(msg);
                break;
            }
            default: {
                LOG.info(msg);
            }
        }
    }

    private void handleError(String msg) {
        this._collector.reportError(new Exception("Shell Process Exception: " + msg));
    }

    @Override
    public void activate() {
        LOG.info("Start checking heartbeat...");
        this.setHeartbeat();
        this.heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1L, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void deactivate() {
        this.heartBeatExecutorService.shutdownNow();
    }

    private void setHeartbeat() {
        this.lastHeartbeatTimestamp.set(System.currentTimeMillis());
    }

    private long getLastHeartbeat() {
        return this.lastHeartbeatTimestamp.get();
    }

    private void die(Throwable exception) {
        this.heartBeatExecutorService.shutdownNow();
        LOG.error("Halting process: ShellSpout died.", exception);
        this._collector.reportError(exception);
        this._process.destroy();
        System.exit(11);
    }

    private class SpoutHeartbeatTimerTask
    extends TimerTask {
        private ShellSpout spout;

        public SpoutHeartbeatTimerTask(ShellSpout spout) {
            this.spout = spout;
        }

        @Override
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            long lastHeartbeat = ShellSpout.this.getLastHeartbeat();
            LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}", new Object[]{currentTimeMillis, lastHeartbeat, ShellSpout.this.workerTimeoutMills});
            if (currentTimeMillis - lastHeartbeat > (long)ShellSpout.this.workerTimeoutMills) {
                this.spout.die(new RuntimeException("subprocess heartbeat timeout"));
            }
        }
    }
}

