package org.apache.hama.bsp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
import org.apache.hama.util.ZKUtil;

/* loaded from: input_file:org/apache/hama/bsp/PartitioningRunner.class */
/**
 * BSP task that re-partitions its input into per-partition sequence files.
 *
 * <p>Each peer reads its input records, runs them through the configured
 * {@link RecordConverter}, buckets the converted records by partition id and
 * writes every bucket to {@code <partitionDir>/part-<id>/file-<peerIndex>}.
 * After a barrier, each partition directory is merged by exactly one peer into
 * a single {@code <partitionDir>/part-NNNNN} file and the directory is deleted.
 */
public class PartitioningRunner extends BSP<Writable, Writable, Writable, Writable, NullWritable> {
    public static final Log LOG = LogFactory.getLog(PartitioningRunner.class);

    /** Number of digits partition ids are zero-padded to in merged file names. */
    private static final int PARTITION_NAME_DIGITS = 5;

    private Configuration conf;
    private int desiredNum;
    private Path partitionDir;
    private RecordConverter converter;
    private FileSystem fs = null;
    /** Converted records held in memory, keyed by partition id, until they are flushed. */
    private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();

    /**
     * Converter that passes records through unchanged and delegates the
     * partition decision to the supplied {@link Partitioner}.
     */
    public static class DefaultRecordConverter implements RecordConverter {
        /** Returns the input record as is. */
        @Override
        public KeyValuePair<Writable, Writable> convertRecord(KeyValuePair<Writable, Writable> keyValuePair, Configuration configuration) {
            return keyValuePair;
        }

        /**
         * Returns the absolute value of the partitioner's answer for the record.
         *
         * <p>Note: {@code Math.abs(Integer.MIN_VALUE)} is still negative, so a
         * partitioner returning that value yields a negative partition id.
         */
        @Override
        public int getPartitionId(KeyValuePair<Writable, Writable> keyValuePair, Partitioner partitioner, Configuration configuration, BSPPeer bSPPeer, int i) {
            return Math.abs(partitioner.getPartition(keyValuePair.getKey(), keyValuePair.getValue(), i));
        }

        /** No set-up is required for the default converter. */
        @Override
        public void setup(Configuration configuration) {
        }

        /** Returns a new, empty {@link HashMap}. */
        @Override
        public Map<Writable, Writable> newMap() {
            return new HashMap<Writable, Writable>();
        }
    }

    /**
     * Strategy used by {@link PartitioningRunner} to transform input records,
     * pick their partition and supply the per-partition in-memory container.
     */
    public interface RecordConverter {
        /** Called once from {@link PartitioningRunner#setup} with the job configuration. */
        void setup(Configuration configuration);

        /**
         * Converts one input record.
         *
         * @return the record to partition, or {@code null} to drop the input record
         */
        KeyValuePair<Writable, Writable> convertRecord(KeyValuePair<Writable, Writable> keyValuePair, Configuration configuration);

        /**
         * Chooses the partition for a converted record.
         *
         * @param i the desired number of partitions as read from the configuration
         * @return the partition id; it becomes part of the partition file names
         */
        int getPartitionId(KeyValuePair<Writable, Writable> keyValuePair, Partitioner partitioner, Configuration configuration, BSPPeer bSPPeer, int i);

        /**
         * Creates the map that holds one partition's records in memory. Records
         * are stored with {@code Map.put}, so the map's key semantics decide
         * whether records with equal keys replace each other.
         */
        Map<Writable, Writable> newMap();
    }

    /**
     * Reads the job configuration, opens the file system, instantiates the
     * record converter and resolves the partition output directory.
     *
     * <p>The partition directory is the configured
     * {@code Constants.RUNTIME_PARTITIONING_DIR} when set; otherwise it is the
     * {@code partitions} sub-directory of the input directory (or of the input
     * file's parent when the input path is a file).
     */
    @Override
    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, NullWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
        this.conf = bSPPeer.getConfiguration();
        this.desiredNum = this.conf.getInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, 1);
        this.fs = FileSystem.get(this.conf);

        Path inputDir = new Path(this.conf.get(Constants.JOB_INPUT_DIR));
        if (this.fs.isFile(inputDir)) {
            inputDir = inputDir.getParent();
        }

        this.converter = (RecordConverter) ReflectionUtils.newInstance(this.conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, DefaultRecordConverter.class, RecordConverter.class), this.conf);
        this.converter.setup(this.conf);

        String configuredDir = this.conf.get(Constants.RUNTIME_PARTITIONING_DIR);
        if (configuredDir == null) {
            this.partitionDir = new Path(inputDir + "/partitions");
        } else {
            this.partitionDir = new Path(configuredDir);
        }
    }

    /**
     * Partitions this peer's input and merges the partitions assigned to it.
     *
     * <p>Phase 1 buckets the converted records in memory and writes one part
     * file per non-empty partition. Phase 2, entered after a barrier, lists the
     * partition directories, waits on a second barrier so that no peer starts
     * modifying the directory while another is still listing it, and then
     * merges every partition directory assigned to this peer.
     */
    @Override
    public void bsp(BSPPeer<Writable, Writable, Writable, Writable, NullWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
        Partitioner partitioner = getPartitioner();
        Class<?> outputKeyClass = null;
        Class<?> outputValueClass = null;

        KeyValuePair<Writable, Writable> inputRecord;
        while ((inputRecord = bSPPeer.readNext()) != null) {
            KeyValuePair<Writable, Writable> outputRecord = this.converter.convertRecord(inputRecord, this.conf);
            if (outputRecord == null) {
                // The converter chose to drop this record.
                continue;
            }
            if (outputKeyClass == null && outputValueClass == null) {
                // The first converted record determines the classes of the output files.
                outputKeyClass = outputRecord.getKey().getClass();
                outputValueClass = outputRecord.getValue().getClass();
            }
            Integer partitionId = Integer.valueOf(this.converter.getPartitionId(outputRecord, partitioner, this.conf, bSPPeer, this.desiredNum));
            Map<Writable, Writable> bucket = this.values.get(partitionId);
            if (bucket == null) {
                bucket = this.converter.newMap();
                this.values.put(partitionId, bucket);
            }
            bucket.put(outputRecord.getKey(), outputRecord.getValue());
        }

        writeLocalPartitions(bSPPeer.getPeerIndex(), outputKeyClass, outputValueClass);

        bSPPeer.sync();
        FileStatus[] partitionDirs = this.fs.listStatus(this.partitionDir);
        bSPPeer.sync();

        if (partitionDirs == null) {
            // Nothing was written by any peer (listStatus may report a missing directory as null).
            return;
        }

        int numPeers = bSPPeer.getNumPeers();
        int partitionsPerPeer = this.desiredNum / numPeers;
        for (FileStatus partitionStatus : partitionDirs) {
            int partitionId = Integer.parseInt(partitionStatus.getPath().getName().split("[-]")[1]);
            int assignedPeer = partitionId;
            if (partitionsPerPeer > 1) {
                assignedPeer = partitionId / partitionsPerPeer;
            }
            if (assignedPeer >= numPeers) {
                // When desiredNum is not a multiple of numPeers the trailing partitions map
                // past the last peer; give all of them to the last peer so none is left unmerged.
                assignedPeer = numPeers - 1;
            }
            if (assignedPeer == bSPPeer.getPeerIndex()) {
                mergePartition(partitionStatus, partitionId, outputKeyClass, outputValueClass);
            }
        }
    }

    /**
     * Writes every in-memory bucket to {@code part-<id>/file-<peerIndex>} under
     * the partition directory and then releases the buckets.
     */
    private void writeLocalPartitions(int peerIndex, Class<?> keyClass, Class<?> valueClass) throws IOException {
        for (Map.Entry<Integer, Map<Writable, Writable>> partition : this.values.entrySet()) {
            Path partFile = new Path(this.partitionDir + "/part-" + partition.getKey() + "/file-" + peerIndex);
            SequenceFile.Writer writer = SequenceFile.createWriter(this.fs, this.conf, partFile, keyClass, valueClass, SequenceFile.CompressionType.NONE);
            try {
                for (Map.Entry<Writable, Writable> record : partition.getValue().entrySet()) {
                    writer.append(record.getKey(), record.getValue());
                }
            } finally {
                writer.close();
            }
        }
        // The records are on disk now; do not keep them in memory during the merge phase.
        this.values.clear();
    }

    /**
     * Concatenates all part files of one partition directory into a single
     * merged file and deletes the directory afterwards.
     *
     * @param keyClass   output key class seen by this peer, or {@code null} if this
     *                   peer produced no records; the classes are then taken from the
     *                   first part file's header
     * @param valueClass output value class, with the same {@code null} convention
     */
    private void mergePartition(FileStatus partitionStatus, int partitionId, Class<?> keyClass, Class<?> valueClass) throws IOException {
        // NOTE(review): ZKUtil.ZK_SEPARATOR is used as the path separator here; presumably "/" - confirm.
        String mergedName = this.partitionDir + ZKUtil.ZK_SEPARATOR + getPartitionName(partitionId);
        Path mergedFile = new Path(mergedName);
        FileStatus[] partFiles = this.fs.listStatus(partitionStatus.getPath());

        Class<?> mergedKeyClass = keyClass;
        Class<?> mergedValueClass = valueClass;
        SequenceFile.Writer writer = null;
        try {
            if (mergedKeyClass != null && mergedValueClass != null) {
                writer = SequenceFile.createWriter(this.fs, this.conf, mergedFile, mergedKeyClass, mergedValueClass, SequenceFile.CompressionType.NONE);
            }
            if (partFiles != null) {
                for (FileStatus partFile : partFiles) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("merge '" + partFile.getPath() + "' into " + mergedName);
                    }
                    SequenceFile.Reader reader = new SequenceFile.Reader(this.fs, partFile.getPath(), this.conf);
                    try {
                        if (writer == null) {
                            // This peer read no records itself, so learn the classes from the file header.
                            mergedKeyClass = reader.getKeyClass();
                            mergedValueClass = reader.getValueClass();
                            writer = SequenceFile.createWriter(this.fs, this.conf, mergedFile, mergedKeyClass, mergedValueClass, SequenceFile.CompressionType.NONE);
                        }
                        Writable key = (Writable) ReflectionUtils.newInstance(mergedKeyClass, this.conf);
                        Writable value = (Writable) ReflectionUtils.newInstance(mergedValueClass, this.conf);
                        while (reader.next(key, value)) {
                            writer.append(key, value);
                        }
                    } finally {
                        reader.close();
                    }
                }
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
        this.fs.delete(partitionStatus.getPath(), true);
    }

    /**
     * Instantiates the partitioner class named by
     * {@code Constants.RUNTIME_PARTITIONING_CLASS}, defaulting to
     * {@link HashPartitioner}.
     */
    public Partitioner getPartitioner() {
        return (Partitioner) ReflectionUtils.newInstance(this.conf.getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class, Partitioner.class), this.conf);
    }

    /**
     * Builds the merged file name for a partition: {@code "part-"} followed by
     * the id zero-padded to five digits (ids with more digits are not truncated,
     * so distinct ids always give distinct names).
     *
     * @throws IllegalArgumentException if {@code i} is negative
     */
    private static String getPartitionName(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("Partition id must not be negative: " + i);
        }
        String digits = String.valueOf(i);
        StringBuilder name = new StringBuilder("part-");
        for (int pad = digits.length(); pad < PARTITION_NAME_DIGITS; pad++) {
            name.append('0');
        }
        return name.append(digits).toString();
    }
}
