package org.apache.hama.bsp.message;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.MessageTransferQueue;
import org.apache.hama.bsp.message.queue.SingleLockQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
import org.apache.hama.util.BSPNetUtils;

/* loaded from: input_file:org/apache/hama/bsp/message/AbstractMessageManager.class */
/**
 * Skeleton {@link MessageManager} that keeps the per-peer outgoing queues, the queue of
 * messages readable in the current iteration, and the queue collecting messages for the
 * next iteration. It also dispatches events to registered {@link MessageEventListener}s.
 *
 * <p>{@link #init} must be called before any other method: it is the only place where the
 * configuration, the queues and the listener collection are created.
 *
 * @param <M> the message type handled by this manager
 */
public abstract class AbstractMessageManager<M extends Writable> implements MessageManager<M>, Configurable {
    protected static final Log LOG = LogFactory.getLog(AbstractMessageManager.class);

    /** Configuration handed to {@link #init} or {@link #setConf}. */
    protected Configuration conf;

    /** Queue that {@link #getCurrentMessage()} polls from. */
    protected MessageQueue<M> localQueue;

    /** Queue that {@link #loopBackMessage(Writable)} appends to; swapped in by {@link #clearOutgoingQueues()}. */
    protected SynchronizedQueue<M> localQueueForNextIteration;

    /** Peer whose counters are incremented on send/receive. */
    protected BSPPeer<?, ?, ?, ?, M> peer;

    /** Address passed to {@link #init}; stored but not read inside this class. */
    protected InetSocketAddress peerAddress;

    /** Task attempt the queues are initialised with. */
    protected TaskAttemptID attemptId;

    /** Listeners notified on send, receive, init and close events. */
    protected Queue<MessageEventListener<M>> messageListenerQueue;

    /** Cache of peer name to resolved socket address, filled lazily by {@link #send}. */
    protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<>();

    /** One outgoing queue per destination address, filled lazily by {@link #send}. */
    protected final HashMap<InetSocketAddress, MessageQueue<M>> outgoingQueues = new HashMap<>();

    /**
     * Value of {@link MessageManager#MAX_CACHED_CONNECTIONS_KEY} (default 100). Not read inside
     * this class; presumably consumed by subclasses.
     */
    protected int maxCachedConnections = 100;

    /**
     * Stores the collaborators and creates the listener collection and both local queues.
     *
     * @param taskAttemptID the task attempt the queues are initialised with
     * @param bSPPeer the peer whose counters are updated
     * @param configuration the configuration used for queue creation and settings
     * @param inetSocketAddress the address of this peer
     */
    @Override
    public void init(TaskAttemptID taskAttemptID, BSPPeer<?, ?, ?, ?, M> bSPPeer, Configuration configuration, InetSocketAddress inetSocketAddress) {
        this.messageListenerQueue = new LinkedList<>();
        this.attemptId = taskAttemptID;
        this.peer = bSPPeer;
        this.conf = configuration;
        this.peerAddress = inetSocketAddress;
        // NOTE(review): localQueue is only read from in this class, yet it is created through
        // getSenderQueue() rather than getReceiverQueue() - confirm this is intended.
        this.localQueue = getSenderQueue();
        this.localQueueForNextIteration = getSynchronizedReceiverQueue();
        this.maxCachedConnections = configuration.getInt(MessageManager.MAX_CACHED_CONNECTIONS_KEY, 100);
    }

    /**
     * Closes every outgoing queue and the local queue, then tries to delete the disk queue
     * directory of this task attempt. Listeners are notified of the close even if closing a
     * queue fails.
     */
    @Override
    public void close() {
        try {
            for (MessageQueue<M> outgoing : this.outgoingQueues.values()) {
                outgoing.close();
            }
            this.localQueue.close();
            try {
                FileSystem.get(this.conf).delete(DiskQueue.getQueueDir(this.conf, this.attemptId, this.conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
            } catch (IOException e) {
                // Best effort only: a leftover directory must not fail the shutdown, but keep
                // the cause in the log so the failure can be diagnosed.
                LOG.warn("Queue dir couldn't be deleted", e);
            }
        } finally {
            notifyClose();
        }
    }

    /**
     * Calls {@code prepareRead()} on every outgoing queue.
     *
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override
    public void finishSendPhase() throws IOException {
        for (MessageQueue<M> outgoing : this.outgoingQueues.values()) {
            outgoing.prepareRead();
        }
    }

    /**
     * Polls the next message from the local queue.
     *
     * @return whatever {@code localQueue.poll()} returns
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override
    public final M getCurrentMessage() throws IOException {
        return this.localQueue.poll();
    }

    /**
     * @return the current size of the local queue
     */
    @Override
    public final int getNumCurrentMessages() {
        return this.localQueue.size();
    }

    /**
     * Promotes the next-iteration queue to be the readable local queue, installs a fresh
     * next-iteration queue and notifies listeners via {@code onInitialized()}.
     *
     * <p>The {@link #outgoingQueues} map itself is not modified here.
     */
    @Override
    public final void clearOutgoingQueues() {
        this.localQueue = this.localQueueForNextIteration.getMessageQueue();
        this.localQueue.prepareRead();
        this.localQueueForNextIteration = getSynchronizedReceiverQueue();
        notifyInit();
    }

    /**
     * Appends a message to the outgoing queue of the given peer, creating and registering the
     * queue on first use, bumps the sent-messages counter and notifies listeners.
     *
     * @param str the name of the destination peer, resolved through {@link BSPNetUtils#getAddress}
     * @param m the message to enqueue
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override
    public void send(String str, M m) throws IOException {
        InetSocketAddress address = this.peerSocketCache.get(str);
        if (address == null) {
            address = BSPNetUtils.getAddress(str);
            this.peerSocketCache.put(str, address);
        }
        MessageQueue<M> outgoing = this.outgoingQueues.get(address);
        if (outgoing == null) {
            // Register the queue once, when it is created, instead of re-putting it on every send.
            outgoing = getSenderQueue();
            this.outgoingQueues.put(address, outgoing);
        }
        outgoing.add(m);
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
        notifySentMessage(str, m);
    }

    /**
     * @return a live iterator over the (address, outgoing queue) entries
     */
    @Override
    public final Iterator<Map.Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
        return this.outgoingQueues.entrySet().iterator();
    }

    /**
     * Instantiates the transfer queue class configured under
     * {@link MessageManager#QUEUE_TYPE_CLASS}, defaulting to {@link MemoryQueue}.
     */
    @SuppressWarnings("rawtypes") // the concrete class is only known at runtime
    private MessageTransferQueue newTransferQueue() {
        Class<?> queueClass = this.conf.getClass(MessageManager.QUEUE_TYPE_CLASS, MemoryQueue.class);
        LOG.debug("Creating new " + queueClass);
        return (MessageTransferQueue) ReflectionUtils.newInstance(queueClass, this.conf);
    }

    /**
     * Creates and initialises a new sender queue of the configured queue type.
     *
     * @return the initialised sender queue
     */
    @SuppressWarnings("unchecked") // element type cannot be checked for a reflectively created queue
    protected MessageQueue<M> getSenderQueue() {
        MessageQueue<M> senderQueue = newTransferQueue().getSenderQueue();
        senderQueue.init(this.conf, this.attemptId);
        return senderQueue;
    }

    /**
     * Creates and initialises a new receiver queue of the configured queue type.
     *
     * @return the initialised receiver queue
     */
    @SuppressWarnings("unchecked") // element type cannot be checked for a reflectively created queue
    protected MessageQueue<M> getReceiverQueue() {
        MessageQueue<M> receiverQueue = newTransferQueue().getReceiverQueue();
        receiverQueue.init(this.conf, this.attemptId);
        return receiverQueue;
    }

    /**
     * @return a new sender queue wrapped by {@link SingleLockQueue#synchronize}
     */
    protected SynchronizedQueue<M> getSynchronizedSenderQueue() {
        return SingleLockQueue.synchronize(getSenderQueue());
    }

    /**
     * @return a new receiver queue wrapped by {@link SingleLockQueue#synchronize}
     */
    protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
        return SingleLockQueue.synchronize(getReceiverQueue());
    }

    /**
     * @return the configuration currently held by this manager
     */
    @Override
    public final Configuration getConf() {
        return this.conf;
    }

    /**
     * @param configuration the configuration to use from now on
     */
    @Override
    public final void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    /** Tells every registered listener that a message was queued for the given peer. */
    private void notifySentMessage(String str, M m) {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onMessageSent(str, m);
        }
    }

    /** Tells every registered listener that a message was received. */
    private void notifyReceivedMessage(M m) throws IOException {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onMessageReceived(m);
        }
    }

    /** Invokes {@code onInitialized()} on every registered listener. */
    private void notifyInit() {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onInitialized();
        }
    }

    /** Invokes {@code onClose()} on every registered listener. */
    private void notifyClose() {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onClose();
        }
    }

    /**
     * Registers a listener for message events; {@code null} is silently ignored.
     *
     * @param messageEventListener the listener to add, may be {@code null}
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override
    public void registerListener(MessageEventListener<M> messageEventListener) throws IOException {
        if (messageEventListener != null) {
            this.messageListenerQueue.add(messageEventListener);
        }
    }

    /**
     * Feeds every message of the bundle through {@link #loopBackMessage(Writable)}.
     *
     * @param bSPMessageBundle the bundle whose messages are looped back
     * @throws IOException if looping back a message fails
     */
    @Override
    public void loopBackMessages(BSPMessageBundle<? extends Writable> bSPMessageBundle) throws IOException {
        Iterator<? extends Writable> messages = bSPMessageBundle.getMessages().iterator();
        while (messages.hasNext()) {
            loopBackMessage(messages.next());
        }
    }

    /**
     * Adds the message to the next-iteration queue, bumps the received-messages counter and
     * notifies listeners.
     *
     * @param writable the received message; it is cast to {@code M} without a runtime check
     * @throws IOException if a listener fails while handling the message
     */
    @SuppressWarnings("unchecked") // the interface hands us a Writable; M is erased at runtime
    @Override
    public void loopBackMessage(Writable writable) throws IOException {
        M message = (M) writable;
        this.localQueueForNextIteration.add(message);
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
        notifyReceivedMessage(message);
    }
}
