package org.apache.hama.bsp.message;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SingleLockQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
import org.apache.hama.util.ReflectionUtils;

/* loaded from: input_file:org/apache/hama/bsp/message/AbstractMessageManager.class */
/**
 * Base {@link MessageManager} that owns the outgoing message manager and two
 * receive queues.
 *
 * <p>{@link #localQueue} is the queue {@link #getCurrentMessage()} reads from.
 * {@link #localQueueForNextIteration} collects messages handed in through
 * {@link #loopBackMessage(Writable)} and {@link #loopBackBundle(BSPMessageBundle)}.
 * {@link #clearOutgoingMessages()} rotates the two queues.
 *
 * <p>Registered {@link MessageEventListener}s are informed about sent and
 * received messages as well as about initialisation and close events.
 *
 * @param <M> the message type handled by this manager
 */
public abstract class AbstractMessageManager<M extends Writable> implements MessageManager<M>, Configurable {
    protected static final Log LOG = LogFactory.getLog(AbstractMessageManager.class);

    /** Fallback used when {@link MessageManager#MAX_CACHED_CONNECTIONS_KEY} is not configured. */
    private static final int DEFAULT_MAX_CACHED_CONNECTIONS = 100;

    /** Configuration used to resolve queue / outgoing manager classes and flags. */
    protected Configuration conf;
    /** Collects messages passed to {@link #send(String, Writable)}. */
    protected OutgoingMessageManager<M> outgoingMessageManager;
    /** Queue that {@link #getCurrentMessage()} polls from. */
    protected MessageQueue<M> localQueue;
    /** Queue that receives looped-back messages until the next rotation. */
    protected SynchronizedQueue<M> localQueueForNextIteration;
    /** Peer whose counters are incremented on send / bundle receive. */
    protected BSPPeer<?, ?, ?, ?, M> peer;
    /** Task attempt passed to every receive queue on initialisation. */
    protected TaskAttemptID attemptId;
    /** Value of {@link MessageManager#MAX_CACHED_CONNECTIONS_KEY}; read in {@link #init}. */
    protected int maxCachedConnections = DEFAULT_MAX_CACHED_CONNECTIONS;
    /** Listeners notified by the private {@code notify*} helpers. */
    protected Queue<MessageEventListener<M>> messageListenerQueue;
    /** Compressor obtained from {@link BSPMessageCompressorFactory} in {@link #init}. */
    protected BSPMessageCompressor<M> compressor;

    /**
     * Initialises listeners, queues, compressor and the outgoing message manager.
     *
     * @param taskAttemptID     attempt id handed to the receive queues
     * @param bSPPeer           peer used for counter updates
     * @param hamaConfiguration configuration stored as {@link #conf}
     * @param inetSocketAddress not used by this base implementation
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public void init(TaskAttemptID taskAttemptID, BSPPeer<?, ?, ?, ?, M> bSPPeer, HamaConfiguration hamaConfiguration, InetSocketAddress inetSocketAddress) {
        this.messageListenerQueue = new LinkedList<>();
        // attemptId and conf must be assigned before any queue is created:
        // getReceiverQueue() reads both of them.
        this.attemptId = taskAttemptID;
        this.peer = bSPPeer;
        this.conf = hamaConfiguration;
        this.localQueue = getReceiverQueue();
        this.localQueueForNextIteration = getSynchronizedReceiverQueue();
        this.maxCachedConnections = hamaConfiguration.getInt(MessageManager.MAX_CACHED_CONNECTIONS_KEY, DEFAULT_MAX_CACHED_CONNECTIONS);
        this.compressor = new BSPMessageCompressorFactory().getCompressor(hamaConfiguration);
        this.outgoingMessageManager = getOutgoingMessageManager();
        this.outgoingMessageManager.init(hamaConfiguration);
    }

    /**
     * Clears the outgoing messages, closes the current receive queue and
     * notifies listeners. Listeners are notified exactly once, even when
     * clearing or closing fails.
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public void close() {
        try {
            this.outgoingMessageManager.clear();
            this.localQueue.close();
        } finally {
            notifyClose();
        }
    }

    /**
     * Polls the next message from {@link #localQueue}.
     *
     * @return whatever the queue's poll operation returns
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public final M getCurrentMessage() throws IOException {
        // NOTE(review): "mo70poll" looks like a decompiler-renamed poll();
        // confirm against MessageQueue before renaming the call.
        return this.localQueue.mo70poll();
    }

    /** @return the size reported by {@link #localQueue} */
    @Override // org.apache.hama.bsp.message.MessageManager
    public final int getNumCurrentMessages() {
        return this.localQueue.size();
    }

    /** Clears {@link #localQueue}. */
    public void clearIncomingMessages() {
        this.localQueue.clear();
    }

    /**
     * Clears the outgoing messages and rotates the receive queues.
     *
     * <p>When {@link MessageQueue#PERSISTENT_QUEUE} is enabled and the current
     * queue still holds messages, the current and the next-iteration queue are
     * merged (the smaller one is added to the larger one). Otherwise the
     * current queue is closed and replaced by the next-iteration queue.
     * Afterwards the resulting queue is prepared for reading, a fresh
     * next-iteration queue is created and listeners are notified.
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public final void clearOutgoingMessages() {
        this.outgoingMessageManager.clear();

        // Null check comes first so a missing queue falls through to the
        // plain swap instead of failing on size().
        boolean keepUnreadMessages = this.conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
                && this.localQueue != null
                && this.localQueue.size() > 0;

        if (keepUnreadMessages) {
            if (this.localQueue.size() > this.localQueueForNextIteration.size()) {
                this.localQueue.addAll(this.localQueueForNextIteration);
            } else {
                // NOTE(review): the previous localQueue is not closed on this
                // path; confirm whether that is intended.
                this.localQueueForNextIteration.addAll(this.localQueue);
                this.localQueue = this.localQueueForNextIteration.getMessageQueue();
            }
        } else {
            if (this.localQueue != null) {
                this.localQueue.close();
            }
            this.localQueue = this.localQueueForNextIteration.getMessageQueue();
        }

        this.localQueue.prepareRead();
        this.localQueueForNextIteration = getSynchronizedReceiverQueue();
        notifyInit();
    }

    /**
     * Hands a message to the outgoing message manager, bumps the
     * {@code TOTAL_MESSAGES_SENT} counter and notifies listeners.
     *
     * @param str name of the destination peer
     * @param m   message to send
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public void send(String str, M m) throws IOException {
        this.outgoingMessageManager.addMessage(str, m);
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
        notifySentMessage(str, m);
    }

    /** @return the bundle iterator of the outgoing message manager */
    @Override // org.apache.hama.bsp.message.MessageManager
    public final Iterator<Map.Entry<InetSocketAddress, BSPMessageBundle<M>>> getOutgoingBundles() {
        return this.outgoingMessageManager.getBundleIterator();
    }

    /**
     * Instantiates the class configured under
     * {@link MessageManager#OUTGOING_MESSAGE_MANAGER_CLASS}, defaulting to
     * {@link OutgoingPOJOMessageBundle}.
     *
     * @return a new, not yet initialised outgoing message manager
     */
    protected OutgoingMessageManager<M> getOutgoingMessageManager() {
        // The configured class is only known at runtime, so the element type
        // cannot be verified statically.
        @SuppressWarnings("unchecked")
        OutgoingMessageManager<M> manager = (OutgoingMessageManager<M>) ReflectionUtils.newInstance(
                this.conf.getClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS, OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class));
        return manager;
    }

    /**
     * Instantiates the class configured under
     * {@link MessageManager#RECEIVE_QUEUE_TYPE_CLASS}, defaulting to
     * {@link MemoryQueue}, and initialises it with {@link #conf} and
     * {@link #attemptId}.
     *
     * @return a new, initialised receive queue
     */
    protected MessageQueue<M> getReceiverQueue() {
        // The configured class is only known at runtime, so the element type
        // cannot be verified statically.
        @SuppressWarnings("unchecked")
        MessageQueue<M> messageQueue = (MessageQueue<M>) ReflectionUtils.newInstance(
                this.conf.getClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class, MessageQueue.class));
        messageQueue.init(this.conf, this.attemptId);
        return messageQueue;
    }

    /** @return a new receive queue wrapped via {@link SingleLockQueue#synchronize} */
    protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
        return SingleLockQueue.synchronize(getReceiverQueue());
    }

    /** @return the configuration currently held by this manager */
    public final Configuration getConf() {
        return this.conf;
    }

    /** @param configuration configuration to store as {@link #conf} */
    public final void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    /** Calls {@code onMessageSent} on every registered listener. */
    private void notifySentMessage(String str, M m) {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onMessageSent(str, m);
        }
    }

    /** Calls {@code onMessageReceived} on every registered listener. */
    private void notifyReceivedMessage(M m) throws IOException {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onMessageReceived(m);
        }
    }

    /** Calls {@code onBundleReceived} on every registered listener. */
    private void notifyReceivedMessage(BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onBundleReceived(bSPMessageBundle);
        }
    }

    /** Calls {@code onInitialized} on every registered listener. */
    private void notifyInit() {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onInitialized();
        }
    }

    /** Calls {@code onClose} on every registered listener. */
    private void notifyClose() {
        for (MessageEventListener<M> listener : this.messageListenerQueue) {
            listener.onClose();
        }
    }

    /**
     * Registers a listener; {@code null} is ignored.
     *
     * @param messageEventListener listener to add
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public void registerListener(MessageEventListener<M> messageEventListener) throws IOException {
        if (messageEventListener != null) {
            this.messageListenerQueue.add(messageEventListener);
        }
    }

    /**
     * Adds a whole bundle to the next-iteration queue, increases the
     * {@code TOTAL_MESSAGES_RECEIVED} counter by the bundle size and notifies
     * listeners.
     *
     * @param bSPMessageBundle bundle to enqueue
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public void loopBackBundle(BSPMessageBundle<M> bSPMessageBundle) throws IOException {
        this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, bSPMessageBundle.size());
        this.localQueueForNextIteration.addBundle(bSPMessageBundle);
        notifyReceivedMessage(bSPMessageBundle);
    }

    /**
     * Adds a single message to the next-iteration queue and notifies
     * listeners.
     *
     * <p>NOTE(review): unlike {@link #loopBackBundle(BSPMessageBundle)} this
     * method does not touch the received-messages counter; confirm that this
     * is intended.
     *
     * @param writable message to enqueue; must be an instance of {@code M}
     * @throws IOException declared by the {@link MessageManager} contract
     */
    @Override // org.apache.hama.bsp.message.MessageManager
    public void loopBackMessage(Writable writable) throws IOException {
        // The interface hands the message in as a plain Writable; callers are
        // expected to pass this manager's message type.
        @SuppressWarnings("unchecked")
        M message = (M) writable;
        this.localQueueForNextIteration.add(message);
        notifyReceivedMessage(message);
    }
}
