package org.apache.reef.wake.examples.p2p;

import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.wake.EventHandler;

/* loaded from: input_file:org/apache/reef/wake/examples/p2p/Pull2Push.class */
/**
 * Adapter that repeatedly pulls events from registered {@link EventSource}s
 * and pushes each one to a single output {@link EventHandler}.
 *
 * <p>{@link #run()} takes the source at the head of the queue, asks it for its
 * next event, forwards a non-null event to the output handler and re-appends
 * the source to the tail of the queue. A source whose {@code getNext()} returns
 * {@code null} is logged and not re-queued.
 *
 * <p>The loop runs until {@link #close()} is called. The {@code closed} flag is
 * {@code volatile} so that a {@code close()} issued from another thread is
 * guaranteed to become visible to the thread executing {@code run()}.
 *
 * <p>NOTE(review): the source queue is a plain {@link LinkedList} with no
 * synchronization, so calling {@link #register(EventSource)} concurrently with
 * {@link #run()} is not protected by this class — confirm how callers use it.
 *
 * @param <T> type of the events moved from the sources to the output handler
 */
public final class Pull2Push<T> implements Runnable, AutoCloseable {
    private static final Logger LOG = Logger.getLogger(Pull2Push.class.getName());

    /** Handler that receives every event pulled from a source. */
    private final EventHandler<T> output;

    /** Sources still being polled; head is polled next, live sources are re-added at the tail. */
    private final Queue<EventSource<T>> sources = new LinkedList<>();

    /**
     * Set by {@link #close()} to stop {@link #run()}. Must be volatile: it is
     * written and read without any other synchronization, and without it the
     * polling loop is not guaranteed to ever see the update.
     */
    private volatile boolean closed = false;

    /**
     * @param eventHandler handler that every pulled event is pushed to
     */
    public Pull2Push(EventHandler<T> eventHandler) {
        this.output = eventHandler;
    }

    /**
     * Appends a source to the end of the polling queue.
     *
     * @param eventSource source to pull events from
     */
    public void register(EventSource<T> eventSource) {
        this.sources.add(eventSource);
    }

    /**
     * Polls the registered sources and forwards their events until
     * {@link #close()} is called. While the queue is empty the loop keeps
     * spinning and re-checking the {@code closed} flag.
     */
    @Override
    public void run() {
        while (!this.closed) {
            final EventSource<T> source = this.sources.poll();
            if (source == null) {
                // Nothing registered (or everything dropped): re-check the closed flag.
                continue;
            }

            final T event = source.getNext();
            if (event == null) {
                // A null event means the source is not re-queued.
                LOG.log(Level.INFO, "Droping message source {0} from the queue", source.toString());
                continue;
            }

            // Re-queue the source before delivering, matching the original ordering.
            this.sources.add(source);
            this.output.onNext(event);
        }
    }

    /**
     * Requests that {@link #run()} stop; the loop exits the next time it
     * checks the flag, after finishing any iteration already in progress.
     *
     * @throws Exception never thrown by this implementation; declared to
     *     match {@link AutoCloseable#close()}
     */
    @Override
    public void close() throws Exception {
        this.closed = true;
    }
}
