package org.apache.hama.bsp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.pipes.PipesPartitioner;

/* loaded from: input_file:org/apache/hama/bsp/PartitioningRunner.class */
/**
 * BSP task that redistributes its input records across peers.
 *
 * <p>Each record read from the input is handed to a {@link RecordConverter}, which chooses the
 * destination peer index. The record is wrapped in a {@link MapWritable} and sent to that peer.
 * After a single {@code sync()}, every peer writes all entries of the messages it received to its
 * output.
 */
public class PartitioningRunner extends BSP<Writable, Writable, Writable, Writable, MapWritable> {
    public static final Log LOG = LogFactory.getLog(PartitioningRunner.class);

    /** Configuration taken from the peer in {@link #setup(BSPPeer)}. */
    private Configuration conf;

    /** Converter instantiated in {@link #setup(BSPPeer)}. */
    private RecordConverter converter;

    /** Set by {@link #getPartitioner()} only when the configured partitioner is a PipesPartitioner. */
    private PipesPartitioner<?, ?> pipesPartitioner = null;

    /**
     * Writers keyed by an integer id. Not referenced by any code visible in this class; it is
     * public, so it is kept for callers that may rely on it.
     */
    public Map<Integer, SequenceFile.Writer> writerCache = new HashMap<Integer, SequenceFile.Writer>();

    /**
     * Converter that leaves records untouched and delegates the partition choice to the
     * supplied {@link Partitioner}.
     */
    public static class DefaultRecordConverter implements RecordConverter {
        /**
         * Returns the input record unchanged.
         *
         * @param inputRecord the record read from the input
         * @param conf unused
         * @return {@code inputRecord} itself
         */
        @Override
        public KeyValuePair<Writable, Writable> convertRecord(KeyValuePair<Writable, Writable> inputRecord, Configuration conf) {
            return inputRecord;
        }

        /**
         * Asks the partitioner for a partition and returns its absolute value.
         *
         * @param outputRecord record whose key and value are passed to the partitioner
         * @param partitioner partitioner to consult; must not be {@code null}
         * @param conf unused
         * @param peer unused
         * @param numTasks forwarded to the partitioner as its third argument
         * @return {@code Math.abs} of the partitioner's result
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord, Partitioner partitioner, Configuration conf, BSPPeer peer, int numTasks) {
            // NOTE(review): Math.abs(Integer.MIN_VALUE) is still negative; this relies on the
            // partitioner never returning that value.
            return Math.abs(partitioner.getPartition(outputRecord.getKey(), outputRecord.getValue(), numTasks));
        }

        /** Does nothing. */
        @Override
        public void setup(Configuration conf) {
        }
    }

    /**
     * Strategy used by {@link PartitioningRunner} to transform records and pick their target peer.
     */
    public interface RecordConverter {
        /**
         * Called once from {@link PartitioningRunner#setup(BSPPeer)} right after instantiation.
         *
         * @param conf the peer's configuration
         */
        void setup(Configuration conf);

        /**
         * Converts a record read from the input.
         *
         * @param inputRecord the record as read from the input
         * @param conf the peer's configuration
         * @return the converted record; {@link PartitioningRunner#bsp(BSPPeer)} rejects
         *     {@code null} with an {@link IOException}
         * @throws IOException if the conversion fails
         */
        KeyValuePair<Writable, Writable> convertRecord(KeyValuePair<Writable, Writable> inputRecord, Configuration conf) throws IOException;

        /**
         * Chooses the peer index for a converted record. The result is passed to
         * {@code peer.getPeerName(int)} by {@link PartitioningRunner#bsp(BSPPeer)}.
         *
         * @param outputRecord the converted record
         * @param partitioner the partitioner returned by {@link PartitioningRunner#getPartitioner()};
         *     may be {@code null} if its instantiation failed
         * @param conf the peer's configuration
         * @param peer the running peer
         * @param numTasks the value of {@code peer.getNumPeers()}
         * @return the index of the destination peer
         */
        @SuppressWarnings("rawtypes")
        int getPartitionId(KeyValuePair<Writable, Writable> outputRecord, Partitioner partitioner, Configuration conf, BSPPeer peer, int numTasks);
    }

    /**
     * Stores the peer's configuration and instantiates the {@link RecordConverter} named by
     * {@code Constants.RUNTIME_PARTITION_RECORDCONVERTER}, falling back to
     * {@link DefaultRecordConverter}.
     *
     * @param peer the running peer
     */
    @Override
    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer) throws IOException, SyncException, InterruptedException {
        this.conf = peer.getConfiguration();
        Class<? extends RecordConverter> converterClass = this.conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, DefaultRecordConverter.class, RecordConverter.class);
        this.converter = ReflectionUtils.newInstance(converterClass, this.conf);
        this.converter.setup(this.conf);
    }

    /**
     * Sends every input record to the peer chosen by the converter, synchronizes once, then
     * writes every entry of every received message to the output.
     *
     * @param peer the running peer
     * @throws IOException if the converter returns {@code null} for a record, or on I/O failure
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void bsp(BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer) throws IOException, SyncException, InterruptedException {
        Partitioner partitioner = getPartitioner();

        // Phase 1: route each input record to its destination peer.
        KeyValuePair<Writable, Writable> rawRecord;
        while ((rawRecord = peer.readNext()) != null) {
            KeyValuePair<Writable, Writable> convertedRecord = this.converter.convertRecord(rawRecord, this.conf);
            if (convertedRecord == null) {
                throw new IOException("The converted record can't be null.");
            }
            int partitionId = this.converter.getPartitionId(convertedRecord, partitioner, this.conf, peer, peer.getNumPeers());

            // The converted record only decides the destination; the raw record is what is sent.
            MapWritable envelope = new MapWritable();
            envelope.put(rawRecord.getKey(), rawRecord.getValue());
            peer.send(peer.getPeerName(partitionId), envelope);
        }

        peer.sync();

        // Phase 2: drain the messages received during the sync and emit their entries.
        MapWritable received;
        while ((received = peer.getCurrentMessage()) != null) {
            for (Map.Entry<Writable, Writable> entry : received.entrySet()) {
                peer.write(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * Calls {@code cleanup()} on the pipes partitioner if {@link #getPartitioner()} created one.
     *
     * @param peer unused
     */
    @Override
    public void cleanup(BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer) throws IOException {
        if (this.pipesPartitioner != null) {
            this.pipesPartitioner.cleanup();
        }
    }

    /**
     * Instantiates the partitioner class named by {@code Constants.RUNTIME_PARTITIONING_CLASS},
     * defaulting to {@link HashPartitioner}.
     *
     * <p>A {@link PipesPartitioner} is built through its {@code (Configuration)} constructor and
     * remembered so that {@link #cleanup(BSPPeer)} can release it. Any other class is created via
     * {@link ReflectionUtils#newInstance}.
     *
     * @return the partitioner, or {@code null} if the pipes partitioner could not be constructed
     *     (the failure is logged)
     */
    @SuppressWarnings("rawtypes")
    public Partitioner getPartitioner() {
        Class<? extends Partitioner> partitionerClass = this.conf.getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class, Partitioner.class);
        LOG.debug("bsp.input.partitioner.class: " + partitionerClass.toString());

        if (!PipesPartitioner.class.equals(partitionerClass)) {
            return ReflectionUtils.newInstance(partitionerClass, this.conf);
        }

        Partitioner partitioner = null;
        try {
            partitioner = partitionerClass.getConstructor(Configuration.class).newInstance(this.conf);
            this.pipesPartitioner = (PipesPartitioner<?, ?>) partitioner;
        } catch (Exception e) {
            // Best effort: keep returning null as before, but log the cause with its stack trace.
            LOG.error("Could not instantiate partitioner " + partitionerClass.getName(), e);
        }
        return partitioner;
    }
}
