package org.apache.hama.bsp.message;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.util.LRUCache;

/* loaded from: input_file:org/apache/hama/bsp/message/AvroMessageManagerImpl.class */
/**
 * Message manager that ships {@link BSPMessageBundle}s between peers over Avro's Netty RPC
 * transport.
 *
 * <p>Each instance hosts a {@link NettyServer} that receives bundles through the {@link Sender}
 * protocol, and keeps a bounded LRU cache of outgoing {@link NettyTransceiver}s (one per remote
 * address) together with the client proxies built on top of them.
 *
 * <p>NOTE(review): {@code peers} and {@code peersLRUCache} are plain, unsynchronized maps;
 * presumably outgoing transfers are issued from a single thread - confirm against the callers.
 *
 * @param <M> the message type carried inside the bundles
 */
public final class AvroMessageManagerImpl<M extends Writable> extends CompressableMessageManager<M> implements Sender<M> {
    /** Server receiving bundles from remote peers; {@code null} until {@link #init} has run. */
    private NettyServer server = null;

    /** Client proxies keyed by remote address; kept in step with {@link #peersLRUCache}. */
    private final Map<InetSocketAddress, Sender<M>> peers = new HashMap<>();

    /** Open connections keyed by remote address; evicting an entry closes its connection. */
    private LRUCache<InetSocketAddress, NettyTransceiver> peersLRUCache;

    /**
     * Initializes the parent manager and compression, starts the Avro server on the given
     * address and creates the connection cache.
     *
     * @param taskAttemptID the attempt this manager belongs to (forwarded to the parent)
     * @param bSPPeer the owning peer (forwarded to the parent)
     * @param configuration job configuration, also used to set up compression
     * @param inetSocketAddress address the receiving server is bound to
     */
    @Override
    public void init(TaskAttemptID taskAttemptID, BSPPeer<?, ?, ?, ?, M> bSPPeer, Configuration configuration, InetSocketAddress inetSocketAddress) {
        super.init(taskAttemptID, bSPPeer, configuration, inetSocketAddress);
        super.initCompression(configuration);
        this.server = new NettyServer(new SpecificResponder(Sender.class, this), inetSocketAddress);
        this.peersLRUCache = new LRUCache<InetSocketAddress, NettyTransceiver>(this.maxCachedConnections) {
            @Override
            protected final boolean removeEldestEntry(Map.Entry<InetSocketAddress, NettyTransceiver> entry) {
                if (size() <= this.capacity) {
                    return false;
                }
                // Evicting a connection: close it and drop the proxy that was built on top of it.
                entry.getValue().close();
                AvroMessageManagerImpl.this.peers.remove(entry.getKey());
                return true;
            }
        };
    }

    /**
     * Closes the parent manager, every cached outgoing connection and the receiving server.
     *
     * <p>The connections and the server are released even if the parent's {@code close()} throws,
     * and the method tolerates being called when {@link #init} never ran.
     */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            try {
                closeCachedConnections();
            } finally {
                if (this.server != null) {
                    this.server.close();
                    this.server = null;
                }
            }
        }
    }

    /** Closes all cached transceivers and forgets them together with their client proxies. */
    private void closeCachedConnections() {
        try {
            if (this.peersLRUCache != null) {
                for (NettyTransceiver transceiver : this.peersLRUCache.values()) {
                    transceiver.close();
                }
            }
        } finally {
            if (this.peersLRUCache != null) {
                this.peersLRUCache.clear();
            }
            this.peers.clear();
        }
    }

    /**
     * Hands a received bundle to the local loop-back path.
     *
     * @param bSPMessageBundle the bundle to deliver locally
     * @throws IOException if the loop-back delivery fails
     */
    public void put(BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        loopBackMessages(bSPMessageBundle);
    }

    /**
     * Serializes the bundle and sends it to the peer listening on the given address.
     *
     * @param inetSocketAddress address of the receiving peer
     * @param bSPMessageBundle the bundle to send
     * @throws IOException if serialization, connecting or the remote call fails
     */
    @Override
    public void transfer(InetSocketAddress inetSocketAddress, BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        AvroBSPMessageBundle<M> avroBSPMessageBundle = new AvroBSPMessageBundle<>();
        avroBSPMessageBundle.setData(serializeMessage(bSPMessageBundle));
        getSender(inetSocketAddress).transfer(avroBSPMessageBundle);
    }

    /**
     * Returns the client proxy for the given address, opening and caching a connection when none
     * is cached yet.
     *
     * @param inetSocketAddress address of the remote peer
     * @return the proxy used to send bundles to that peer
     * @throws IOException if the connection or the proxy cannot be created
     */
    private Sender<M> getSender(InetSocketAddress inetSocketAddress) throws IOException {
        // The lookup goes through the LRU cache so that it, not the proxy map, decides what is live.
        if (this.peersLRUCache.get(inetSocketAddress) == null) {
            NettyTransceiver transceiver = new NettyTransceiver(inetSocketAddress);
            boolean registered = false;
            try {
                @SuppressWarnings("unchecked") // the proxy implements the raw Sender protocol interface
                Sender<M> sender = (Sender<M>) SpecificRequestor.getClient(Sender.class, transceiver);
                this.peersLRUCache.put(inetSocketAddress, transceiver);
                this.peers.put(inetSocketAddress, sender);
                registered = true;
            } finally {
                if (!registered) {
                    // Do not leak the freshly opened connection when the proxy could not be set up.
                    transceiver.close();
                }
            }
        }
        return this.peers.get(inetSocketAddress);
    }

    /**
     * Receiving side of the {@link Sender} protocol: decodes the payload and delivers it locally.
     *
     * @param avroBSPMessageBundle the wire representation of a bundle
     * @return always {@code null}
     * @throws AvroRemoteException if the payload cannot be decoded or delivered; the underlying
     *     {@link IOException} is preserved as the cause instead of being silently dropped
     */
    @Override
    public Void transfer(AvroBSPMessageBundle<M> avroBSPMessageBundle) throws AvroRemoteException {
        try {
            put(deserializeMessage(avroBSPMessageBundle.getData()));
            return null;
        } catch (IOException e) {
            // Swallowing here would lose the whole bundle while the sender believes it arrived.
            throw new AvroRemoteException(e);
        }
    }

    /**
     * Rebuilds a bundle from its wire bytes, decompressing when a compressor is configured, and
     * records the number of received bytes on the peer's counters.
     *
     * @param byteBuffer buffer whose remaining bytes hold the payload; its position is not moved
     * @return the decoded bundle
     * @throws IOException if the payload cannot be read
     */
    private BSPMessageBundle<M> deserializeMessage(ByteBuffer byteBuffer) throws IOException {
        // Copy exactly the readable region: array() would ignore position/limit/arrayOffset and
        // is unavailable for read-only or direct buffers.
        ByteBuffer view = byteBuffer.duplicate();
        byte[] payload = new byte[view.remaining()];
        view.get(payload);

        if (this.compressor != null) {
            this.peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_RECEIVED, payload.length);
            return this.compressor.decompressBundle(new BSPCompressedBundle(payload));
        }
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_RECEIVED, payload.length);
        BSPMessageBundle<M> bundle = new BSPMessageBundle<>();
        bundle.readFields(new DataInputStream(new ByteArrayInputStream(payload)));
        return bundle;
    }

    /**
     * Turns a bundle into its wire bytes, compressing when a compressor is configured, and records
     * the number of sent bytes on the peer's counters.
     *
     * @param bSPMessageBundle the bundle to encode
     * @return a buffer wrapping the encoded bytes
     * @throws IOException if the bundle cannot be written
     */
    private ByteBuffer serializeMessage(BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        if (this.compressor != null) {
            byte[] compressed = this.compressor.compressBundle(bSPMessageBundle).getData();
            this.peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_SENT, compressed.length);
            return ByteBuffer.wrap(compressed);
        }
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(sink)) {
            bSPMessageBundle.write(out);
        }
        byte[] raw = sink.toByteArray();
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, raw.length);
        return ByteBuffer.wrap(raw);
    }
}
