/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.task;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.generated.ShellComponent;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.rpc.IShellMetric;
import org.apache.storm.multilang.BoltMsg;
import org.apache.storm.multilang.ShellMsg;
import org.apache.storm.shade.com.google.common.util.concurrent.MoreExecutors;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ShellBoltMessageQueue;
import org.apache.storm.utils.ShellLogHandler;
import org.apache.storm.utils.ShellProcess;
import org.apache.storm.utils.ShellUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShellBolt
implements IBolt {
    public static final String HEARTBEAT_STREAM_ID = "__heartbeat";
    public static final Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
    private static final long serialVersionUID = -339575186639193348L;
    OutputCollector _collector;
    Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
    private String[] _command;
    private Map<String, String> env = new HashMap<String, String>();
    private ShellLogHandler _logHandler;
    private ShellProcess _process;
    private volatile boolean _running = true;
    private volatile Throwable _exception;
    private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue();
    private Random _rand;
    private Thread _readerThread;
    private Thread _writerThread;
    private TopologyContext _context;
    private int workerTimeoutMills;
    private ScheduledExecutorService heartBeatExecutorService;
    private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
    private AtomicBoolean sendHeartbeatFlag = new AtomicBoolean(false);
    private boolean _isLocalMode = false;
    private boolean changeDirectory = true;

    public ShellBolt(ShellComponent component) {
        this(component.get_execution_command(), component.get_script());
    }

    public ShellBolt(String ... command) {
        this._command = command;
    }

    public ShellBolt setEnv(Map<String, String> env) {
        this.env = env;
        return this;
    }

    public boolean shouldChangeChildCWD() {
        return this.changeDirectory;
    }

    public void changeChildCWD(boolean changeDirectory) {
        this.changeDirectory = changeDirectory;
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        Object maxPending;
        if (ConfigUtils.isLocalMode(topoConf)) {
            this._isLocalMode = true;
        }
        if ((maxPending = topoConf.get("topology.shellbolt.max.pending")) != null) {
            this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue());
        }
        this._rand = new Random();
        this._collector = collector;
        this._context = context;
        this.workerTimeoutMills = topoConf.containsKey("topology.subprocess.timeout.secs") ? 1000 * ObjectReader.getInt(topoConf.get("topology.subprocess.timeout.secs")) : 1000 * ObjectReader.getInt(topoConf.get("supervisor.worker.timeout.secs"));
        this._process = new ShellProcess(this._command);
        if (!this.env.isEmpty()) {
            this._process.setEnv(this.env);
        }
        Number subpid = this._process.launch(topoConf, context, this.changeDirectory);
        LOG.info("Launched subprocess with pid " + subpid);
        this._logHandler = ShellUtils.getLogHandler(topoConf);
        this._logHandler.setUpContext(ShellBolt.class, this._process, this._context);
        this._readerThread = new Thread(new BoltReaderRunnable());
        this._readerThread.start();
        this._writerThread = new Thread(new BoltWriterRunnable());
        this._writerThread.start();
        LOG.info("Start checking heartbeat...");
        this.setHeartbeat();
        this.heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService((ScheduledThreadPoolExecutor)new ScheduledThreadPoolExecutor(1));
        this.heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1L, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void execute(Tuple input) {
        if (this._exception != null) {
            throw new RuntimeException(this._exception);
        }
        String genId = Long.toString(this._rand.nextLong());
        this._inputs.put(genId, input);
        try {
            BoltMsg boltMsg = this.createBoltMessage(input, genId);
            this._pendingWrites.putBoltMsg(boltMsg);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private BoltMsg createBoltMessage(Tuple input, String genId) {
        BoltMsg boltMsg = new BoltMsg();
        boltMsg.setId(genId);
        boltMsg.setComp(input.getSourceComponent());
        boltMsg.setStream(input.getSourceStreamId());
        boltMsg.setTask(input.getSourceTask());
        boltMsg.setTuple(input.getValues());
        return boltMsg;
    }

    @Override
    public void cleanup() {
        this._running = false;
        this.heartBeatExecutorService.shutdownNow();
        this._writerThread.interrupt();
        this._readerThread.interrupt();
        this._process.destroy();
        this._inputs.clear();
    }

    private void handleAck(Object id) {
        Tuple acked = this._inputs.remove(id);
        if (acked == null) {
            throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
        }
        this._collector.ack(acked);
    }

    private void handleFail(Object id) {
        Tuple failed = this._inputs.remove(id);
        if (failed == null) {
            throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
        }
        this._collector.fail(failed);
    }

    private void handleError(String msg) {
        this._collector.reportError(new Exception("Shell Process Exception: " + msg));
    }

    private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
        ArrayList<Tuple> anchors = new ArrayList<Tuple>();
        List<String> recvAnchors = shellMsg.getAnchors();
        if (recvAnchors != null) {
            for (String anchor : recvAnchors) {
                Tuple t = this._inputs.get(anchor);
                if (t == null) {
                    throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
                }
                anchors.add(t);
            }
        }
        if (shellMsg.getTask() == 0L) {
            List<Integer> outtasks = this._collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
            if (shellMsg.areTaskIdsNeeded()) {
                this._pendingWrites.putTaskIds(outtasks);
            }
        } else {
            this._collector.emitDirect((int)shellMsg.getTask(), shellMsg.getStream(), anchors, shellMsg.getTuple());
        }
    }

    private void handleMetrics(ShellMsg shellMsg) {
        String name = shellMsg.getMetricName();
        if (name.isEmpty()) {
            throw new RuntimeException("Receive Metrics name is empty");
        }
        IMetric iMetric = this._context.getRegisteredMetricByName(name);
        if (iMetric == null) {
            throw new RuntimeException("Could not find metric by name[" + name + "] ");
        }
        if (!(iMetric instanceof IShellMetric)) {
            throw new RuntimeException("Metric[" + name + "] is not IShellMetric, can not call by RPC");
        }
        IShellMetric iShellMetric = (IShellMetric)iMetric;
        Object paramsObj = shellMsg.getMetricParams();
        try {
            iShellMetric.updateMetricFromRPC(paramsObj);
        }
        catch (RuntimeException re) {
            throw re;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void setHeartbeat() {
        this.lastHeartbeatTimestamp.set(System.currentTimeMillis());
    }

    private long getLastHeartbeat() {
        return this.lastHeartbeatTimestamp.get();
    }

    private void die(Throwable exception) {
        String processInfo = this._process.getProcessInfoString() + this._process.getProcessTerminationInfoString();
        this._exception = new RuntimeException(processInfo, exception);
        String message = String.format("Halting process: ShellBolt died. Command: %s, ProcessInfo %s", Arrays.toString(this._command), processInfo);
        LOG.error(message, exception);
        this._collector.reportError(exception);
        if (!this._isLocalMode && (this._running || exception instanceof Error)) {
            System.exit(11);
        }
    }

    private class BoltWriterRunnable
    implements Runnable {
        private BoltWriterRunnable() {
        }

        @Override
        public void run() {
            while (ShellBolt.this._running) {
                try {
                    Object write;
                    if (ShellBolt.this.sendHeartbeatFlag.get()) {
                        LOG.debug("BOLT - sending heartbeat request to subprocess");
                        String genId = Long.toString(ShellBolt.this._rand.nextLong());
                        ShellBolt.this._process.writeBoltMsg(this.createHeartbeatBoltMessage(genId));
                        ShellBolt.this.sendHeartbeatFlag.compareAndSet(true, false);
                    }
                    if ((write = ShellBolt.this._pendingWrites.poll(1L, TimeUnit.SECONDS)) instanceof BoltMsg) {
                        ShellBolt.this._process.writeBoltMsg((BoltMsg)write);
                        continue;
                    }
                    if (write instanceof List) {
                        ShellBolt.this._process.writeTaskIds((List)write);
                        continue;
                    }
                    if (write == null) continue;
                    throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
                }
                catch (InterruptedException write) {
                }
                catch (Throwable t) {
                    ShellBolt.this.die(t);
                }
            }
        }

        private BoltMsg createHeartbeatBoltMessage(String genId) {
            BoltMsg msg = new BoltMsg();
            msg.setId(genId);
            msg.setTask(-1L);
            msg.setStream(ShellBolt.HEARTBEAT_STREAM_ID);
            msg.setTuple(new ArrayList<Object>());
            return msg;
        }
    }

    private class BoltReaderRunnable
    implements Runnable {
        private BoltReaderRunnable() {
        }

        @Override
        public void run() {
            while (ShellBolt.this._running) {
                try {
                    ShellMsg shellMsg = ShellBolt.this._process.readShellMsg();
                    String command = shellMsg.getCommand();
                    if (command == null) {
                        throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg);
                    }
                    ShellBolt.this.setHeartbeat();
                    switch (command) {
                        case "ack": {
                            ShellBolt.this.handleAck(shellMsg.getId());
                            break;
                        }
                        case "fail": {
                            ShellBolt.this.handleFail(shellMsg.getId());
                            break;
                        }
                        case "error": {
                            ShellBolt.this.handleError(shellMsg.getMsg());
                            break;
                        }
                        case "log": {
                            ShellBolt.this._logHandler.log(shellMsg);
                            break;
                        }
                        case "emit": {
                            ShellBolt.this.handleEmit(shellMsg);
                            break;
                        }
                        case "metrics": {
                            ShellBolt.this.handleMetrics(shellMsg);
                        }
                    }
                }
                catch (InterruptedException shellMsg) {
                }
                catch (Throwable t) {
                    ShellBolt.this.die(t);
                }
            }
        }
    }

    private class BoltHeartbeatTimerTask
    extends TimerTask {
        private ShellBolt bolt;

        public BoltHeartbeatTimerTask(ShellBolt bolt) {
            this.bolt = bolt;
        }

        @Override
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            long lastHeartbeat = ShellBolt.this.getLastHeartbeat();
            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}", new Object[]{currentTimeMillis, lastHeartbeat, ShellBolt.this.workerTimeoutMills});
            if (currentTimeMillis - lastHeartbeat > (long)ShellBolt.this.workerTimeoutMills) {
                this.bolt.die(new RuntimeException("subprocess heartbeat timeout"));
            }
            ShellBolt.this.sendHeartbeatFlag.compareAndSet(false, true);
        }
    }
}

