package com.aliyun.odps.graph.local.master;

import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.counter.CounterGroup;
import com.aliyun.odps.counter.Counters;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.JobConf;
import com.aliyun.odps.graph.Partitioner;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.local.COUNTER;
import com.aliyun.odps.graph.local.GraphTaskAttemptID;
import com.aliyun.odps.graph.local.InputSplit;
import com.aliyun.odps.graph.local.LocalVertexMutations;
import com.aliyun.odps.graph.local.RuntimeContext;
import com.aliyun.odps.graph.local.utils.LocalGraphRunUtils;
import com.aliyun.odps.graph.local.worker.Worker;
import com.aliyun.odps.graph.utils.VerifyUtils;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.local.common.FileSplit;
import com.aliyun.odps.utils.ReflectionUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/aliyun/odps/graph/local/master/Master.class */
/**
 * Drives a graph job that runs locally: it creates the {@link Worker}s, loads
 * the graph, runs supersteps until a stop condition is met, merges the
 * per-worker aggregator values and finally collects the workers' counters.
 *
 * <p>All work is performed sequentially on the calling thread; this class
 * performs no synchronization of its own.
 *
 * @param <VERTEX_ID> vertex id type; used to route messages and mutations to
 *     the worker selected by the job's {@link Partitioner}
 * @param <AGGR_INFO> element type of the aggregated values kept between
 *     supersteps
 */
public class Master<VERTEX_ID extends WritableComparable<?>, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable, MESSAGE extends Writable, AGGR_INFO extends Writable, VALUE extends Writable> {
    private static final Log LOG = LogFactory.getLog(Master.class);
    JobConf mJob;
    RuntimeContext mCtx;
    List<FileSplit> mInputs;
    Map<String, TableInfo> mOutputs;
    private Partitioner<VERTEX_ID> p;
    int maxIteration;
    private Counters mCounters;
    /** Index of the last superstep that was started; {@code -1} before the first one. */
    int superStep = -1;
    List<Worker> mWorkers = new ArrayList<Worker>();
    int totalVertex = 0;
    int totalEdge = 0;
    private List<AGGR_INFO> mLastAggregatorValues = new ArrayList<AGGR_INFO>();

    /**
     * Stores the job description and immediately creates all workers.
     *
     * @param jobConf job configuration
     * @param runtimeContext local runtime context; also supplies the job-level counters
     * @param list input splits, at most one per worker
     * @param map output tables handed to every worker
     * @throws Exception if the configuration is rejected or a worker cannot be created
     */
    public Master(JobConf jobConf, RuntimeContext runtimeContext, List<FileSplit> list, Map<String, TableInfo> map) throws Exception {
        this.mJob = jobConf;
        this.mCtx = runtimeContext;
        this.mInputs = list;
        this.mOutputs = map;
        this.mCounters = runtimeContext.getCounters();
        init();
    }

    /**
     * Verifies the job configuration, marks the job as running, creates the
     * partitioner and one worker per slot, then registers the task counters.
     *
     * <p>The number of workers is the larger of the configured worker count
     * and the number of input splits; workers without a split receive
     * {@code InputSplit.NullSplit}.
     */
    private void init() throws InstantiationException, IllegalAccessException, IOException, ClassNotFoundException, NoSuchFieldException {
        VerifyUtils.verifyGraphConf(this.mJob);
        ReflectionUtils.setField(this.mJob, "state", JobConf.JobState.RUNNING);
        this.maxIteration = this.mJob.getMaxIteration();
        int size = Math.max(this.mJob.getNumWorkers(), this.mInputs.size());
        LOG.info("worker num :" + size);
        this.p = LocalGraphRunUtils.createPartitioner(this.mJob);
        for (int i = 0; i < size; i++) {
            // NOTE(review): the decompiler could not infer the split type here; the two
            // statements below are kept exactly as recovered - confirm how FileSplit and
            // InputSplit are related before restructuring them.
            InputSplit inputSplit = i < this.mInputs.size() ? this.mInputs.get(i) : null;
            InputSplit workerSplit = (inputSplit == null || inputSplit == FileSplit.NullSplit) ? InputSplit.NullSplit : inputSplit;
            GraphTaskAttemptID taskId = new GraphTaskAttemptID(this.mCtx.getJobId(), i, 0);
            this.mWorkers.add(new Worker(this.mJob, this.mCtx, this, taskId, i, size, workerSplit, this.mOutputs));
        }
        initCounters();
    }

    /** Looks up the four task I/O counters so that they exist in the job counters. */
    private void initCounters() {
        this.mCounters.findCounter(COUNTER.TASK_INPUT_BYTE);
        this.mCounters.findCounter(COUNTER.TASK_INPUT_RECORD);
        this.mCounters.findCounter(COUNTER.TASK_OUTPUT_BYTE);
        this.mCounters.findCounter(COUNTER.TASK_OUTPUT_RECORD);
    }

    /**
     * Lets every worker load its part of the graph and applies the mutations
     * produced while loading.
     *
     * @throws IOException if loading fails or the resulting graph has no vertices
     */
    private void initGraph() throws IOException {
        for (Worker worker : this.mWorkers) {
            worker.loadGraph();
        }
        processMutations(LocalGraphRunUtils.createLoadingVertexResolver(this.mJob));
        if (this.totalVertex == 0) {
            throw new IOException("ODPS-0730001: No vertices in the graph, exiting.");
        }
    }

    /**
     * Applies pending mutations on every worker, recomputes the global vertex
     * and edge totals and publishes the new totals to all workers.
     *
     * @param vertexResolver resolver passed to each worker for its mutations
     */
    private void processMutations(VertexResolver vertexResolver) throws IOException {
        for (Worker worker : this.mWorkers) {
            worker.processWorkerMutations(vertexResolver);
        }
        this.totalVertex = 0;
        this.totalEdge = 0;
        for (Worker worker : this.mWorkers) {
            // The totals are ints, so the per-worker numbers are narrowed on each addition.
            this.totalVertex = (int) (this.totalVertex + worker.getVertexNumber());
            this.totalEdge = (int) (this.totalEdge + worker.getEgeNumber());
        }
        for (Worker worker : this.mWorkers) {
            worker.setTotalNumVerticesAndEdges(this.totalVertex, this.totalEdge);
        }
    }

    /** Loads the graph and then initializes every worker. */
    private void setupWorkers() throws IOException {
        initGraph();
        for (Worker worker : this.mWorkers) {
            worker.init();
        }
    }

    /**
     * Merges the partial aggregates of all workers, terminates each aggregator
     * and hands the merged values back to every worker.
     *
     * <p>The first worker's partial values act as the accumulator into which
     * the values of the remaining workers are merged.
     *
     * @return {@code true} if at least one aggregator's {@code terminate}
     *     returned {@code true}; {@code false} otherwise, including when the job
     *     has no aggregators or there are no workers
     */
    private boolean aggregate() throws IOException {
        List<Aggregator> aggregators = LocalGraphRunUtils.getAggregator(this.mJob);
        // Without workers there is nothing to merge and no task context to terminate with.
        if (aggregators.size() <= 0 || this.mWorkers.isEmpty()) {
            return false;
        }
        List<Writable> merged = null;
        for (int w = 0; w < this.mWorkers.size(); w++) {
            List<Writable> partial = this.mWorkers.get(w).partialAggregate();
            if (w == 0) {
                merged = partial;
            } else {
                for (int a = 0; a < partial.size(); a++) {
                    aggregators.get(a).merge(merged.get(a), partial.get(a));
                }
            }
        }
        boolean terminated = false;
        for (int a = 0; a < merged.size(); a++) {
            // Non-short-circuit OR: every aggregator is terminated even after one returned true.
            terminated |= aggregators.get(a).terminate(this.mWorkers.get(0).getTaskContext(), merged.get(a));
        }
        // The workers hand out their values as Writable; the field is typed with AGGR_INFO.
        @SuppressWarnings("unchecked")
        List<AGGR_INFO> aggregated = (List<AGGR_INFO>) (List<?>) merged;
        this.mLastAggregatorValues = aggregated;
        for (Worker worker : this.mWorkers) {
            worker.setLastAggregatedValue(this.mLastAggregatorValues);
        }
        return terminated;
    }

    /**
     * Adds every worker's counters to the job counters, writes the result to
     * a file named after the job id in the counter directory, logs it at debug
     * level and prints it to standard output.
     */
    private void close() throws IOException {
        for (Worker worker : this.mWorkers) {
            mergeWorkerCounters(worker);
        }
        String summary = this.mCounters.toString();
        FileUtils.writeStringToFile(new File(this.mCtx.getCounterDir(), this.mCtx.getJobId()), summary);
        LOG.debug(summary);
        System.out.println(summary);
    }

    /** Increments the job counters by the values of all counters of one worker. */
    private void mergeWorkerCounters(Worker worker) throws IOException {
        Iterator<?> groups = worker.getCounters().iterator();
        while (groups.hasNext()) {
            CounterGroup group = (CounterGroup) groups.next();
            Iterator<?> counters = group.iterator();
            while (counters.hasNext()) {
                Counter counter = (Counter) counters.next();
                this.mCounters.findCounter(group.getName(), counter.getDisplayName()).increment(counter.getValue());
            }
        }
    }

    /**
     * Runs the job: sets up the workers, executes supersteps and finally
     * cleans up and closes the workers and writes the counters.
     *
     * <p>The superstep loop ends when the maximum iteration count (if
     * non-negative) is reached, when every worker reports that all its
     * vertices voted to halt, or when aggregation signals termination.
     *
     * @throws IOException if any phase of the job fails
     */
    public void run() throws IOException {
        setupWorkers();
        do {
            if (this.maxIteration >= 0 && this.superStep + 1 >= this.maxIteration) {
                break;
            }
            if (allWorkersHalted()) {
                break;
            }
            LOG.debug("master super step " + (this.superStep + 1));
            processMutations(LocalGraphRunUtils.createSuperstepVertexResolver(this.mJob));
            this.superStep++;
            for (Worker worker : this.mWorkers) {
                worker.processNextStep();
            }
            for (Worker worker : this.mWorkers) {
                worker.Compute();
            }
        } while (!aggregate());
        for (Worker worker : this.mWorkers) {
            worker.cleanup();
        }
        for (Worker worker : this.mWorkers) {
            worker.close();
        }
        close();
    }

    /**
     * Tells whether every worker reports that all of its vertices voted to halt.
     *
     * <p>The scan stops at the first worker that has not halted. The iterator
     * is advanced on every step; combining the running result and the
     * {@code next()} call in one short-circuit expression would stop advancing
     * after the first {@code false} and never leave the loop.
     *
     * @return {@code true} if no worker has active vertices (also when there are no workers)
     */
    private boolean allWorkersHalted() throws IOException {
        for (Worker worker : this.mWorkers) {
            if (!worker.allVertexVoltHalt()) {
                return false;
            }
        }
        return true;
    }

    /** Returns the worker that the partitioner assigns to the given vertex id. */
    private Worker getWorkerByVertexID(VERTEX_ID vertex_id) {
        return this.mWorkers.get(this.p.getPartition(vertex_id, this.mWorkers.size()));
    }

    /**
     * Returns the index of the most recently started superstep.
     *
     * @return the current superstep, or {@code -1} if none has started yet
     */
    public long getSuperStep() {
        return this.superStep;
    }

    /**
     * Fetches the pending mutations for a vertex from the worker owning it.
     *
     * @param vertex_id id of the vertex
     * @return whatever the owning worker returns for that vertex
     */
    public LocalVertexMutations getVertexMutations(VERTEX_ID vertex_id) {
        return getWorkerByVertexID(vertex_id).getVertexMutations(vertex_id);
    }

    /**
     * Forwards a message to the worker owning the target vertex.
     *
     * @param runtimeContext context passed through to the worker
     * @param j value passed through to the worker unchanged (presumably the superstep - confirm against Worker)
     * @param vertex_id id of the destination vertex
     * @param writable message payload
     */
    public void pushMsg(RuntimeContext runtimeContext, long j, VERTEX_ID vertex_id, Writable writable) {
        getWorkerByVertexID(vertex_id).pushMsg(runtimeContext, j, vertex_id, writable);
    }
}
