package org.apache.hama.bsp;

import java.io.DataInput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskStatus;
import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
import org.apache.hama.bsp.ft.BSPFaultTolerantService;
import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/bsp/BSPPeerImpl.class */
public final class BSPPeerImpl<K1, V1, K2, V2, M extends Writable> implements BSPPeer<K1, V1, K2, V2, M> {
    private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
    private final Configuration conf;
    private final FileSystem fs;
    private BSPJob bspJob;
    private TaskStatus currentTaskStatus;
    private TaskAttemptID taskId;
    private BSPPeerProtocol umbilical;
    private String[] allPeers;
    private PeerSyncClient syncClient;
    private MessageManager<M> messenger;
    private int partition;
    private String splitClass;
    private BytesWritable split;
    private OutputCollector<K2, V2> collector;
    private RecordReader<K1, V1> in;
    private RecordWriter<K2, V2> outWriter;
    private final KeyValuePair<K1, V1> cachedPair;
    private InetSocketAddress peerAddress;
    private Counters counters;
    private Combiner<M> combiner;
    private FaultTolerantPeerService<M> faultToleranceService;
    private long splitSize;

    /* loaded from: input_file:org/apache/hama/bsp/BSPPeerImpl$PeerCounter.class */
    public enum PeerCounter {
        COMPRESSED_MESSAGES,
        SUPERSTEP_SUM,
        TASK_INPUT_RECORDS,
        TASK_OUTPUT_RECORDS,
        IO_BYTES_READ,
        MESSAGE_BYTES_TRANSFERED,
        MESSAGE_BYTES_RECEIVED,
        TOTAL_MESSAGES_SENT,
        TOTAL_MESSAGES_RECEIVED,
        COMPRESSED_BYTES_SENT,
        COMPRESSED_BYTES_RECEIVED,
        TIME_IN_SYNC_MS
    }

    protected BSPPeerImpl() {
        this.cachedPair = new KeyValuePair<>();
        this.splitSize = 0L;
        this.conf = null;
        this.fs = null;
    }

    protected BSPPeerImpl(Configuration configuration, FileSystem fileSystem) {
        this.cachedPair = new KeyValuePair<>();
        this.splitSize = 0L;
        this.conf = configuration;
        this.fs = fileSystem;
    }

    public BSPPeerImpl(Configuration configuration, FileSystem fileSystem, Counters counters) {
        this(configuration, fileSystem);
        this.counters = counters;
    }

    public BSPPeerImpl(BSPJob bSPJob, Configuration configuration, TaskAttemptID taskAttemptID, BSPPeerProtocol bSPPeerProtocol, int i, String str, BytesWritable bytesWritable, Counters counters) throws Exception {
        this(bSPJob, configuration, taskAttemptID, bSPPeerProtocol, i, str, bytesWritable, counters, -1L, TaskStatus.State.RUNNING);
    }

    public BSPPeerImpl(BSPJob bSPJob, Configuration configuration, TaskAttemptID taskAttemptID, BSPPeerProtocol bSPPeerProtocol, int i, String str, BytesWritable bytesWritable, Counters counters, long j, TaskStatus.State state) throws Exception {
        this.cachedPair = new KeyValuePair<>();
        this.splitSize = 0L;
        this.conf = configuration;
        this.taskId = taskAttemptID;
        this.umbilical = bSPPeerProtocol;
        this.bspJob = bSPJob;
        this.partition = i;
        this.splitClass = str;
        this.split = bytesWritable;
        this.counters = counters;
        this.fs = FileSystem.get(configuration);
        this.peerAddress = new InetSocketAddress(configuration.get(Constants.PEER_HOST, "0.0.0.0"), configuration.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT));
        initializeIO();
        initializeSyncService(j, state);
        TaskStatus.Phase phase = TaskStatus.Phase.STARTING;
        String str2 = "running";
        if (state == TaskStatus.State.RECOVERING) {
            phase = TaskStatus.Phase.RECOVERING;
            str2 = "recovering";
        }
        setCurrentTaskStatus(new TaskStatus(taskAttemptID.getJobID(), taskAttemptID, 1.0f, state, str2, this.peerAddress.getHostName(), phase, counters));
        initilizeMessaging();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initialized Messaging service.");
        }
        String str3 = configuration.get("bsp.combiner.class");
        if (str3 != null) {
            this.combiner = (Combiner) ReflectionUtils.newInstance(configuration.getClassByName(str3), configuration);
        }
        if (configuration.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Fault tolerance enabled.");
            }
            if (j > 0) {
                configuration.setInt("attempt.superstep", (int) j);
            }
            Class cls = configuration.getClass(Constants.FAULT_TOLERANCE_CLASS, AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
            if (cls != null) {
                if (j > 0) {
                    counters.incrCounter(PeerCounter.SUPERSTEP_SUM, j);
                }
                this.faultToleranceService = ((BSPFaultTolerantService) ReflectionUtils.newInstance(cls, (Configuration) null)).constructPeerFaultTolerance(bSPJob, this, this.syncClient, this.peerAddress, this.taskId, j, configuration, this.messenger);
                TaskStatus.State onPeerInitialized = this.faultToleranceService.onPeerInitialized(state);
                if (state == TaskStatus.State.RECOVERING) {
                    if (onPeerInitialized == TaskStatus.State.RUNNING) {
                        phase = TaskStatus.Phase.STARTING;
                        str2 = "running";
                        state = onPeerInitialized;
                    }
                    setCurrentTaskStatus(new TaskStatus(taskAttemptID.getJobID(), taskAttemptID, 1.0f, state, str2, this.peerAddress.getHostName(), phase, counters));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("State after FT service initialization - " + onPeerInitialized.toString());
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Initialized fault tolerance service");
                }
            }
        }
        doFirstSync(j);
        if (LOG.isDebugEnabled()) {
            LOG.info(new StringBuffer("BSP Peer successfully initialized for ").append(this.taskId.toString()).append(" ").append(j).toString());
        }
    }

    public final void moveCacheFiles() throws IOException {
        StringBuilder sb = new StringBuilder();
        boolean z = true;
        if (DistributedCache.getCacheFiles(this.conf) != null) {
            for (URI uri : DistributedCache.getCacheFiles(this.conf)) {
                if (uri != null) {
                    if (!z) {
                        sb.append(",");
                    }
                    if (null != uri.getFragment() && DistributedCache.getSymlink(this.conf)) {
                        FileUtil.symLink(uri.getPath(), uri.getFragment());
                        sb.append(uri.getFragment()).append(",");
                    }
                    FileSystem fileSystem = FileSystem.get(this.conf);
                    Path path = new Path(uri.getPath());
                    if (fileSystem.exists(path)) {
                        Path path2 = new Path(FileSystem.getLocal(this.conf).getWorkingDirectory(), path.getName());
                        fileSystem.copyToLocalFile(path, path2);
                        sb.append(path2.toUri().getPath());
                    }
                    z = false;
                }
            }
        }
        if (sb.length() > 0) {
            DistributedCache.addLocalFiles(this.conf, sb.toString());
        }
    }

    public final void initInput() throws IOException {
        InputSplit inputSplit = null;
        try {
            if (this.splitClass != null) {
                inputSplit = (InputSplit) ReflectionUtils.newInstance(getConfiguration().getClassByName(this.splitClass), getConfiguration());
            }
            if (inputSplit != null) {
                DataInput dataInputBuffer = new DataInputBuffer();
                dataInputBuffer.reset(this.split.getBytes(), 0, this.split.getLength());
                inputSplit.readFields(dataInputBuffer);
                if (this.in != null) {
                    this.in.close();
                }
                this.in = new TrackedRecordReader(this.bspJob.getInputFormat().getRecordReader(inputSplit, this.bspJob), getCounter(PeerCounter.TASK_INPUT_RECORDS), getCounter(PeerCounter.IO_BYTES_READ));
                this.splitSize = inputSplit.getLength();
            }
        } catch (ClassNotFoundException e) {
            IOException iOException = new IOException("Split class " + this.splitClass + " not found");
            iOException.initCause(e);
            throw iOException;
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public long getSplitSize() {
        return this.splitSize;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public long getPos() throws IOException {
        return this.in.getPos();
    }

    public final void initilizeMessaging() throws ClassNotFoundException {
        this.messenger = MessageManagerFactory.getMessageManager(this.conf);
        this.messenger.init(this.taskId, this, this.conf, this.peerAddress);
    }

    public final void initializeSyncService(long j, TaskStatus.State state) throws Exception {
        this.syncClient = SyncServiceFactory.getPeerSyncClient(this.conf);
        this.syncClient.init(this.conf, this.taskId.getJobID(), this.taskId);
        this.syncClient.register(this.taskId.getJobID(), this.taskId, this.peerAddress.getHostName(), this.peerAddress.getPort());
    }

    private void doFirstSync(long j) throws SyncException {
        if (j > 0) {
            j--;
        }
        this.syncClient.enterBarrier(this.taskId.getJobID(), this.taskId, j);
        this.syncClient.leaveBarrier(this.taskId.getJobID(), this.taskId, j);
    }

    public final void initializeIO() throws Exception {
        initInput();
        String str = null;
        if (this.conf.get("bsp.output.dir") != null) {
            str = new Path(this.conf.get("bsp.output.dir", "tmp-" + System.currentTimeMillis()), Task.getOutputName(this.partition)).makeQualified(this.fs).toString();
        }
        this.outWriter = this.bspJob.getOutputFormat().getRecordWriter(this.fs, this.bspJob, str);
        final RecordWriter<K2, V2> recordWriter = this.outWriter;
        this.collector = new OutputCollector<K2, V2>() { // from class: org.apache.hama.bsp.BSPPeerImpl.1
            @Override // org.apache.hama.bsp.OutputCollector
            public void collect(K2 k2, V2 v2) throws IOException {
                recordWriter.write(k2, v2);
            }
        };
        try {
            moveCacheFiles();
        } catch (Exception e) {
            LOG.error(e);
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final M getCurrentMessage() throws IOException {
        return this.messenger.getCurrentMessage();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void send(String str, M m) throws IOException {
        this.messenger.send(str, m);
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void sync() throws IOException, SyncException, InterruptedException {
        this.messenger.finishSendPhase();
        Iterator<Map.Entry<InetSocketAddress, MessageQueue<M>>> messageIterator = this.messenger.getMessageIterator();
        while (messageIterator.hasNext()) {
            Map.Entry<InetSocketAddress, MessageQueue<M>> next = messageIterator.next();
            InetSocketAddress key = next.getKey();
            BSPMessageBundle<M> combineMessages = combineMessages(next.getValue());
            messageIterator.remove();
            try {
                this.messenger.transfer(key, combineMessages);
            } catch (Exception e) {
                LOG.error("Error while sending messages", e);
            }
        }
        if (this.faultToleranceService != null) {
            try {
                this.faultToleranceService.beforeBarrier();
            } catch (Exception e2) {
                throw new IOException(e2);
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        enterBarrier();
        if (this.faultToleranceService != null) {
            try {
                this.faultToleranceService.duringBarrier();
            } catch (Exception e3) {
                throw new IOException(e3);
            }
        }
        this.messenger.clearOutgoingQueues();
        leaveBarrier();
        incrementCounter(PeerCounter.TIME_IN_SYNC_MS, System.currentTimeMillis() - currentTimeMillis);
        incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);
        this.currentTaskStatus.setCounters(this.counters);
        if (this.faultToleranceService != null) {
            try {
                this.faultToleranceService.afterBarrier();
            } catch (Exception e4) {
                throw new IOException(e4);
            }
        }
        this.umbilical.statusUpdate(this.taskId, this.currentTaskStatus);
    }

    private final BSPMessageBundle<M> combineMessages(Iterable<M> iterable) {
        BSPMessageBundle<M> bSPMessageBundle = new BSPMessageBundle<>();
        if (this.combiner != null) {
            bSPMessageBundle.addMessage(this.combiner.combine(iterable));
        } else {
            Iterator<M> it = iterable.iterator();
            while (it.hasNext()) {
                bSPMessageBundle.addMessage(it.next());
            }
        }
        return bSPMessageBundle;
    }

    protected final void enterBarrier() throws SyncException {
        this.syncClient.enterBarrier(this.taskId.getJobID(), this.taskId, this.currentTaskStatus.getSuperstepCount());
    }

    protected final void leaveBarrier() throws SyncException {
        this.syncClient.leaveBarrier(this.taskId.getJobID(), this.taskId, this.currentTaskStatus.getSuperstepCount());
    }

    public void deleteLocalFiles() throws IOException {
        if (DistributedCache.getLocalCacheFiles(this.conf) != null) {
            for (Path path : DistributedCache.getLocalCacheFiles(this.conf)) {
                if (path != null) {
                    LocalFileSystem local = FileSystem.getLocal(this.conf);
                    if (local.exists(path)) {
                        local.delete(path, true);
                    }
                }
            }
        }
        DistributedCache.setLocalFiles(this.conf, "");
    }

    public final void close() {
        if (this.in != null) {
            try {
                this.in.close();
            } catch (Exception e) {
                LOG.error(e);
            }
        }
        if (this.outWriter != null) {
            try {
                this.outWriter.close();
            } catch (Exception e2) {
                LOG.error(e2);
            }
        }
        clear();
        try {
            this.syncClient.close();
        } catch (Exception e3) {
            LOG.error(e3);
        }
        try {
            this.messenger.close();
        } catch (Exception e4) {
            LOG.error(e4);
        }
        try {
            deleteLocalFiles();
        } catch (Exception e5) {
            LOG.error(e5);
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void clear() {
        this.messenger.clearOutgoingQueues();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final String getPeerName() {
        return this.peerAddress.getHostName() + ":" + this.peerAddress.getPort();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final String[] getAllPeerNames() {
        initPeerNames();
        return this.allPeers;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final String getPeerName(int i) {
        initPeerNames();
        return this.allPeers[i];
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public int getPeerIndex() {
        return this.taskId.getTaskID().getId();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final int getNumPeers() {
        initPeerNames();
        return this.allPeers.length;
    }

    private final void initPeerNames() {
        if (this.allPeers == null) {
            this.allPeers = this.syncClient.getAllPeerNames(this.taskId);
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final int getNumCurrentMessages() {
        return this.messenger.getNumCurrentMessages();
    }

    public final void setCurrentTaskStatus(TaskStatus taskStatus) {
        this.currentTaskStatus = taskStatus;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final long getSuperstepCount() {
        return this.currentTaskStatus.getSuperstepCount();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final Configuration getConfiguration() {
        return this.conf;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void write(K2 k2, V2 v2) throws IOException {
        incrementCounter(PeerCounter.TASK_OUTPUT_RECORDS, 1L);
        this.collector.collect(k2, v2);
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final boolean readNext(K1 k1, V1 v1) throws IOException {
        return this.in.next(k1, v1);
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final KeyValuePair<K1, V1> readNext() throws IOException {
        K1 createKey = this.in.createKey();
        V1 createValue = this.in.createValue();
        if (!this.in.next(createKey, createValue)) {
            return null;
        }
        this.cachedPair.clear();
        this.cachedPair.setKey(createKey);
        this.cachedPair.setValue(createValue);
        return this.cachedPair;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void reopenInput() throws IOException {
        initInput();
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final Counters.Counter getCounter(Enum<?> r4) {
        if (this.counters == null) {
            return null;
        }
        return this.counters.findCounter(r4);
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final Counters.Counter getCounter(String str, String str2) {
        Counters.Counter counter = null;
        if (this.counters != null) {
            counter = this.counters.findCounter(str, str2);
        }
        return counter;
    }

    public Counters getCounters() {
        return this.counters;
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void incrementCounter(Enum<?> r6, long j) {
        if (this.counters != null) {
            this.counters.incrCounter(r6, j);
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public final void incrementCounter(String str, String str2, long j) {
        if (this.counters != null) {
            this.counters.incrCounter(str, str2, j);
        }
    }

    @Override // org.apache.hama.bsp.BSPPeer
    public TaskAttemptID getTaskId() {
        return this.taskId;
    }
}
