package org.apache.hama.bsp;

import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskStatus;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/bsp/BSPPeerImpl.class */
public final class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
    private final Configuration conf;
    private final FileSystem fs;
    private BSPJob bspJob;
    private TaskStatus currentTaskStatus;
    private TaskAttemptID taskId;
    private BSPPeerProtocol umbilical;
    private String[] allPeers;
    private SyncClient syncClient;
    private MessageManager messenger;
    private int partition;
    private String splitClass;
    private BytesWritable split;
    private OutputCollector<KEYOUT, VALUEOUT> collector;
    private RecordReader<KEYIN, VALUEIN> in;
    private RecordWriter<KEYOUT, VALUEOUT> outWriter;
    private InetSocketAddress peerAddress;
    private Counters counters;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hama/bsp/BSPPeerImpl$PeerCounter.class */
    public enum PeerCounter {
        SUPERSTEPS
    }

    protected BSPPeerImpl() {
        this.conf = null;
        this.fs = null;
    }

    protected BSPPeerImpl(Configuration configuration, FileSystem fileSystem) {
        this.conf = configuration;
        this.fs = fileSystem;
    }

    public BSPPeerImpl(BSPJob bSPJob, Configuration configuration, TaskAttemptID taskAttemptID, BSPPeerProtocol bSPPeerProtocol, int i, String str, BytesWritable bytesWritable, Counters counters) throws Exception {
        this.conf = configuration;
        this.taskId = taskAttemptID;
        this.umbilical = bSPPeerProtocol;
        this.bspJob = bSPJob;
        this.partition = i;
        this.splitClass = str;
        this.split = bytesWritable;
        this.counters = counters;
        this.fs = FileSystem.get(configuration);
        this.peerAddress = new InetSocketAddress(configuration.get(Constants.PEER_HOST, "0.0.0.0"), configuration.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT));
        initialize();
        this.syncClient.register(taskAttemptID.getJobID(), taskAttemptID, this.peerAddress.getHostName(), this.peerAddress.getPort());
        this.syncClient.enterBarrier(taskAttemptID.getJobID(), taskAttemptID, -1L);
        this.syncClient.leaveBarrier(taskAttemptID.getJobID(), taskAttemptID, -1L);
        setCurrentTaskStatus(new TaskStatus(taskAttemptID.getJobID(), taskAttemptID, 0.0f, TaskStatus.State.RUNNING, "running", this.peerAddress.getHostName(), TaskStatus.Phase.STARTING, counters));
        this.messenger = MessageManagerFactory.getMessageManager(configuration);
        this.messenger.init(configuration, this.peerAddress);
    }

    public final void initialize() throws Exception {
        this.syncClient = SyncServiceFactory.getSyncClient(this.conf);
        this.syncClient.init(this.conf, this.taskId.getJobID(), this.taskId);
        initInput();
        if (this.conf.get("bsp.output.dir") != null) {
            this.outWriter = this.bspJob.getOutputFormat().getRecordWriter(this.fs, this.bspJob, new Path(this.conf.get("bsp.output.dir"), Task.getOutputName(this.partition)).makeQualified(this.fs).toString());
            final RecordWriter<KEYOUT, VALUEOUT> recordWriter = this.outWriter;
            this.collector = new OutputCollector<KEYOUT, VALUEOUT>() { // from class: org.apache.hama.bsp.BSPPeerImpl.1
                @Override // org.apache.hama.bsp.OutputCollector
                public void collect(KEYOUT keyout, VALUEOUT valueout) throws IOException {
                    recordWriter.write(keyout, valueout);
                }
            };
        }
    }

    public final void initInput() throws IOException {
        if (this.conf.get("bsp.input.dir") != null) {
            try {
                InputSplit inputSplit = (InputSplit) ReflectionUtils.newInstance(getConfiguration().getClassByName(this.splitClass), getConfiguration());
                DataInputBuffer dataInputBuffer = new DataInputBuffer();
                dataInputBuffer.reset(this.split.getBytes(), 0, this.split.getLength());
                inputSplit.readFields(dataInputBuffer);
                if (this.in != null) {
                    this.in.close();
                }
                this.in = this.bspJob.getInputFormat().getRecordReader(inputSplit, this.bspJob);
            } catch (ClassNotFoundException e) {
                IOException iOException = new IOException("Split class " + this.splitClass + " not found");
                iOException.initCause(e);
                throw iOException;
            }
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final BSPMessage getCurrentMessage() throws IOException {
        return this.messenger.getCurrentMessage();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void send(String str, BSPMessage bSPMessage) throws IOException {
        this.messenger.send(str, bSPMessage);
    }

    private final String checkpointedPath() {
        String str = this.conf.get("bsp.checkpoint.prefix_path", "/checkpoint/") + this.bspJob.getJobID().toString() + "/" + getSuperstepCount() + "/" + this.taskId.toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Messages are to be saved to " + str);
        }
        return str;
    }

    final void checkpoint(String str, BSPMessageBundle bSPMessageBundle) {
        DataOutput dataOutput = null;
        try {
            try {
                dataOutput = this.fs.create(new Path(str));
                bSPMessageBundle.write(dataOutput);
                if (null != dataOutput) {
                    try {
                        dataOutput.close();
                    } catch (IOException e) {
                        LOG.warn("Fail to close dfs output stream while checkpointing.", e);
                    }
                }
            } catch (Throwable th) {
                if (null != dataOutput) {
                    try {
                        dataOutput.close();
                    } catch (IOException e2) {
                        LOG.warn("Fail to close dfs output stream while checkpointing.", e2);
                        throw th;
                    }
                }
                throw th;
            }
        } catch (IOException e3) {
            LOG.warn("Fail checkpointing messages to " + str, e3);
            if (null != dataOutput) {
                try {
                    dataOutput.close();
                } catch (IOException e4) {
                    LOG.warn("Fail to close dfs output stream while checkpointing.", e4);
                }
            }
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void sync() throws IOException, SyncException, InterruptedException {
        enterBarrier();
        Iterator<Map.Entry<InetSocketAddress, LinkedList<BSPMessage>>> messageIterator = this.messenger.getMessageIterator();
        while (messageIterator.hasNext()) {
            Map.Entry<InetSocketAddress, LinkedList<BSPMessage>> next = messageIterator.next();
            InetSocketAddress key = next.getKey();
            BSPMessageBundle combineMessages = combineMessages(next.getValue());
            if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
                checkpoint(checkpointedPath(), combineMessages);
            }
            messageIterator.remove();
            this.messenger.transfer(key, combineMessages);
        }
        leaveBarrier();
        incrCounter(PeerCounter.SUPERSTEPS, 1L);
        this.currentTaskStatus.setCounters(this.counters);
        this.umbilical.statusUpdate(this.taskId, this.currentTaskStatus);
        this.messenger.clearOutgoingQueues();
    }

    private final BSPMessageBundle combineMessages(Iterable<BSPMessage> iterable) {
        if (!this.conf.getClass("bsp.combiner.class", Combiner.class).equals(Combiner.class)) {
            return ((Combiner) ReflectionUtils.newInstance(this.conf.getClass("bsp.combiner.class", Combiner.class), this.conf)).combine(iterable);
        }
        BSPMessageBundle bSPMessageBundle = new BSPMessageBundle();
        Iterator<BSPMessage> it = iterable.iterator();
        while (it.hasNext()) {
            bSPMessageBundle.addMessage(it.next());
        }
        return bSPMessageBundle;
    }

    protected final void enterBarrier() throws SyncException {
        this.syncClient.enterBarrier(this.taskId.getJobID(), this.taskId, this.currentTaskStatus.getSuperstepCount());
    }

    protected final void leaveBarrier() throws SyncException {
        this.syncClient.leaveBarrier(this.taskId.getJobID(), this.taskId, this.currentTaskStatus.getSuperstepCount());
    }

    public final void close() throws SyncException, IOException, InterruptedException {
        if (this.in != null) {
            this.in.close();
        }
        if (this.outWriter != null) {
            this.outWriter.close();
        }
        clear();
        this.syncClient.close();
        this.messenger.close();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void clear() {
        this.messenger.clearOutgoingQueues();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final String getPeerName() {
        return this.peerAddress.getHostName() + ":" + this.peerAddress.getPort();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final String[] getAllPeerNames() {
        initPeerNames();
        return this.allPeers;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final String getPeerName(int i) {
        initPeerNames();
        return this.allPeers[i];
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final int getNumPeers() {
        initPeerNames();
        return this.allPeers.length;
    }

    private final void initPeerNames() {
        if (this.allPeers == null) {
            this.allPeers = this.syncClient.getAllPeerNames(this.taskId);
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final int getNumCurrentMessages() {
        return this.messenger.getNumCurrentMessages();
    }

    public final void setCurrentTaskStatus(TaskStatus taskStatus) {
        this.currentTaskStatus = taskStatus;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final long getSuperstepCount() {
        return this.currentTaskStatus.getSuperstepCount();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final Configuration getConfiguration() {
        return this.conf;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void write(KEYOUT keyout, VALUEOUT valueout) throws IOException {
        this.collector.collect(keyout, valueout);
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final boolean readNext(KEYIN keyin, VALUEIN valuein) throws IOException {
        return this.in.next(keyin, valuein);
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException {
        KEYIN createKey = this.in.createKey();
        VALUEIN createValue = this.in.createValue();
        if (this.in.next(createKey, createValue)) {
            return new KeyValuePair<>(createKey, createValue);
        }
        return null;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void reopenInput() throws IOException {
        initInput();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final Counters.Counter getCounter(Enum<?> r4) {
        if (this.counters == null) {
            return null;
        }
        return this.counters.findCounter(r4);
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final Counters.Counter getCounter(String str, String str2) {
        Counters.Counter counter = null;
        if (this.counters != null) {
            counter = this.counters.findCounter(str, str2);
        }
        return counter;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void incrCounter(Enum<?> r6, long j) {
        if (this.counters != null) {
            this.counters.incrCounter(r6, j);
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void incrCounter(String str, String str2, long j) {
        if (this.counters != null) {
            this.counters.incrCounter(str, str2, j);
        }
    }
}
