package org.apache.hama.bsp.message;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.util.BSPNetUtils;

/* loaded from: input_file:org/apache/hama/bsp/message/HadoopMessageManagerImpl.class */
/**
 * {@link MessageManager} implementation that exchanges {@link BSPMessage}s between peers over
 * Hadoop RPC.
 *
 * <p>Outgoing messages are buffered per destination address until the caller iterates over them
 * (see {@link #getMessageIterator()}) and ships them with
 * {@link #transfer(InetSocketAddress, BSPMessageBundle)}. Incoming messages arrive through the
 * {@code put} methods and are parked in a concurrent queue until
 * {@link #clearOutgoingQueues()} moves them into the queue served by
 * {@link #getCurrentMessage()}.
 *
 * <p>Only the incoming queue is a concurrent collection; every other field is a plain,
 * unsynchronized collection. NOTE(review): this presumably means the {@code put} methods are
 * invoked by RPC handler threads while everything else runs on the owning peer's thread —
 * confirm against the callers.
 */
public final class HadoopMessageManagerImpl implements MessageManager, HadoopMessageManager {
    private static final Log LOG = LogFactory.getLog(HadoopMessageManagerImpl.class);

    /** Protocol version used both when creating proxies and when answering version queries. */
    private static final long PROTOCOL_VERSION = 1L;

    private Configuration conf;
    private RPC.Server server = null;

    /** RPC proxies to remote peers, created lazily and reused. */
    private final HashMap<InetSocketAddress, HadoopMessageManager> peers = new HashMap<>();
    /** Cache of peer name to resolved socket address, so each name is resolved only once. */
    private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<>();
    /** Messages buffered for sending, grouped by destination address. */
    private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> outgoingQueues = new HashMap<>();
    /** Messages readable through {@link #getCurrentMessage()}. */
    private final Deque<BSPMessage> localQueue = new LinkedList<>();
    /** Messages received via {@code put}; moved to {@link #localQueue} by {@link #clearOutgoingQueues()}. */
    private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<>();

    /**
     * Stores the configuration and starts the RPC server on the given address.
     *
     * @param configuration configuration used for the RPC server and for later proxy creation
     * @param inetSocketAddress host and port the RPC server binds to
     * @throws RuntimeException if the RPC server cannot be started
     */
    @Override
    public void init(Configuration configuration, InetSocketAddress inetSocketAddress) {
        this.conf = configuration;
        startRPCServer(configuration, inetSocketAddress);
    }

    /**
     * Creates and starts the RPC server that exposes this instance to other peers.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the server cannot be launched
     */
    private void startRPCServer(Configuration configuration, InetSocketAddress inetSocketAddress) {
        try {
            this.server = RPC.getServer(this, inetSocketAddress.getHostName(), inetSocketAddress.getPort(), configuration);
            this.server.start();
            LOG.info(" BSPPeer address:" + inetSocketAddress.getHostName() + " port:" + inetSocketAddress.getPort());
        } catch (IOException e) {
            LOG.error("Fail to start RPC server!", e);
            // Keep the cause so callers see why the launch failed, not just that it did.
            throw new RuntimeException("RPC Server could not be launched!", e);
        }
    }

    /**
     * Releases the cached peer proxies and stops the RPC server if one was started.
     */
    @Override
    public void close() {
        try {
            // Proxies obtained from RPC.getProxy hold client-side resources until stopped.
            for (HadoopMessageManager peer : this.peers.values()) {
                RPC.stopProxy(peer);
            }
            this.peers.clear();
        } finally {
            // The server must be stopped even if releasing a proxy fails.
            if (this.server != null) {
                this.server.stop();
            }
        }
    }

    /**
     * Removes and returns the next readable message.
     *
     * @return the next message, or {@code null} if none is left
     */
    @Override
    public BSPMessage getCurrentMessage() throws IOException {
        return this.localQueue.poll();
    }

    /**
     * Buffers a message for the named peer; nothing is sent over the network here.
     *
     * @param str name of the destination peer, resolved via {@link BSPNetUtils#getAddress(String)}
     * @param bSPMessage message to buffer
     */
    @Override
    public void send(String str, BSPMessage bSPMessage) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Send message (" + bSPMessage.toString() + ") to " + str);
        }

        InetSocketAddress address = this.peerSocketCache.get(str);
        if (address == null) {
            address = BSPNetUtils.getAddress(str);
            this.peerSocketCache.put(str, address);
        }

        LinkedList<BSPMessage> queue = this.outgoingQueues.get(address);
        if (queue == null) {
            queue = new LinkedList<>();
            this.outgoingQueues.put(address, queue);
        }
        queue.add(bSPMessage);
    }

    /**
     * Returns an iterator over the buffered outgoing messages, one entry per destination.
     *
     * <p>The iterator is backed directly by the internal map; it is not a copy.
     */
    @Override
    public Iterator<Map.Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
        return this.outgoingQueues.entrySet().iterator();
    }

    /**
     * Returns the RPC proxy for the given peer, creating and caching it on first use.
     *
     * @param inetSocketAddress address of the remote peer
     * @return the cached or newly created proxy
     * @throws IOException if the proxy cannot be created
     */
    protected HadoopMessageManager getBSPPeerConnection(InetSocketAddress inetSocketAddress) throws IOException {
        HadoopMessageManager peer = this.peers.get(inetSocketAddress);
        if (peer == null) {
            peer = (HadoopMessageManager) RPC.getProxy(HadoopMessageManager.class, PROTOCOL_VERSION, inetSocketAddress, this.conf);
            this.peers.put(inetSocketAddress, peer);
        }
        return peer;
    }

    /**
     * Sends a bundle of messages to the peer at the given address.
     *
     * @param inetSocketAddress address of the destination peer
     * @param bSPMessageBundle messages to deliver
     * @throws IllegalArgumentException if no connection to the peer could be obtained
     * @throws IOException if obtaining the connection fails
     */
    @Override
    public void transfer(InetSocketAddress inetSocketAddress, BSPMessageBundle bSPMessageBundle) throws IOException {
        HadoopMessageManager peer = getBSPPeerConnection(inetSocketAddress);
        if (peer == null) {
            throw new IllegalArgumentException("Can not find " + inetSocketAddress.toString() + " to transfer messages to!");
        }
        peer.put(bSPMessageBundle);
    }

    /**
     * Discards the buffered outgoing messages and makes the received messages readable.
     */
    @Override
    public void clearOutgoingQueues() {
        this.outgoingQueues.clear();
        // Drain with poll() rather than addAll() + clear(): a message put() concurrently between
        // those two calls would be cleared without ever reaching the local queue.
        BSPMessage received;
        while ((received = this.localQueueForNextIteration.poll()) != null) {
            this.localQueue.add(received);
        }
    }

    /**
     * Accepts a single incoming message; it becomes readable after the next
     * {@link #clearOutgoingQueues()}.
     */
    @Override
    public void put(BSPMessage bSPMessage) {
        this.localQueueForNextIteration.add(bSPMessage);
    }

    /**
     * Accepts every message of an incoming bundle; they become readable after the next
     * {@link #clearOutgoingQueues()}.
     */
    @Override
    public void put(BSPMessageBundle bSPMessageBundle) {
        for (BSPMessage message : bSPMessageBundle.getMessages()) {
            this.localQueueForNextIteration.add(message);
        }
    }

    /**
     * Returns how many messages are currently readable through {@link #getCurrentMessage()}.
     */
    @Override
    public int getNumCurrentMessages() {
        return this.localQueue.size();
    }

    /**
     * Reports the protocol version of this implementation; both arguments are ignored.
     *
     * @return always {@code 1}
     */
    public long getProtocolVersion(String str, long j) throws IOException {
        return PROTOCOL_VERSION;
    }
}
