package org.apache.hama.bsp.message;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.util.CompressionUtil;

/* loaded from: input_file:org/apache/hama/bsp/message/HadoopMessageManagerImpl.class */
public final class HadoopMessageManagerImpl<M extends Writable> extends CompressableMessageManager<M> implements HadoopMessageManager<M> {
    private static final Log LOG = LogFactory.getLog(HadoopMessageManagerImpl.class);
    private final HashMap<InetSocketAddress, HadoopMessageManager<M>> peers = new HashMap<>();
    private RPC.Server server = null;

    @Override // org.apache.hama.bsp.message.AbstractMessageManager, org.apache.hama.bsp.message.MessageManager
    public final void init(TaskAttemptID taskAttemptID, BSPPeer<?, ?, ?, ?, M> bSPPeer, Configuration configuration, InetSocketAddress inetSocketAddress) {
        super.init(taskAttemptID, bSPPeer, configuration, inetSocketAddress);
        super.initCompression(configuration);
        startRPCServer(configuration, inetSocketAddress);
    }

    private final void startRPCServer(Configuration configuration, InetSocketAddress inetSocketAddress) {
        try {
            this.server = RPC.getServer(this, inetSocketAddress.getHostName(), inetSocketAddress.getPort(), configuration);
            this.server.start();
            LOG.info(" BSPPeer address:" + inetSocketAddress.getHostName() + " port:" + inetSocketAddress.getPort());
        } catch (IOException e) {
            LOG.error("Fail to start RPC server!", e);
            throw new RuntimeException("RPC Server could not be launched!");
        }
    }

    @Override // org.apache.hama.bsp.message.AbstractMessageManager, org.apache.hama.bsp.message.MessageManager
    public final void close() {
        super.close();
        if (this.server != null) {
            this.server.stop();
        }
    }

    @Override // org.apache.hama.bsp.message.MessageManager
    public final void transfer(InetSocketAddress inetSocketAddress, BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        HadoopMessageManager<M> bSPPeerConnection = getBSPPeerConnection(inetSocketAddress);
        if (bSPPeerConnection == null) {
            throw new IllegalArgumentException("Can not find " + inetSocketAddress.toString() + " to transfer messages to!");
        }
        if (this.compressor == null) {
            bSPPeerConnection.put((BSPMessageBundle) bSPMessageBundle);
            return;
        }
        BSPCompressedBundle compressBundle = this.compressor.compressBundle(bSPMessageBundle);
        if (CompressionUtil.getCompressionRatio(compressBundle, bSPMessageBundle) < 1.0d) {
            bSPPeerConnection.put(compressBundle);
        } else {
            bSPPeerConnection.put((BSPMessageBundle) bSPMessageBundle);
        }
    }

    protected final HadoopMessageManager<M> getBSPPeerConnection(InetSocketAddress inetSocketAddress) throws IOException {
        HadoopMessageManager<M> hadoopMessageManager = this.peers.get(inetSocketAddress);
        if (hadoopMessageManager == null) {
            hadoopMessageManager = (HadoopMessageManager) RPC.getProxy(HadoopMessageManager.class, 1L, inetSocketAddress, this.conf);
            this.peers.put(inetSocketAddress, hadoopMessageManager);
        }
        return hadoopMessageManager;
    }

    @Override // org.apache.hama.bsp.message.HadoopMessageManager
    public final void put(M m) {
        this.localQueueForNextIteration.add(m);
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
    }

    @Override // org.apache.hama.bsp.message.HadoopMessageManager
    public final void put(BSPMessageBundle<M> bSPMessageBundle) {
        Iterator<M> it = bSPMessageBundle.getMessages().iterator();
        while (it.hasNext()) {
            this.localQueueForNextIteration.add(it.next());
        }
    }

    @Override // org.apache.hama.bsp.message.HadoopMessageManager
    public final void put(BSPCompressedBundle bSPCompressedBundle) {
        Iterator<M> it = this.compressor.decompressBundle(bSPCompressedBundle).getMessages().iterator();
        while (it.hasNext()) {
            this.localQueueForNextIteration.add(it.next());
        }
    }

    public final long getProtocolVersion(String str, long j) throws IOException {
        return 1L;
    }
}
