package org.apache.hama.bsp.message;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.ipc.RPC;
import org.apache.hama.util.LRUCache;

/* loaded from: input_file:org/apache/hama/bsp/message/HamaMessageManagerImpl.class */
/**
 * RPC-backed message manager.
 *
 * <p>On {@link #init} it starts an {@link RPC.Server} that exposes this object,
 * so remote peers can deliver messages through the {@code put(...)} methods.
 * Outgoing messages are sent through RPC proxies that are kept in an LRU cache
 * keyed by the peer address; a proxy is stopped when it is evicted from the
 * cache or when this manager is closed.
 *
 * @param <M> the message type exchanged between peers
 */
public final class HamaMessageManagerImpl<M extends Writable> extends AbstractMessageManager<M> implements HamaMessageManager<M> {
    private static final Log LOG = LogFactory.getLog(HamaMessageManagerImpl.class);

    /** Number of additional ports tried after the requested one is already bound. */
    private static final int MAX_RETRY = 5;

    /** Server receiving messages from other peers; {@code null} until {@link #init} has run. */
    private RPC.Server server;

    /** Proxies to remote peers, keyed by address; {@code null} until {@link #init} has run. */
    private LRUCache<InetSocketAddress, HamaMessageManager<M>> peersLRUCache;

    /**
     * Initializes the base manager, starts the RPC server and creates the proxy cache.
     *
     * @param taskAttemptID     passed through to the superclass
     * @param bSPPeer           passed through to the superclass
     * @param hamaConfiguration passed through to the superclass
     * @param inetSocketAddress host and first port the RPC server tries to bind to
     * @throws RuntimeException if the RPC server cannot be started
     */
    @Override
    public final void init(TaskAttemptID taskAttemptID, BSPPeer<?, ?, ?, ?, M> bSPPeer, HamaConfiguration hamaConfiguration, InetSocketAddress inetSocketAddress) {
        super.init(taskAttemptID, bSPPeer, hamaConfiguration, inetSocketAddress);
        startRPCServer(hamaConfiguration, inetSocketAddress);
        this.peersLRUCache = new LRUCache<InetSocketAddress, HamaMessageManager<M>>(this.maxCachedConnections) {
            @Override
            protected final boolean removeEldestEntry(Map.Entry<InetSocketAddress, HamaMessageManager<M>> entry) {
                if (size() <= this.capacity) {
                    return false;
                }
                // The evicted proxy is no longer reachable through the cache, so release it now.
                RPC.stopProxy(entry.getValue());
                return true;
            }
        };
    }

    /**
     * Starts the RPC server on the given address, converting an I/O failure
     * into an unchecked exception that keeps the original cause.
     */
    private void startRPCServer(Configuration configuration, InetSocketAddress inetSocketAddress) {
        try {
            startServer(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
        } catch (IOException e) {
            LOG.error("Fail to start RPC server!", e);
            throw new RuntimeException("RPC Server could not be launched!", e);
        }
    }

    /**
     * Binds the RPC server to {@code hostName:port}. When the port is already
     * in use the next higher port is tried, up to {@link #MAX_RETRY} extra ports.
     *
     * @throws IOException      on any I/O failure other than a bind conflict
     * @throws RuntimeException when the requested port and all retry ports are in use
     */
    private void startServer(String hostName, int port) throws IOException {
        int candidatePort = port;
        // The attempt counter is local so that managers in the same JVM do not
        // share (and reset) each other's retry budget.
        for (int attempt = 0; ; attempt++) {
            try {
                this.server = RPC.getServer(this, hostName, candidatePort, this.conf.getInt("hama.default.messenger.handler.threads.num", 5), false, this.conf);
                this.server.start();
                LOG.info("BSPPeer address:" + this.server.getListenerAddress().getHostName() + " port:" + this.server.getListenerAddress().getPort());
                return;
            } catch (BindException e) {
                if (attempt >= MAX_RETRY) {
                    throw new RuntimeException("RPC Server could not be launched!", e);
                }
                candidatePort++;
                LOG.warn("Address already in use. Retrying " + hostName + ":" + candidatePort);
            }
        }
    }

    /**
     * Closes the base manager, stops every proxy still held in the cache and
     * stops the RPC server. Safe to call when {@link #init} never ran.
     */
    @Override
    public final void close() {
        super.close();
        if (this.peersLRUCache != null) {
            // Proxies are otherwise only stopped on eviction; release the rest here.
            for (HamaMessageManager<M> proxy : this.peersLRUCache.values()) {
                if (proxy != null) {
                    RPC.stopProxy(proxy);
                }
            }
            this.peersLRUCache.clear();
        }
        if (this.server != null) {
            this.server.stop();
        }
    }

    /**
     * Sends a bundle to the peer at the given address. When
     * {@link Constants#MESSENGER_RUNTIME_COMPRESSION} is enabled in the
     * configuration, the bundle is serialized, compressed and sent as a byte
     * array, and the byte counters of the local peer are updated.
     *
     * @throws IllegalArgumentException if no connection to the peer is available
     * @throws IOException              if serialization or the remote call fails
     */
    @Override
    public final void transfer(InetSocketAddress inetSocketAddress, BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        HamaMessageManager<M> remotePeer = requirePeerConnection(inetSocketAddress);
        if (!this.conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
            remotePeer.put(bSPMessageBundle);
            return;
        }
        ByteArrayOutputStream serialized = new ByteArrayOutputStream();
        bSPMessageBundle.write(new DataOutputStream(serialized));
        byte[] compressed = this.compressor.compress(serialized.toByteArray());
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_COMPRESSED_BYTES_TRANSFERED, compressed.length);
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_DECOMPRESSED_BYTES, serialized.size());
        remotePeer.put(compressed);
    }

    /**
     * Returns the cached proxy for the given peer address, creating and
     * caching a new one when the cache holds none.
     *
     * @throws IOException if the proxy cannot be created
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected final HamaMessageManager<M> getBSPPeerConnection(InetSocketAddress inetSocketAddress) throws IOException {
        HamaMessageManager<M> remotePeer = this.peersLRUCache.get(inetSocketAddress);
        if (remotePeer == null) {
            remotePeer = (HamaMessageManager) RPC.getProxy(HamaMessageManager.class, 1L, inetSocketAddress, this.conf);
            this.peersLRUCache.put(inetSocketAddress, remotePeer);
        }
        return remotePeer;
    }

    /**
     * Looks up the connection to a peer and fails when there is none.
     *
     * @throws IllegalArgumentException if no connection to the peer is available
     */
    private HamaMessageManager<M> requirePeerConnection(InetSocketAddress inetSocketAddress) throws IOException {
        HamaMessageManager<M> remotePeer = getBSPPeerConnection(inetSocketAddress);
        if (remotePeer == null) {
            throw new IllegalArgumentException("Can not find " + inetSocketAddress.toString() + " to transfer messages to!");
        }
        return remotePeer;
    }

    /** Receives a single message and hands it to {@code loopBackMessage}. */
    @Override
    public final void put(M m) throws IOException {
        loopBackMessage(m);
    }

    /** Receives a bundle and hands it to {@code loopBackBundle}. */
    @Override
    public final void put(BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        loopBackBundle(bSPMessageBundle);
    }

    /**
     * Receives a compressed, serialized bundle: decompresses the bytes,
     * deserializes them into a bundle and hands it to {@code loopBackBundle}.
     */
    @Override
    public final void put(byte[] bArr) throws IOException {
        byte[] decompressed = this.compressor.decompress(bArr);
        BSPMessageBundle<M> bundle = new BSPMessageBundle<>();
        bundle.readFields(new DataInputStream(new ByteArrayInputStream(decompressed)));
        loopBackBundle(bundle);
    }

    /** Always reports protocol version {@code 1}, matching the version requested when proxies are created. */
    @Override
    public final long getProtocolVersion(String str, long j) throws IOException {
        return 1L;
    }

    /**
     * @return the address the RPC server listens on, or {@code null} when no server has been created
     */
    @Override
    public InetSocketAddress getListenerAddress() {
        return this.server != null ? this.server.getListenerAddress() : null;
    }

    /**
     * Sends a single message to the peer at the given address.
     *
     * @throws IllegalArgumentException if no connection to the peer is available
     * @throws IOException              if the remote call fails
     */
    @Override
    public void transfer(InetSocketAddress inetSocketAddress, M m) throws IOException {
        // Pass the message itself; it is not a HamaMessageManager and must not be cast to one.
        requirePeerConnection(inetSocketAddress).put(m);
    }
}
