package org.apache.hadoop.mapreduce;

import com.sun.tools.doclets.internal.toolkit.taglets.SimpleTaglet;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:lib/hadoop-mapreduce-client-jobclient-2.10.2-tests.jar:org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.class */
public class TestMRIntermediateDataEncryption {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) TestMRIntermediateDataEncryption.class);
    public static final long TOTAL_MBS_DEFAULT = 128;
    public static final long BLOCK_SIZE_DEFAULT = 33554432;
    public static final int INPUT_GEN_NUM_THREADS = 16;
    public static final long TASK_SORT_IO_MB_DEFAULT = 128;
    public static final String JOB_DIR_PATH = "jobs-data-path";
    private static File testRootDir;
    private static volatile BufferedWriter inputBufferedWriter;
    private static Configuration commonConfig;
    private static MiniDFSCluster dfsCluster;
    private static MiniMRClientCluster mrCluster;
    private static FileSystem fs;
    private static FileChecksum checkSumReference;
    private static Path jobInputDirPath;
    private static long inputFileSize;
    private final String testTitleName;
    private final int numMappers;
    private final int numReducers;
    private final boolean isUber;
    private Configuration config;
    private Path jobOutputPath;

    /* loaded from: input_file:lib/hadoop-mapreduce-client-jobclient-2.10.2-tests.jar:org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption$CombinerJobMapper.class */
    public static class CombinerJobMapper extends Mapper<Object, Text, Text, LongWritable> {
        private final LongWritable sum = new LongWritable(0);
        private final Text word = new Text();

        @Override // org.apache.hadoop.mapreduce.Mapper
        public void map(Object obj, Text text, Mapper<Object, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            String[] split = text.toString().split("\\s+");
            this.sum.set(Long.parseLong(split[1]));
            this.word.set(split[0]);
            context.write(this.word, this.sum);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/hadoop-mapreduce-client-jobclient-2.10.2-tests.jar:org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption$InputGeneratorTask.class */
    public static class InputGeneratorTask implements Callable<Long> {
        InputGeneratorTask() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Long call() throws Exception {
            long j = 0;
            ThreadLocalRandom current = ThreadLocalRandom.current();
            String lineSeparator = System.lineSeparator();
            BufferedWriter access$000 = TestMRIntermediateDataEncryption.access$000();
            while (j < 8388608) {
                access$000.write(RandomTextWriter.generateSentenceWithRand(current, current.nextInt(5, 20)).concat(lineSeparator));
                j += r0.length();
            }
            access$000.flush();
            TestMRIntermediateDataEncryption.LOG.info("Task {} finished. Wrote {} bytes.", Thread.currentThread().getName(), Long.valueOf(j));
            return Long.valueOf(j);
        }
    }

    /* loaded from: input_file:lib/hadoop-mapreduce-client-jobclient-2.10.2-tests.jar:org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption$TokenizerMapper.class */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);
        private final Text word = new Text();

        @Override // org.apache.hadoop.mapreduce.Mapper
        public void map(Object obj, Text text, Mapper<Object, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer stringTokenizer = new StringTokenizer(text.toString());
            while (stringTokenizer.hasMoreTokens()) {
                this.word.set(stringTokenizer.nextToken());
                context.write(this.word, ONE);
            }
        }
    }

    public TestMRIntermediateDataEncryption(String str, int i, int i2, boolean z) {
        this.testTitleName = str;
        this.numMappers = i;
        this.numReducers = i2;
        this.isUber = z;
    }

    @Parameterized.Parameters(name = "{index}: TestMRIntermediateDataEncryption.{0} .. mappers:{1}, reducers:{2}, isUber:{3})")
    public static Collection<Object[]> getTestParameters() {
        return Arrays.asList(new Object[]{"testSingleReducer", 3, 1, false}, new Object[]{"testUberMode", 3, 1, true}, new Object[]{"testMultipleMapsPerNode", 8, 1, false}, new Object[]{"testMultipleReducers", 2, 4, false});
    }

    @BeforeClass
    public static void setupClass() throws Exception {
        testRootDir = GenericTestUtils.setupTestRootDir(TestMRIntermediateDataEncryption.class);
        File file = new File(testRootDir, "dfs");
        Path path = new Path(JOB_DIR_PATH);
        commonConfig = createBaseConfiguration();
        dfsCluster = new MiniDFSCluster.Builder(commonConfig, file).numDataNodes(2).build();
        dfsCluster.waitActive();
        mrCluster = MiniMRClientClusterFactory.create(TestMRIntermediateDataEncryption.class, 2, commonConfig);
        mrCluster.start();
        fs = dfsCluster.getFileSystem();
        if (fs.exists(path) && !fs.delete(path, true)) {
            throw new IOException("Could not delete JobsDirPath" + path);
        }
        fs.mkdirs(path);
        jobInputDirPath = new Path(path, "in-dir");
        Assert.assertEquals("Generating input should succeed", 0L, generateInputTextFile());
        runReferenceJob();
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (mrCluster != null) {
            mrCluster.stop();
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
        }
        File file = new File(testRootDir, "input.txt");
        if (file.exists()) {
            Assert.assertTrue(file.delete());
        }
    }

    private static Configuration createBaseConfiguration() {
        Configuration localDirectoriesConfigForTesting = MRJobConfUtil.setLocalDirectoriesConfigForTesting(MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null), testRootDir);
        localDirectoriesConfigForTesting.setLong("dfs.blocksize", 33554432L);
        return localDirectoriesConfigForTesting;
    }

    private static synchronized BufferedWriter getTextInputWriter() throws IOException {
        if (inputBufferedWriter == null) {
            inputBufferedWriter = new BufferedWriter(new FileWriter(new File(testRootDir, "input.txt")));
        }
        return inputBufferedWriter;
    }

    private static int generateInputTextFile() throws Exception {
        File file = new File(testRootDir, "input.txt");
        AtomicLong atomicLong = new AtomicLong(0L);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(16);
        ArrayList arrayList = new ArrayList();
        InputGeneratorTask inputGeneratorTask = new InputGeneratorTask();
        long monotonicNow = Time.monotonicNow();
        for (int i = 0; i < 16; i++) {
            arrayList.add(newFixedThreadPool.submit(inputGeneratorTask));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            LOG.info("Received one task. Current total bytes: {}", Long.valueOf(atomicLong.addAndGet(((Long) ((Future) it.next()).get()).longValue())));
        }
        getTextInputWriter().close();
        LOG.info("Finished generating input. Wrote {} bytes in {} seconds", Long.valueOf(atomicLong.get()), Double.valueOf(((Time.monotonicNow() - monotonicNow) * 1.0d) / 1000.0d));
        newFixedThreadPool.shutdown();
        fs.mkdirs(jobInputDirPath);
        Path makeQualified = fs.makeQualified(new Path(jobInputDirPath, "input.txt"));
        fs.copyFromLocalFile(true, new Path(file.getAbsolutePath()), makeQualified);
        if (!fs.exists(makeQualified)) {
            return 1;
        }
        inputFileSize = fs.listStatus(makeQualified)[0].getLen();
        LOG.info("Text input file; path: {}, size: {}", makeQualified, Long.valueOf(inputFileSize));
        return 0;
    }

    private static void runReferenceJob() throws Exception {
        Path path = new Path(JOB_DIR_PATH, "job-reference");
        if (fs.exists(path) && !fs.delete(path, true)) {
            throw new IOException("Could not delete " + path);
        }
        Assert.assertTrue(fs.mkdirs(path));
        Path path2 = new Path(path, "out-dir");
        Configuration configuration = new Configuration(commonConfig);
        configuration.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
        Assert.assertTrue(runWordCountJob("job-reference", path2, configuration, 4, 1).isSuccessful());
        FileStatus[] listStatus = fs.listStatus(path2, new Utils.OutputFileUtils.OutputFilesFilter());
        Assert.assertEquals(1L, listStatus.length);
        checkSumReference = fs.getFileChecksum(listStatus[0].getPath());
        Assert.assertTrue(fs.delete(path, true));
    }

    private static Job runWordCountJob(String str, Path path, Configuration configuration, int i, int i2) throws Exception {
        Job job = Job.getInstance(configuration);
        job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, i);
        job.setJarByClass(TestMRIntermediateDataEncryption.class);
        job.setJobName("mr-spill-" + str);
        job.setMapperClass(TokenizerMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setCombinerClass(LongSumReducer.class);
        FileInputFormat.setMinInputSplitSize(job, (inputFileSize + i) / i);
        job.setReducerClass(LongSumReducer.class);
        job.setNumReduceTasks(i2);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, jobInputDirPath);
        FileOutputFormat.setOutputPath(job, path);
        if (job.waitForCompletion(true)) {
            for (FileStatus fileStatus : fs.listStatus(path, new Utils.OutputFileUtils.OutputFilesFilter())) {
                LOG.info("Job: {} .. Output file {} .. Size = {}", str, fileStatus.getPath(), Long.valueOf(fileStatus.getLen()));
            }
        }
        return job;
    }

    private boolean validateJobOutput() throws Exception {
        Assert.assertTrue("Job Output path [" + this.jobOutputPath + "] should exist", fs.exists(this.jobOutputPath));
        Path path = this.jobOutputPath;
        if (this.numReducers != 1) {
            String str = this.testTitleName + "-combine";
            Path path2 = new Path(JOB_DIR_PATH, str);
            if (fs.exists(path2) && !fs.delete(path2, true)) {
                throw new IOException("Could not delete " + path2);
            }
            fs.mkdirs(path2);
            path = new Path(path2, "out-dir");
            Configuration configuration = new Configuration(commonConfig);
            configuration.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
            Job job = Job.getInstance(configuration);
            job.setJarByClass(TestMRIntermediateDataEncryption.class);
            job.setJobName("mr-spill-" + str);
            job.setMapperClass(CombinerJobMapper.class);
            FileInputFormat.addInputPath(job, this.jobOutputPath);
            job.setReducerClass(LongSumReducer.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileOutputFormat.setOutputPath(job, path);
            if (!job.waitForCompletion(true)) {
                return false;
            }
            FileStatus[] listStatus = fs.listStatus(path, new Utils.OutputFileUtils.OutputFilesFilter());
            LOG.info("Job-Combination: {} .. Output file {} .. Size = {}", path2, listStatus[0].getPath(), Long.valueOf(listStatus[0].getLen()));
        }
        return checkSumReference.equals(fs.getFileChecksum(fs.listStatus(path, new Utils.OutputFileUtils.OutputFilesFilter())[0].getPath()));
    }

    @Before
    public void setup() throws Exception {
        LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", this.testTitleName);
        Path path = new Path(JOB_DIR_PATH, this.testTitleName);
        if (fs.exists(path) && !fs.delete(path, true)) {
            throw new IOException("Could not delete " + path);
        }
        fs.mkdirs(path);
        this.jobOutputPath = new Path(path, "out-dir");
        this.config = new Configuration(commonConfig);
        this.config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, this.isUber);
        this.config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
        this.config.setLong(MRJobConfig.IO_SORT_MB, 128L);
        long max = Math.max(2 * 128, this.config.getInt(MRJobConfig.MAP_MEMORY_MB, 1024));
        this.config.setLong(MRJobConfig.MAP_MEMORY_MB, max);
        this.config.set("mapreduce.map.java.opts", "-Xmx" + (max - 200) + SimpleTaglet.METHOD);
        this.config.setInt(MRJobConfig.NUM_MAPS, this.numMappers);
        this.config.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
        this.config.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
    }

    @Test
    public void testWordCount() throws Exception {
        LOG.info("........Starting main Job Driver #{} starting at {}.......", this.testTitleName, Time.formatTime(System.currentTimeMillis()));
        SpillCallBackPathsFinder spillCallBackPathsFinder = (SpillCallBackPathsFinder) IntermediateEncryptedStream.setSpillCBInjector(new SpillCallBackPathsFinder());
        StringBuilder sb = new StringBuilder(String.format("%n ===== test %s summary ======", this.testTitleName));
        try {
            long monotonicNow = Time.monotonicNow();
            sb.append(String.format("%nJob %s started at %s", this.testTitleName, Time.formatTime(System.currentTimeMillis())));
            Job runWordCountJob = runWordCountJob(this.testTitleName, this.jobOutputPath, this.config, this.numMappers, this.numReducers);
            Assert.assertTrue(runWordCountJob.isSuccessful());
            long monotonicNow2 = Time.monotonicNow();
            sb.append(String.format("%nJob %s ended at %s", runWordCountJob.getJobName(), Time.formatTime(System.currentTimeMillis())));
            sb.append(String.format("%n\tThe job took %.3f seconds", Double.valueOf((1.0d * (monotonicNow2 - monotonicNow)) / 1000.0d)));
            for (FileStatus fileStatus : fs.listStatus(this.jobOutputPath, new Utils.OutputFileUtils.OutputFilesFilter())) {
                sb.append(String.format("%n\tOutput file %s: %d", fileStatus.getPath(), Long.valueOf(fileStatus.getLen())));
            }
            Assert.assertTrue(validateJobOutput());
            Assert.assertTrue("Spill records must be greater than 0", runWordCountJob.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue() > 0);
            Assert.assertFalse("The encrypted spilled files should not be empty.", spillCallBackPathsFinder.getEncryptedSpilledFiles().isEmpty());
            Assert.assertTrue("Invalid access to spill file positions", spillCallBackPathsFinder.getInvalidSpillEntries().isEmpty());
            sb.append(spillCallBackPathsFinder.getSpilledFileReport());
            LOG.info(sb.toString());
            IntermediateEncryptedStream.resetSpillCBInjector();
        } catch (Throwable th) {
            sb.append(spillCallBackPathsFinder.getSpilledFileReport());
            LOG.info(sb.toString());
            IntermediateEncryptedStream.resetSpillCBInjector();
            throw th;
        }
    }

    static /* synthetic */ BufferedWriter access$000() throws IOException {
        return getTextInputWriter();
    }
}
