package com.alibaba.jstorm.daemon.worker;

import backtype.storm.Config;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.daemon.worker.hearbeat.WorkerHeartbeatRunable;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/Worker.class */
public class Worker {
    private static Logger LOG = LoggerFactory.getLogger(Worker.class);
    private WorkerData workerData;

    public Worker(Map map, IContext iContext, String str, String str2, int i, String str3, String str4) throws Exception {
        this.workerData = new WorkerData(map, iContext, str, str2, i, str3, str4);
    }

    public static Set<Integer> worker_output_tasks(WorkerData workerData) {
        ContextMaker contextMaker = workerData.getContextMaker();
        Set<Integer> taskids = workerData.getTaskids();
        StormTopology sysTopology = workerData.getSysTopology();
        HashSet hashSet = new HashSet();
        Iterator<Integer> it = taskids.iterator();
        while (it.hasNext()) {
            TopologyContext makeTopologyContext = contextMaker.makeTopologyContext(sysTopology, it.next(), null);
            Iterator<Map<String, Grouping>> it2 = makeTopologyContext.getThisTargets().values().iterator();
            while (it2.hasNext()) {
                Iterator<String> it3 = it2.next().keySet().iterator();
                while (it3.hasNext()) {
                    hashSet.addAll(makeTopologyContext.getComponentTasks(it3.next()));
                }
            }
        }
        return hashSet;
    }

    private RefreshConnections makeRefreshConnections() {
        Set<Integer> worker_output_tasks = worker_output_tasks(this.workerData);
        this.workerData.initOutboundTaskStatus(worker_output_tasks);
        this.workerData.setOutboundTasks(worker_output_tasks);
        return new RefreshConnections(this.workerData);
    }

    private List<TaskShutdownDameon> createTasks() throws Exception {
        ArrayList arrayList = new ArrayList();
        Set<Integer> taskids = this.workerData.getTaskids();
        HashSet hashSet = new HashSet();
        ArrayList arrayList2 = new ArrayList();
        Iterator<Integer> it = taskids.iterator();
        while (it.hasNext()) {
            Task task = new Task(this.workerData, it.next().intValue());
            Thread thread = new Thread(task);
            hashSet.add(thread);
            arrayList2.add(task);
            thread.start();
        }
        Iterator it2 = hashSet.iterator();
        while (it2.hasNext()) {
            ((Thread) it2.next()).join();
        }
        Iterator it3 = arrayList2.iterator();
        while (it3.hasNext()) {
            arrayList.add(((Task) it3.next()).getTaskShutdownDameon());
        }
        return arrayList;
    }

    private AsyncLoopThread startDispatchThread() {
        IContext context = this.workerData.getContext();
        String topologyId = this.workerData.getTopologyId();
        Map<Object, Object> stormConf = this.workerData.getStormConf();
        DisruptorQueue mkInstance = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256).intValue(), (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf));
        JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.RECV_CTRL_QUEUE, MetricType.GAUGE), new AsmGauge(new QueueGauge(mkInstance, MetricDef.RECV_CTRL_QUEUE)));
        IConnection bind = context.bind(topologyId, this.workerData.getPort().intValue(), this.workerData.getDeserializeQueues(), mkInstance);
        this.workerData.setRecvConnection(bind);
        return new AsyncLoopThread(ConfigExtension.isTaskBatchTuple(stormConf).booleanValue() ? new VirtualPortBatchCtrlDispatch(this.workerData, bind, mkInstance, MetricDef.BATCH_RECV_THREAD) : new VirtualPortCtrlDispatch(this.workerData, bind, mkInstance, MetricDef.RECV_THREAD), false, 10, true);
    }

    public WorkerShutdown execute() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(startDispatchThread());
        arrayList.add(new AsyncLoopThread(makeRefreshConnections(), false, 1, true));
        arrayList.add(new AsyncLoopThread(new RefreshActive(this.workerData), false, 1, true));
        arrayList.add(new AsyncLoopThread(ConfigExtension.isTaskBatchTuple(this.workerData.getStormConf()).booleanValue() ? new DrainerBatchCtrlRunable(this.workerData, MetricDef.BATCH_SEND_THREAD) : new DrainerCtrlRunable(this.workerData, MetricDef.SEND_THREAD), false, 10, true));
        AsyncLoopThread mkWorkerInstance = SyncContainerHb.mkWorkerInstance(this.workerData.getStormConf());
        if (mkWorkerInstance != null) {
            arrayList.add(mkWorkerInstance);
        }
        JStormMetricsReporter jStormMetricsReporter = new JStormMetricsReporter(this.workerData);
        jStormMetricsReporter.init();
        this.workerData.setMetricsReporter(jStormMetricsReporter);
        arrayList.add(new AsyncLoopThread(new WorkerHeartbeatRunable(this.workerData), false, null, 5, true));
        this.workerData.setShutdownTasks(createTasks());
        return new WorkerShutdown(this.workerData, arrayList);
    }

    public static WorkerShutdown mk_worker(Map map, IContext iContext, String str, String str2, int i, String str3, String str4) throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("topologyId:" + str + ", ");
        sb.append("port:" + i + ", ");
        sb.append("workerId:" + str3 + ", ");
        sb.append("jarPath:" + str4 + "\n");
        LOG.info("Begin to run worker:" + sb.toString());
        return new Worker(map, iContext, str, str2, i, str3, str4).execute();
    }

    public void redirectOutput() {
        if (System.getenv("REDIRECT") == null || !System.getenv("REDIRECT").equals("true")) {
            return;
        }
        String logFileName = JStormUtils.getLogFileName();
        String str = logFileName == null ? "/dev/null" : logFileName + ".out";
        String workerRedirectOutputFile = ConfigExtension.getWorkerRedirectOutputFile(this.workerData.getStormConf());
        if (workerRedirectOutputFile == null) {
            workerRedirectOutputFile = str;
        } else {
            try {
                File file = new File(workerRedirectOutputFile);
                if (!file.exists()) {
                    PathUtils.touch(workerRedirectOutputFile);
                } else if (file.isDirectory()) {
                    LOG.warn("Failed to write " + workerRedirectOutputFile);
                    workerRedirectOutputFile = str;
                } else if (!file.canWrite()) {
                    LOG.warn("Failed to write " + workerRedirectOutputFile);
                    workerRedirectOutputFile = str;
                }
            } catch (Exception e) {
                LOG.warn("Failed to touch " + workerRedirectOutputFile, e);
                workerRedirectOutputFile = str;
            }
        }
        try {
            JStormUtils.redirectOutput(workerRedirectOutputFile);
        } catch (Exception e2) {
            LOG.warn("Failed to redirect to " + workerRedirectOutputFile, e2);
        }
    }

    public static List<Integer> getOldPortPids(String str) {
        String process_pid = JStormUtils.process_pid();
        ArrayList arrayList = new ArrayList();
        StringBuilder sb = new StringBuilder();
        sb.append("ps -Af ");
        try {
            LOG.info("Begin to execute " + sb.toString());
            BufferedReader bufferedReader = new BufferedReader(new StringReader(JStormUtils.launchProcess(sb.toString(), new HashMap(), false)));
            JStormUtils.sleepMs(1000L);
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    return arrayList;
                }
                if (!StringUtils.isBlank(readLine) && readLine.contains(Worker.class.getName()) && readLine.contains(str)) {
                    LOG.info("Find :" + readLine);
                    String[] split = StringUtils.split(readLine);
                    boolean z = false;
                    int i = 0;
                    while (true) {
                        if (i >= split.length) {
                            break;
                        }
                        String str2 = split[i];
                        LOG.debug("Filed, " + i + ":" + str2);
                        if (!str2.contains(Worker.class.getName())) {
                            i++;
                        } else if (i + 3 >= split.length) {
                            LOG.info("Failed to find port ");
                        } else if (split[i + 3].equals(String.valueOf(str))) {
                            z = true;
                        }
                    }
                    if (!z) {
                        LOG.info("No old port worker");
                    } else if (split.length >= 2) {
                        try {
                            if (process_pid.equals(split[1])) {
                                LOG.info("Skip kill myself");
                            } else {
                                Integer valueOf = Integer.valueOf(split[1]);
                                LOG.info("Find one process :" + valueOf.toString());
                                arrayList.add(valueOf);
                            }
                        } catch (Exception e) {
                            LOG.error(e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (IOException e2) {
            LOG.info("Failed to execute " + sb.toString());
            return arrayList;
        } catch (Exception e3) {
            LOG.info(e3.getMessage(), e3);
            return arrayList;
        }
    }

    public static void killOldWorker(String str) {
        Iterator<Integer> it = getOldPortPids(str).iterator();
        while (it.hasNext()) {
            JStormUtils.kill(it.next());
        }
    }

    public static void main(String[] strArr) {
        StringBuilder sb = new StringBuilder();
        for (String str : strArr) {
            sb.append(str + " ");
        }
        LOG.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        LOG.info("Begin to start worker:" + sb.toString());
        LOG.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        if (strArr.length < 5) {
            LOG.error("The length of args is less than 5 ");
            System.exit(-1);
        }
        StringBuilder sb2 = new StringBuilder();
        try {
            String str2 = strArr[0];
            String str3 = strArr[1];
            String str4 = strArr[2];
            String str5 = strArr[3];
            String str6 = strArr[4];
            killOldWorker(str4);
            Map readStormConfig = Utils.readStormConfig();
            StormConfig.validate_distributed_mode(readStormConfig);
            JStormServerUtils.startTaobaoJvmMonitor();
            sb2.append("topologyId:" + str2 + ", ");
            sb2.append("port:" + str4 + ", ");
            sb2.append("workerId:" + str5 + ", ");
            sb2.append("jar_path:" + str6 + "\n");
            mk_worker(readStormConfig, null, str2, str3, Integer.parseInt(str4), str5, str6).join();
            LOG.info("@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
            LOG.info("Successfully shutdown worker " + sb2.toString());
            LOG.info("@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
        } catch (Throwable th) {
            String str7 = "Failed to create worker, " + sb2.toString();
            LOG.error(str7, th);
            JStormUtils.halt_process(-1, str7);
        }
    }
}
