package com.aliyun.odps.mapred.unittest;

import com.aliyun.odps.Column;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.counter.CounterGroup;
import com.aliyun.odps.counter.Counters;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.local.common.FileSplit;
import com.aliyun.odps.local.common.TableMeta;
import com.aliyun.odps.local.common.WareHouse;
import com.aliyun.odps.local.common.utils.ArchiveUtils;
import com.aliyun.odps.local.common.utils.LocalRunUtils;
import com.aliyun.odps.mapred.TaskId;
import com.aliyun.odps.mapred.bridge.WritableRecord;
import com.aliyun.odps.mapred.conf.BridgeJobConf;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.local.CSVRecordReader;
import com.aliyun.odps.mapred.local.CSVRecordWriter;
import com.aliyun.odps.mapred.local.JobCounter;
import com.aliyun.odps.mapred.local.LocalTaskId;
import com.aliyun.odps.mapred.local.MapDriver;
import com.aliyun.odps.mapred.local.ReduceDriver;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import com.aliyun.odps.pipeline.Pipeline;
import com.aliyun.odps.utils.StringUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/aliyun/odps/mapred/unittest/MRUnitTest.class */
public class MRUnitTest {
    private static final Log LOG = LogFactory.getLog(MRUnitTest.class);
    private static String DEFAULT_PROJECT_NAME = "default_mr_unittest";
    private static String DEFAULT_TABLE_NAME = "default_mr_unittest_table";

    public TaskOutput runMapper(JobConf jobConf, MapUTContext mapUTContext) throws IOException, ClassNotFoundException, InterruptedException {
        Pipeline fromJobConf = Pipeline.fromJobConf(jobConf);
        String generateMrTaskName = generateMrTaskName();
        LOG.info("start to run mapper unittest, id: " + generateMrTaskName);
        RuntimeContext create = RuntimeContext.create(generateMrTaskName, jobConf);
        TableInfo[] tables = InputUtils.getTables(jobConf);
        ArrayList arrayList = new ArrayList();
        mapUTContext.setRuntimeContext(create);
        writeJobConf(jobConf, create);
        processInputs(jobConf, arrayList, mapUTContext);
        processResources(jobConf, mapUTContext);
        int i = (tables == null || tables.length <= 0) ? jobConf.getInt("odps.stage.mapper.num", 1) : tables.length;
        int computeReduceNum = computeReduceNum(i, fromJobConf == null ? null : fromJobConf.getFirstNode(), jobConf, create);
        TaskOutput taskOutput = fromJobConf == null ? new TaskOutput(jobConf, computeReduceNum) : new TaskOutput(jobConf, fromJobConf, new LocalTaskId("M1", 0, DEFAULT_PROJECT_NAME).getTaskId(), computeReduceNum);
        Counters counters = new Counters();
        for (int i2 = 0; i2 < i; i2++) {
            FileSplit fileSplit = arrayList.size() > 0 ? arrayList.get(i2) : FileSplit.NullSplit;
            TaskId taskId = new TaskId("M1", i2 + 1);
            LOG.info("Start to run mapper, TaskId: " + taskId);
            new MapDriver(new BridgeJobConf(jobConf), fileSplit, taskId, taskOutput, counters, tables == null ? null : tables[i2]).run();
        }
        Counters counters2 = new Counters();
        printCounters(counters, counters2);
        taskOutput.setCounters(counters2);
        clean(mapUTContext);
        return taskOutput;
    }

    private void processResources(JobConf jobConf, UTContext uTContext) throws IOException {
        RuntimeContext runtimeContext = uTContext.getRuntimeContext();
        for (Map.Entry<String, byte[]> entry : uTContext.getFileResources().entrySet()) {
            LOG.info("process file resource: " + entry.getKey());
            FileUtils.writeByteArrayToFile(new File(runtimeContext.getResourceDir(), entry.getKey()), entry.getValue());
        }
        for (Map.Entry<String, File> entry2 : uTContext.getArchiveResources().entrySet()) {
            LOG.info("process archive resource: " + entry2.getKey());
            File file = new File(runtimeContext.getResourceDir(), entry2.getKey() + "_decompressed");
            File file2 = new File(runtimeContext.getResourceDir(), entry2.getKey());
            File value = entry2.getValue();
            if (value.isFile()) {
                FileUtils.copyFile(value, file2);
                ArchiveUtils.unArchive(file2, file);
            } else {
                file2.createNewFile();
                FileUtils.copyDirectoryToDirectory(value, file);
            }
        }
        Map<String, List<Record>> tableResources = uTContext.getTableResources();
        Map<String, TableMeta> tableMetas = uTContext.getTableMetas();
        for (Map.Entry<String, List<Record>> entry3 : tableResources.entrySet()) {
            LOG.info("process table resource: " + entry3.getKey());
            writeRecords(new File(runtimeContext.getResourceDir(), entry3.getKey()), entry3.getValue(), tableMetas.get(entry3.getKey()));
        }
        uTContext.clearResources();
    }

    private String generateMrTaskName() {
        return "mr_ut_" + LocalRunUtils.getDateFormat("yyyyMMddHHmmss_SSS").format(new Date());
    }

    private void processInputs(JobConf jobConf, List<FileSplit> list, MapUTContext mapUTContext) throws IOException {
        TableInfo[] tables = InputUtils.getTables(jobConf);
        if (tables == null) {
            LOG.debug("No input tables to process");
            return;
        }
        for (TableInfo tableInfo : tables) {
            LOG.debug("Start to process input table: " + tableInfo);
            if (StringUtils.isEmpty(tableInfo.getProjectName())) {
                tableInfo.setProjectName(DEFAULT_PROJECT_NAME);
            }
            processMapInputs(jobConf, list, mapUTContext, tableInfo);
            LOG.debug("Finished process input table: " + tableInfo);
        }
        if (list.isEmpty()) {
            list.add(FileSplit.NullSplit);
        }
    }

    public TaskOutput runReducer(JobConf jobConf, ReduceUTContext reduceUTContext) throws IOException, ClassNotFoundException, InterruptedException {
        Pipeline fromJobConf = Pipeline.fromJobConf(jobConf);
        String generateMrTaskName = generateMrTaskName();
        LOG.info("start to run mapper unittest, id: " + generateMrTaskName);
        RuntimeContext create = RuntimeContext.create(generateMrTaskName, jobConf);
        reduceUTContext.setRuntimeContext(create);
        writeJobConf(jobConf, create);
        Pipeline.TransformNode node = fromJobConf == null ? null : fromJobConf.getNode(reduceUTContext.getReducerIndex() + 1);
        int reducerIndex = fromJobConf == null ? 2 : 2 + reduceUTContext.getReducerIndex();
        int computeReduceNum = computeReduceNum(1, node, jobConf, create);
        TaskOutput taskOutput = fromJobConf == null ? new TaskOutput(jobConf, computeReduceNum) : new TaskOutput(jobConf, fromJobConf, new LocalTaskId(reducerIndex == 2 ? "M1" : "R" + Integer.toString(reducerIndex - 1), 0, DEFAULT_PROJECT_NAME).getTaskId(), computeReduceNum);
        for (KeyValue<Record, Record> keyValue : reduceUTContext.getInputKeyVals()) {
            taskOutput.add(keyValue.getKey(), keyValue.getValue());
        }
        processResources(jobConf, reduceUTContext);
        TaskOutput taskOutput2 = fromJobConf == null ? new TaskOutput(jobConf, computeReduceNum) : new TaskOutput(jobConf, fromJobConf, new LocalTaskId("R" + Integer.toString(reducerIndex), 0, DEFAULT_PROJECT_NAME).getTaskId(), computeReduceNum);
        Counters counters = new Counters();
        for (int i = 0; i < computeReduceNum; i++) {
            TaskId taskId = new TaskId("R" + Integer.toString(reducerIndex), i);
            LOG.info("Start to run reduce, taskId: " + taskId);
            new ReduceDriver(new BridgeJobConf(jobConf), taskOutput, taskOutput2, taskId, counters, i).run();
            LOG.info("Finished run reduce, taskId: " + taskId);
        }
        Counters counters2 = new Counters();
        printCounters(counters, counters2);
        taskOutput2.setCounters(counters2);
        clean(reduceUTContext);
        return taskOutput2;
    }

    public static Record createRecord(String str) throws IOException {
        return new WritableRecord(SchemaUtils.fromString(str));
    }

    public static List<Record> readRecords(File file) throws IOException {
        ArrayList arrayList = new ArrayList();
        TableMeta readSchema = com.aliyun.odps.local.common.utils.SchemaUtils.readSchema(file);
        File file2 = new File(file, "data");
        if (!file2.exists()) {
            return arrayList;
        }
        Counters counters = new Counters();
        Counter findCounter = counters.findCounter(JobCounter.__EMPTY_WILL_NOT_SHOW);
        CSVRecordReader cSVRecordReader = new CSVRecordReader(new FileSplit(file2, readSchema.getCols(), 0L, file2.getTotalSpace()), readSchema, findCounter, findCounter, counters, WareHouse.getInstance().getInputColumnSeperator());
        Record read = cSVRecordReader.read();
        while (true) {
            Record record = read;
            if (record == null) {
                cSVRecordReader.close();
                return arrayList;
            }
            arrayList.add(record.clone());
            read = cSVRecordReader.read();
        }
    }

    public static void writeRecords(File file, List<Record> list, TableMeta tableMeta) throws IOException {
        if (StringUtils.isEmpty(tableMeta.getProjName())) {
            tableMeta.setProjName(DEFAULT_PROJECT_NAME);
        }
        if (StringUtils.isEmpty(tableMeta.getTableName())) {
            tableMeta.setTableName(DEFAULT_TABLE_NAME);
        }
        file.mkdirs();
        com.aliyun.odps.local.common.utils.SchemaUtils.generateSchemaFile(tableMeta, (List) null, file);
        File file2 = new File(file, "data");
        file2.createNewFile();
        CSVRecordWriter cSVRecordWriter = new CSVRecordWriter(file2, null, null, RuntimeContext.getWareHouse().getInputColumnSeperator());
        Iterator<Record> it = list.iterator();
        while (it.hasNext()) {
            cSVRecordWriter.write(it.next());
        }
        cSVRecordWriter.close();
    }

    public static boolean equalRecords(List<Record> list, List<Record> list2, boolean z) {
        ArrayList arrayList = new ArrayList(list);
        ArrayList arrayList2 = new ArrayList(list2);
        LocalRecordComparator localRecordComparator = new LocalRecordComparator();
        if (z) {
            Collections.sort(arrayList, localRecordComparator);
            Collections.sort(arrayList2, localRecordComparator);
        }
        boolean z2 = arrayList.size() == list2.size();
        for (int i = 0; i < arrayList.size() && z2; i++) {
            z2 = localRecordComparator.compare((Record) arrayList.get(i), (Record) arrayList2.get(i)) == 0;
        }
        return z2;
    }

    public static boolean equalRecords(File file, List<Record> list, boolean z) throws IOException {
        return equalRecords(readRecords(file), list, z);
    }

    private void processMapInputs(JobConf jobConf, List<FileSplit> list, MapUTContext mapUTContext, TableInfo tableInfo) throws IOException {
        LOG.info("process map input: " + tableInfo);
        RuntimeContext runtimeContext = mapUTContext.getRuntimeContext();
        String projectName = tableInfo.getProjectName();
        String tableName = tableInfo.getTableName();
        String str = projectName + "." + tableName;
        String partPath = tableInfo.getPartPath();
        String inputSchema = mapUTContext.getInputSchema();
        if (inputSchema == null) {
            throw new IOException("input schema is not set.");
        }
        prepareTableDir(new TableMeta(projectName, tableName, SchemaUtils.fromString(inputSchema.trim()), (Column[]) null), list, runtimeContext.getInputDir(str, partPath), mapUTContext.getInputRecords());
    }

    private void prepareTableDir(TableMeta tableMeta, List<FileSplit> list, File file, List<Record> list2) throws IOException {
        LOG.info("prepare table dir: " + (tableMeta.getProjName() + "." + tableMeta.getTableName()) + " to " + file.getAbsolutePath());
        writeRecords(file, list2, tableMeta);
        File file2 = new File(file, "data");
        list.add(new FileSplit(file2, tableMeta.getCols(), 0L, file2.length()));
    }

    private void clean(UTContext uTContext) throws IOException {
        if (uTContext.isCleanUtDir()) {
            FileUtils.deleteDirectory(uTContext.runtimeContext.getJobDir());
        }
    }

    private void writeJobConf(JobConf jobConf, RuntimeContext runtimeContext) throws IOException {
        FileOutputStream fileOutputStream = new FileOutputStream(runtimeContext.getJobFile());
        jobConf.writeXml(fileOutputStream);
        fileOutputStream.close();
    }

    private int computeReduceNum(int i, Pipeline.TransformNode transformNode, JobConf jobConf, RuntimeContext runtimeContext) throws IOException {
        int numTasks = transformNode != null ? transformNode.getNextNode() != null ? transformNode.getNextNode().getNumTasks() : transformNode.getNumTasks() : runtimeContext.containsKey("odps.stage.reducer.num") ? jobConf.getNumReduceTasks() : Math.max(1, i / 4);
        if (numTasks < 0) {
            throw new IOException("ODPS-0720251: Num of reduce instance is invalid - reduce num cann't be less than 0");
        }
        if (numTasks != jobConf.getNumReduceTasks()) {
            LOG.info("change reduce num from " + jobConf.getNumReduceTasks() + " to " + numTasks);
        }
        jobConf.setNumReduceTasks(numTasks);
        return numTasks;
    }

    private void printCounters(Counters counters, Counters counters2) {
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        Iterator it = counters.iterator();
        while (it.hasNext()) {
            CounterGroup counterGroup = (CounterGroup) it.next();
            Iterator it2 = counterGroup.iterator();
            while (it2.hasNext()) {
                if (!((Counter) it2.next()).getDisplayName().startsWith("__EMPTY_")) {
                    if (counterGroup.getDisplayName().equals(JobCounter.class.getName())) {
                        i2++;
                    } else if (counterGroup.getDisplayName().equals("com.aliyun.odps.mapred.local.Counter.JobCounter")) {
                        i3++;
                    } else {
                        i4++;
                    }
                    i++;
                }
            }
        }
        StringBuilder sb = new StringBuilder("Counters: " + i);
        sb.append("\n\tMap-Reduce Framework: " + i2);
        Iterator it3 = counters.iterator();
        while (it3.hasNext()) {
            CounterGroup counterGroup2 = (CounterGroup) it3.next();
            if (counterGroup2.getDisplayName().equals(JobCounter.class.getName())) {
                Iterator it4 = counterGroup2.iterator();
                while (it4.hasNext()) {
                    Counter counter = (Counter) it4.next();
                    if (!counter.getDisplayName().startsWith("__EMPTY_")) {
                        sb.append("\n\t\t" + counter.getDisplayName() + "=" + counter.getValue());
                    }
                }
            }
        }
        sb.append("\n\tUser Defined Counters: " + i4);
        Iterator it5 = counters.iterator();
        while (it5.hasNext()) {
            CounterGroup counterGroup3 = (CounterGroup) it5.next();
            if (!counterGroup3.getDisplayName().equals(JobCounter.class.getName()) && !counterGroup3.getDisplayName().equals("com.aliyun.odps.mapred.local.Counter.JobCounter")) {
                sb.append("\n\t\t" + counterGroup3.getDisplayName());
                Iterator it6 = counterGroup3.iterator();
                while (it6.hasNext()) {
                    Counter counter2 = (Counter) it6.next();
                    if (!counter2.getDisplayName().equals(JobCounter.__EMPTY_WILL_NOT_SHOW.toString())) {
                        counters2.findCounter(counterGroup3.getDisplayName(), counter2.getDisplayName()).setValue(counter2.getValue());
                        sb.append("\n\t\t\t" + counter2.getDisplayName() + "=" + counter2.getValue());
                    }
                }
            }
        }
        System.err.println(sb.toString());
    }
}
