package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.partition.PartitionFunction;
import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.class */
/**
 * Integration test that builds segments with the Hadoop jobs (preprocessing, segment creation and
 * tar push) on a {@link MiniMRYarnCluster}, pushes them to an offline table, and then runs the
 * query tests inherited from {@link BaseClusterIntegrationTestSet} against the loaded data.
 */
public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
    private static final int NUM_BROKERS = 1;
    private static final int NUM_SERVERS = 1;
    private MiniMRYarnCluster _mrCluster;
    private Schema _schema;

    /** Returns the number of brokers started by {@link #setUp()}. */
    protected int getNumBrokers() {
        return NUM_BROKERS;
    }

    /** Returns the number of servers started by {@link #setUp()}. */
    protected int getNumServers() {
        return NUM_SERVERS;
    }

    /**
     * Starts the Pinot components and the mini MR cluster, prepares the H2 reference data and the
     * query generator from the unpacked Avro files, creates the schema and offline table, runs the
     * Hadoop segment jobs and waits for all documents to be loaded.
     *
     * @throws Exception if any component fails to start or any job fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._avroDir, this._segmentDir, this._tarDir});

        startZk();
        startController();
        startBrokers(getNumBrokers());
        startServers(getNumServers());

        // Bring up the mini MapReduce/YARN cluster used by the Hadoop jobs.
        this._mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
        this._mrCluster.init(new Configuration());
        this._mrCluster.start();

        this._schema = Schema.fromFile(getSchemaFile());
        List<File> avroFiles = unpackAvroData(this._avroDir);

        // The H2 connection and query generator set-up share one executor; wait for the submitted
        // work to drain before continuing.
        ExecutorService executor = Executors.newCachedThreadPool();
        setUpH2Connection(avroFiles, executor);
        setUpQueryGenerator(avroFiles, executor);
        executor.shutdown();
        executor.awaitTermination(10L, TimeUnit.MINUTES);

        addSchema(getSchemaFile(), this._schema.getSchemaName());
        addOfflineTable(getTableName(), this._schema.getTimeColumnName(),
            this._schema.getOutgoingTimeUnit().toString(), null, null, getLoadMode(), SegmentVersion.v3,
            getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(),
            getSortedColumn());

        generateAndPushSegmentsFromHadoop();

        waitForAllDocsLoaded(600000L);
    }

    /**
     * Stops the Pinot components and the mini MR cluster, then deletes the MR work directory and the
     * test's temp directory.
     *
     * @throws Exception if a component fails to stop or a directory cannot be deleted
     */
    @AfterClass
    public void tearDown() throws Exception {
        stopServer();
        stopBroker();
        stopController();
        stopZk();
        this._mrCluster.stop();
        FileUtils.deleteDirectory(this._mrCluster.getTestWorkDir());
        FileUtils.deleteDirectory(this._tempDir);
    }

    @Override
    @Test
    public void testQueriesFromQueryFile() throws Exception {
        super.testQueriesFromQueryFile();
    }

    @Override
    @Test
    public void testSqlQueriesFromQueryFile() throws Exception {
        // Delegate to the SQL variant; previously this re-ran the non-SQL query-file test.
        super.testSqlQueriesFromQueryFile();
    }

    @Override
    @Test
    public void testGeneratedQueriesWithMultiValues() throws Exception {
        super.testGeneratedQueriesWithMultiValues();
    }

    @Override
    @Test
    public void testQueryExceptions() throws Exception {
        super.testQueryExceptions();
    }

    @Override
    @Test
    public void testInstanceShutdown() throws Exception {
        super.testInstanceShutdown();
    }

    /**
     * Runs the preprocessing job (and verifies its output), then the segment creation job and the
     * segment tar push job, all against the mini MR cluster's configuration.
     *
     * @throws Exception if any of the jobs or the preprocessing verification fails
     */
    private void generateAndPushSegmentsFromHadoop() throws Exception {
        // Properties shared by the creation and push jobs.
        Properties jobProperties = new Properties();
        jobProperties.setProperty("path.to.input", this._avroDir.getPath());
        jobProperties.setProperty("path.to.output", this._segmentDir.getPath());
        jobProperties.setProperty("segment.table.name", getTableName());
        jobProperties.setProperty("push.to.hosts", getDefaultControllerConfiguration().getControllerHost());
        jobProperties.setProperty("push.to.port", getDefaultControllerConfiguration().getControllerPort());

        // The preprocessing job uses a copy of the shared properties plus its own switches.
        Properties preprocessProperties = new Properties();
        preprocessProperties.putAll(jobProperties);
        preprocessProperties.setProperty("path.to.input", this._avroDir.getPath());
        preprocessProperties.setProperty("enable.preprocessing", "true");
        preprocessProperties.setProperty("preprocess.path.to.output", this._preprocessingDir.getPath());

        Configuration mrConfig = this._mrCluster.getConfig();
        SegmentPreprocessingJob preprocessingJob = new SegmentPreprocessingJob(preprocessProperties);
        preprocessingJob.setConf(mrConfig);
        preprocessingJob.run();
        LOGGER.info("Segment preprocessing job finished.");
        verifyPreprocessingJob(mrConfig);

        SegmentCreationJob creationJob = new SegmentCreationJob(jobProperties);
        creationJob.setConf(this._mrCluster.getConfig());
        creationJob.run();

        SegmentTarPushJob pushJob = new SegmentTarPushJob(jobProperties);
        pushJob.setConf(this._mrCluster.getConfig());
        pushJob.run();
    }

    /**
     * Checks the files written to the preprocessing output directory: there must be exactly one file
     * per partition, every record in a file must map to the same partition id, and the values of the
     * sorted column must be non-decreasing within each file.
     *
     * @param configuration Hadoop configuration used to obtain the file system holding the output
     * @throws IOException if the output files cannot be listed or read
     */
    private void verifyPreprocessingJob(Configuration configuration) throws IOException {
        // Only the first configured partition column is verified.
        Map.Entry<String, ColumnPartitionConfig> partitionEntry =
            getSegmentPartitionConfig().getColumnPartitionMap().entrySet().iterator().next();
        String partitionColumn = partitionEntry.getKey();
        ColumnPartitionConfig partitionConfig = partitionEntry.getValue();
        int numPartitions = partitionConfig.getNumPartitions();
        PartitionFunction partitionFunction =
            PartitionFunctionFactory.getPartitionFunction(partitionConfig.getFunctionName(), numPartitions);
        String sortedColumn = getSortedColumn();

        FileSystem fileSystem = FileSystem.get(configuration);
        FileStatus[] outputFiles = fileSystem.listStatus(new Path(this._preprocessingDir.getPath()));
        Assert.assertEquals(outputFiles.length, numPartitions,
            "Number of output file should be the same as the number of partitions.");

        HashSet<Integer> partitionIds = new HashSet<>();
        for (FileStatus outputFile : outputFiles) {
            // try-with-resources closes the Avro stream (and the wrapped HDFS stream) per file.
            try (DataFileStream<GenericRecord> records =
                new DataFileStream<>(fileSystem.open(outputFile.getPath()), new GenericDatumReader<GenericRecord>())) {
                // Partition ids are tracked per file.
                partitionIds.clear();
                Object previousValue = null;
                // Loop until the stream is exhausted; an unconditional loop here would never exit.
                while (records.hasNext()) {
                    GenericRecord record = records.next();

                    partitionIds.add(partitionFunction.getPartition(record.get(partitionColumn)));
                    Assert.assertEquals(partitionIds.size(), 1, "Partition Id should be the same within a file.");

                    org.apache.avro.Schema sortedColumnSchema = record.getSchema().getField(sortedColumn).schema();
                    Object currentValue = record.get(sortedColumn);
                    // Skip the comparison when there is no previous (non-null) value to compare with.
                    if (previousValue != null) {
                        Assert.assertTrue(GenericData.get().compare(previousValue, currentValue, sortedColumnSchema) <= 0);
                    }
                    previousValue = currentValue;
                }
            }
        }
    }
}
