/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.worker;

import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.ContextMaker;
import com.alibaba.jstorm.daemon.worker.DrainerBatchCtrlRunable;
import com.alibaba.jstorm.daemon.worker.DrainerCtrlRunable;
import com.alibaba.jstorm.daemon.worker.RefreshActive;
import com.alibaba.jstorm.daemon.worker.RefreshConnections;
import com.alibaba.jstorm.daemon.worker.VirtualPortBatchCtrlDispatch;
import com.alibaba.jstorm.daemon.worker.VirtualPortCtrlDispatch;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.daemon.worker.WorkerShutdown;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.daemon.worker.hearbeat.WorkerHeartbeatRunable;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker {
    private static Logger LOG = LoggerFactory.getLogger(Worker.class);
    private WorkerData workerData;

    public Worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception {
        this.workerData = new WorkerData(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
    }

    public static Set<Integer> worker_output_tasks(WorkerData workerData) {
        ContextMaker context_maker = workerData.getContextMaker();
        Set<Integer> task_ids = workerData.getTaskids();
        StormTopology topology = workerData.getSysTopology();
        HashSet<Integer> rtn = new HashSet<Integer>();
        for (Integer taskid : task_ids) {
            TopologyContext context = context_maker.makeTopologyContext(topology, taskid, null);
            Map<String, Map<String, Grouping>> targets = context.getThisTargets();
            for (Map<String, Grouping> e : targets.values()) {
                for (String componentId : e.keySet()) {
                    List<Integer> tasks = context.getComponentTasks(componentId);
                    rtn.addAll(tasks);
                }
            }
        }
        return rtn;
    }

    private RefreshConnections makeRefreshConnections() {
        Set<Integer> outboundTasks = Worker.worker_output_tasks(this.workerData);
        this.workerData.initOutboundTaskStatus(outboundTasks);
        this.workerData.setOutboundTasks(outboundTasks);
        RefreshConnections refresh_connections = new RefreshConnections(this.workerData);
        return refresh_connections;
    }

    private List<TaskShutdownDameon> createTasks() throws Exception {
        ArrayList<TaskShutdownDameon> shutdowntasks = new ArrayList<TaskShutdownDameon>();
        Set<Integer> taskids = this.workerData.getTaskids();
        HashSet<Thread> threads = new HashSet<Thread>();
        ArrayList<Task> taskArrayList = new ArrayList<Task>();
        for (int taskid : taskids) {
            Task task = new Task(this.workerData, taskid);
            Thread thread = new Thread(task);
            threads.add(thread);
            taskArrayList.add(task);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        for (Task t : taskArrayList) {
            shutdowntasks.add(t.getTaskShutdownDameon());
        }
        return shutdowntasks;
    }

    private AsyncLoopThread startDispatchThread() {
        IContext context = this.workerData.getContext();
        String topologyId = this.workerData.getTopologyId();
        Map<Object, Object> stormConf = this.workerData.getStormConf();
        WaitStrategy waitStrategy = (WaitStrategy)JStormUtils.createDisruptorWaitStrategy(stormConf);
        int queueSize = JStormUtils.parseInt(stormConf.get("topology.ctrl.buffer.size"), 256);
        DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, queueSize, waitStrategy);
        QueueGauge revCtrlGauge = new QueueGauge(recvControlQueue, "RecvCtrlQueue");
        JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName("RecvCtrlQueue", MetricType.GAUGE), new AsmGauge(revCtrlGauge));
        IConnection recvConnection = context.bind(topologyId, this.workerData.getPort(), this.workerData.getDeserializeQueues(), recvControlQueue);
        this.workerData.setRecvConnection(recvConnection);
        VirtualPortCtrlDispatch recvControlDispather = null;
        boolean isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(stormConf);
        recvControlDispather = isTaskBatchTuple ? new VirtualPortBatchCtrlDispatch(this.workerData, recvConnection, recvControlQueue, "BatchRecvCtrl") : new VirtualPortCtrlDispatch(this.workerData, recvConnection, recvControlQueue, "RecvCtrl");
        AsyncLoopThread recvControlThread = new AsyncLoopThread(recvControlDispather, false, 10, true);
        return recvControlThread;
    }

    public WorkerShutdown execute() throws Exception {
        ArrayList<AsyncLoopThread> threads = new ArrayList<AsyncLoopThread>();
        AsyncLoopThread controlRvthread = this.startDispatchThread();
        threads.add(controlRvthread);
        RefreshConnections refreshConn = this.makeRefreshConnections();
        AsyncLoopThread refreshconn = new AsyncLoopThread(refreshConn, false, 1, true);
        threads.add(refreshconn);
        RefreshActive refreshZkActive = new RefreshActive(this.workerData);
        AsyncLoopThread refreshzk = new AsyncLoopThread(refreshZkActive, false, 1, true);
        threads.add(refreshzk);
        boolean isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(this.workerData.getStormConf());
        DrainerCtrlRunable drainerCtrlRunable = isTaskBatchTuple ? new DrainerBatchCtrlRunable(this.workerData, "BatchSendCtrl") : new DrainerCtrlRunable(this.workerData, "SendCtrl");
        AsyncLoopThread controlSendThread = new AsyncLoopThread(drainerCtrlRunable, false, 10, true);
        threads.add(controlSendThread);
        AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(this.workerData.getStormConf());
        if (syncContainerHbThread != null) {
            threads.add(syncContainerHbThread);
        }
        JStormMetricsReporter metricReporter = new JStormMetricsReporter(this.workerData);
        metricReporter.init();
        this.workerData.setMetricsReporter(metricReporter);
        WorkerHeartbeatRunable heartbeat_fn = new WorkerHeartbeatRunable(this.workerData);
        AsyncLoopThread hb = new AsyncLoopThread(heartbeat_fn, false, null, 5, true);
        threads.add(hb);
        List<TaskShutdownDameon> shutdowntasks = this.createTasks();
        this.workerData.setShutdownTasks(shutdowntasks);
        return new WorkerShutdown(this.workerData, threads);
    }

    public static WorkerShutdown mk_worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("topologyId:" + topology_id + ", ");
        sb.append("port:" + port + ", ");
        sb.append("workerId:" + worker_id + ", ");
        sb.append("jarPath:" + jar_path + "\n");
        LOG.info("Begin to run worker:" + sb.toString());
        Worker w = new Worker(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
        return w.execute();
    }

    public void redirectOutput() {
        if (System.getenv("REDIRECT") == null || !System.getenv("REDIRECT").equals("true")) {
            return;
        }
        String DEFAULT_OUT_TARGET_FILE = JStormUtils.getLogFileName();
        DEFAULT_OUT_TARGET_FILE = DEFAULT_OUT_TARGET_FILE == null ? "/dev/null" : DEFAULT_OUT_TARGET_FILE + ".out";
        String outputFile = ConfigExtension.getWorkerRedirectOutputFile(this.workerData.getStormConf());
        if (outputFile == null) {
            outputFile = DEFAULT_OUT_TARGET_FILE;
        } else {
            try {
                File file = new File(outputFile);
                if (!file.exists()) {
                    PathUtils.touch(outputFile);
                } else if (file.isDirectory()) {
                    LOG.warn("Failed to write " + outputFile);
                    outputFile = DEFAULT_OUT_TARGET_FILE;
                } else if (!file.canWrite()) {
                    LOG.warn("Failed to write " + outputFile);
                    outputFile = DEFAULT_OUT_TARGET_FILE;
                }
            }
            catch (Exception e) {
                LOG.warn("Failed to touch " + outputFile, (Throwable)e);
                outputFile = DEFAULT_OUT_TARGET_FILE;
            }
        }
        try {
            JStormUtils.redirectOutput(outputFile);
        }
        catch (Exception e) {
            LOG.warn("Failed to redirect to " + outputFile, (Throwable)e);
        }
    }

    public static List<Integer> getOldPortPids(String port) {
        String currPid = JStormUtils.process_pid();
        ArrayList<Integer> ret = new ArrayList<Integer>();
        StringBuilder sb = new StringBuilder();
        sb.append("ps -Af ");
        try {
            String str;
            LOG.info("Begin to execute " + sb.toString());
            String output = JStormUtils.launchProcess(sb.toString(), new HashMap<String, String>(), false);
            BufferedReader reader = new BufferedReader(new StringReader(output));
            JStormUtils.sleepMs(1000L);
            while ((str = reader.readLine()) != null) {
                if (StringUtils.isBlank((String)str) || !str.contains(Worker.class.getName()) || !str.contains(port)) continue;
                LOG.info("Find :" + str);
                String[] fields = StringUtils.split((String)str);
                boolean find = false;
                for (int i = 0; i < fields.length; ++i) {
                    String field = fields[i];
                    LOG.debug("Filed, " + i + ":" + field);
                    if (!field.contains(Worker.class.getName())) continue;
                    if (i + 3 >= fields.length) {
                        LOG.info("Failed to find port ");
                        break;
                    }
                    if (!fields[i + 3].equals(String.valueOf(port))) break;
                    find = true;
                    break;
                }
                if (!find) {
                    LOG.info("No old port worker");
                    continue;
                }
                if (fields.length < 2) continue;
                try {
                    if (currPid.equals(fields[1])) {
                        LOG.info("Skip kill myself");
                        continue;
                    }
                    Integer pid = Integer.valueOf(fields[1]);
                    LOG.info("Find one process :" + pid.toString());
                    ret.add(pid);
                }
                catch (Exception e) {
                    LOG.error(e.getMessage(), (Throwable)e);
                }
            }
            return ret;
        }
        catch (IOException e) {
            LOG.info("Failed to execute " + sb.toString());
            return ret;
        }
        catch (Exception e) {
            LOG.info(e.getMessage(), (Throwable)e);
            return ret;
        }
    }

    public static void killOldWorker(String port) {
        List<Integer> oldPids = Worker.getOldPortPids(port);
        for (Integer pid : oldPids) {
            JStormUtils.kill(pid);
        }
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (String arg : args) {
            sb.append(arg + " ");
        }
        LOG.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        LOG.info("Begin to start worker:" + sb.toString());
        LOG.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        if (args.length < 5) {
            LOG.error("The length of args is less than 5 ");
            System.exit(-1);
        }
        sb = new StringBuilder();
        try {
            String topology_id = args[0];
            String supervisor_id = args[1];
            String port_str = args[2];
            String worker_id = args[3];
            String jar_path = args[4];
            Worker.killOldWorker(port_str);
            Map conf = Utils.readStormConfig();
            StormConfig.validate_distributed_mode(conf);
            JStormServerUtils.startTaobaoJvmMonitor();
            sb.append("topologyId:" + topology_id + ", ");
            sb.append("port:" + port_str + ", ");
            sb.append("workerId:" + worker_id + ", ");
            sb.append("jar_path:" + jar_path + "\n");
            WorkerShutdown sd = Worker.mk_worker(conf, null, topology_id, supervisor_id, Integer.parseInt(port_str), worker_id, jar_path);
            sd.join();
            LOG.info("@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
            LOG.info("Successfully shutdown worker " + sb.toString());
            LOG.info("@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
        }
        catch (Throwable e) {
            String errMsg = "Failed to create worker, " + sb.toString();
            LOG.error(errMsg, e);
            JStormUtils.halt_process(-1, errMsg);
        }
    }
}

