package org.apache.hama.bsp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.pipes.PipesPartitioner;
import org.apache.hama.util.ZKUtil;

/* loaded from: input_file:org/apache/hama/bsp/PartitioningRunner.class */
public class PartitioningRunner extends BSP<Writable, Writable, Writable, Writable, NullWritable> {
    public static final Log LOG = LogFactory.getLog(PartitioningRunner.class);
    private Configuration conf;
    private int desiredNum;
    private Path partitionDir;
    private RecordConverter converter;
    private FileSystem fs = null;
    private PipesPartitioner<?, ?> pipesPartitioner = null;
    public Map<Integer, SequenceFile.Writer> writerCache = new HashMap();
    public Map<Integer, KeyValuePair<WritableComparable, MapWritable>> candidates = new HashMap();

    /* loaded from: input_file:org/apache/hama/bsp/PartitioningRunner$DefaultRecordConverter.class */
    public static class DefaultRecordConverter implements RecordConverter {
        @Override // org.apache.hama.bsp.PartitioningRunner.RecordConverter
        public KeyValuePair<Writable, Writable> convertRecord(KeyValuePair<Writable, Writable> keyValuePair, Configuration configuration) {
            return keyValuePair;
        }

        @Override // org.apache.hama.bsp.PartitioningRunner.RecordConverter
        public int getPartitionId(KeyValuePair<Writable, Writable> keyValuePair, Partitioner partitioner, Configuration configuration, BSPPeer bSPPeer, int i) {
            return Math.abs(partitioner.getPartition(keyValuePair.getKey(), keyValuePair.getValue(), i));
        }

        @Override // org.apache.hama.bsp.PartitioningRunner.RecordConverter
        public void setup(Configuration configuration) {
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/PartitioningRunner$RecordConverter.class */
    public interface RecordConverter {
        void setup(Configuration configuration);

        KeyValuePair<Writable, Writable> convertRecord(KeyValuePair<Writable, Writable> keyValuePair, Configuration configuration) throws IOException;

        int getPartitionId(KeyValuePair<Writable, Writable> keyValuePair, Partitioner partitioner, Configuration configuration, BSPPeer bSPPeer, int i);
    }

    @Override // org.apache.hama.bsp.BSP, org.apache.hama.bsp.BSPInterface
    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, NullWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
        this.conf = bSPPeer.getConfiguration();
        this.desiredNum = this.conf.getInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, 1);
        this.fs = FileSystem.get(this.conf);
        this.converter = (RecordConverter) ReflectionUtils.newInstance(this.conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, DefaultRecordConverter.class, RecordConverter.class), this.conf);
        this.converter.setup(this.conf);
        if (this.conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
            this.partitionDir = new Path(this.conf.get("bsp.output.dir"));
        } else {
            this.partitionDir = new Path(this.conf.get(Constants.RUNTIME_PARTITIONING_DIR));
        }
    }

    @Override // org.apache.hama.bsp.BSP, org.apache.hama.bsp.BSPInterface
    public void bsp(BSPPeer<Writable, Writable, Writable, Writable, NullWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
        int numPeers = bSPPeer.getNumPeers();
        Partitioner partitioner = getPartitioner();
        KeyValuePair<Writable, Writable> keyValuePair = null;
        Class<?> cls = null;
        Class<?> cls2 = null;
        Class<?> cls3 = null;
        while (true) {
            KeyValuePair<Writable, Writable> readNext = bSPPeer.readNext();
            if (readNext == null) {
                Iterator<SequenceFile.Writer> it = this.writerCache.values().iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                bSPPeer.sync();
                FileStatus[] listStatus = this.fs.listStatus(this.partitionDir);
                bSPPeer.sync();
                for (FileStatus fileStatus : listStatus) {
                    int parseInt = Integer.parseInt(fileStatus.getPath().getName().split("[-]")[1]);
                    if (getMergeProcessorID(parseInt, numPeers) == bSPPeer.getPeerIndex()) {
                        Path path = new Path(this.partitionDir + ZKUtil.ZK_SEPARATOR + getPartitionName(parseInt));
                        FileStatus[] listStatus2 = this.fs.listStatus(fileStatus.getPath());
                        if ((keyValuePair.getKey() instanceof WritableComparable) && this.conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
                            mergeSortedFiles(listStatus2, path, cls, cls2, cls3);
                        } else {
                            mergeFiles(listStatus2, path, cls, cls2, cls3);
                        }
                        this.fs.delete(fileStatus.getPath(), true);
                    }
                }
                return;
            }
            if (cls2 == null && cls3 == null) {
                cls2 = ((Writable) readNext.getKey()).getClass();
                cls3 = ((Writable) readNext.getValue()).getClass();
            }
            keyValuePair = this.converter.convertRecord(readNext, this.conf);
            if (keyValuePair == null) {
                throw new IOException("The converted record can't be null.");
            }
            Writable writable = (Writable) keyValuePair.getKey();
            cls = writable.getClass();
            int partitionId = this.converter.getPartitionId(keyValuePair, partitioner, this.conf, bSPPeer, this.desiredNum);
            if (!this.writerCache.containsKey(Integer.valueOf(partitionId))) {
                this.writerCache.put(Integer.valueOf(partitionId), SequenceFile.createWriter(this.fs, this.conf, new Path(this.partitionDir + "/part-" + partitionId + "/file-" + bSPPeer.getPeerIndex()), cls, MapWritable.class, SequenceFile.CompressionType.NONE));
            }
            MapWritable mapWritable = new MapWritable();
            mapWritable.put((Writable) readNext.getKey(), (Writable) readNext.getValue());
            this.writerCache.get(Integer.valueOf(partitionId)).append(writable, mapWritable);
        }
    }

    private void mergeSortedFiles(FileStatus[] fileStatusArr, Path path, Class cls, Class cls2, Class cls3) throws IOException {
        SequenceFile.Writer createWriter = SequenceFile.createWriter(this.fs, this.conf, path, cls2, cls3, SequenceFile.CompressionType.NONE);
        HashMap hashMap = new HashMap();
        for (int i = 0; i < fileStatusArr.length; i++) {
            SequenceFile.Sorter sorter = new SequenceFile.Sorter(this.fs, cls, MapWritable.class, this.conf);
            sorter.setMemory(this.conf.getInt("bsp.input.runtime.partitioning.sort.mb", 50) * 1024 * 1024);
            sorter.setFactor(this.conf.getInt("bsp.input.runtime.partitioning.sort.factor", 10));
            sorter.sort(fileStatusArr[i].getPath(), fileStatusArr[i].getPath().suffix(".sorted"));
            hashMap.put(Integer.valueOf(i), new SequenceFile.Reader(this.fs, fileStatusArr[i].getPath().suffix(".sorted"), this.conf));
        }
        for (int i2 = 0; i2 < hashMap.size(); i2++) {
            WritableComparable writableComparable = (WritableComparable) ReflectionUtils.newInstance(cls, this.conf);
            MapWritable mapWritable = new MapWritable();
            ((SequenceFile.Reader) hashMap.get(Integer.valueOf(i2))).next(writableComparable, mapWritable);
            this.candidates.put(Integer.valueOf(i2), new KeyValuePair<>(writableComparable, mapWritable));
        }
        while (hashMap.size() > 0) {
            WritableComparable writableComparable2 = (WritableComparable) ReflectionUtils.newInstance(cls, this.conf);
            MapWritable mapWritable2 = new MapWritable();
            int i3 = 0;
            WritableComparable writableComparable3 = null;
            MapWritable mapWritable3 = null;
            for (Map.Entry<Integer, KeyValuePair<WritableComparable, MapWritable>> entry : this.candidates.entrySet()) {
                if (writableComparable3 == null) {
                    i3 = entry.getKey().intValue();
                    writableComparable3 = (WritableComparable) entry.getValue().getKey();
                    mapWritable3 = (MapWritable) entry.getValue().getValue();
                } else {
                    WritableComparable writableComparable4 = (WritableComparable) entry.getValue().getKey();
                    if (writableComparable3.compareTo(writableComparable4) > 0) {
                        i3 = entry.getKey().intValue();
                        writableComparable3 = writableComparable4;
                        mapWritable3 = (MapWritable) entry.getValue().getValue();
                    }
                }
            }
            for (Map.Entry entry2 : mapWritable3.entrySet()) {
                createWriter.append((Writable) entry2.getKey(), (Writable) entry2.getValue());
            }
            this.candidates.remove(Integer.valueOf(i3));
            if (((SequenceFile.Reader) hashMap.get(Integer.valueOf(i3))).next(writableComparable2, mapWritable2)) {
                this.candidates.put(Integer.valueOf(i3), new KeyValuePair<>(writableComparable2, mapWritable2));
            } else {
                ((SequenceFile.Reader) hashMap.get(Integer.valueOf(i3))).close();
                hashMap.remove(Integer.valueOf(i3));
            }
        }
        this.candidates.clear();
        createWriter.close();
    }

    private void mergeFiles(FileStatus[] fileStatusArr, Path path, Class cls, Class cls2, Class cls3) throws IOException {
        SequenceFile.Writer createWriter = SequenceFile.createWriter(this.fs, this.conf, path, cls2, cls3, SequenceFile.CompressionType.NONE);
        for (FileStatus fileStatus : fileStatusArr) {
            SequenceFile.Reader reader = new SequenceFile.Reader(this.fs, fileStatus.getPath(), this.conf);
            Writable writable = (Writable) ReflectionUtils.newInstance(cls, this.conf);
            MapWritable mapWritable = new MapWritable();
            while (reader.next(writable, mapWritable)) {
                for (Map.Entry entry : mapWritable.entrySet()) {
                    createWriter.append((Writable) entry.getKey(), (Writable) entry.getValue());
                }
            }
            reader.close();
        }
        createWriter.close();
    }

    @Override // org.apache.hama.bsp.BSP, org.apache.hama.bsp.BSPInterface
    public void cleanup(BSPPeer<Writable, Writable, Writable, Writable, NullWritable> bSPPeer) throws IOException {
        if (this.pipesPartitioner != null) {
            this.pipesPartitioner.cleanup();
        }
    }

    public static int getMergeProcessorID(int i, int i2) {
        return i % i2;
    }

    public Partitioner getPartitioner() {
        Class cls = this.conf.getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class, Partitioner.class);
        LOG.debug("bsp.input.partitioner.class: " + cls.toString());
        Partitioner partitioner = null;
        if (PipesPartitioner.class.equals(cls)) {
            try {
                partitioner = (Partitioner) cls.getConstructor(Configuration.class).newInstance(this.conf);
                this.pipesPartitioner = (PipesPartitioner) partitioner;
            } catch (Exception e) {
                LOG.error(e);
            }
        } else {
            partitioner = (Partitioner) ReflectionUtils.newInstance(cls, this.conf);
        }
        return partitioner;
    }

    private static String getPartitionName(int i) {
        return "part-" + String.valueOf(100000 + i).substring(1, 6);
    }
}
