package org.apache.reef.wake.impl;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.AbstractEStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.StageConfiguration;

/**
 * Stage that buffers incoming events in a bounded {@link ArrayBlockingQueue} and
 * hands them, one at a time, to an {@link EventHandler} on a single dedicated thread.
 *
 * <p>The worker thread is created and started in the constructor, and the stage
 * registers itself with {@code StageManager.instance()}.
 *
 * @param <T> type of the events accepted by this stage
 */
public final class SingleThreadStage<T> extends AbstractEStage<T> {
    private static final Logger LOG = Logger.getLogger(SingleThreadStage.class.getName());

    /** Bounded buffer between callers of {@link #onNext(Object)} and the worker thread. */
    private final BlockingQueue<T> queue;

    /** Worker thread that runs the {@link Producer} loop. */
    private final Thread thread;

    /** Set by {@link #close()} so the worker can tell a shutdown interrupt from any other one. */
    private final AtomicBoolean interrupted;

    /**
     * Worker loop: takes events from the queue and passes them to the handler.
     *
     * <p>Deliberately an inner (non-static) class because it calls
     * {@code afterOnNext()} on the enclosing stage.
     *
     * @param <U> type of the events consumed
     */
    private class Producer<U> implements Runnable {
        private final String name;
        private final BlockingQueue<U> queue;
        private final EventHandler<U> handler;
        private final AtomicBoolean interrupted;

        /**
         * @param stageName   name used as a prefix in log messages
         * @param source      queue the events are taken from
         * @param sink        handler invoked for every event taken from the queue
         * @param closeSignal flag that is true once the stage has been asked to close
         */
        Producer(String stageName, BlockingQueue<U> source, EventHandler<U> sink, AtomicBoolean closeSignal) {
            this.name = stageName;
            this.queue = source;
            this.handler = sink;
            this.interrupted = closeSignal;
        }

        /**
         * Loops forever: blocks on the queue, dispatches the event, then notifies the stage.
         * Returns only when interrupted while the close flag is set; any other exception
         * is logged at SEVERE and rethrown, which ends the loop.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    this.handler.onNext(this.queue.take());
                    SingleThreadStage.this.afterOnNext();
                } catch (InterruptedException e) {
                    if (this.interrupted.get()) {
                        SingleThreadStage.LOG.log(Level.FINEST, this.name + " Closing Producer due to interruption");
                        return;
                    }
                    // Interrupt without the close flag set: ignore it and keep consuming.
                } catch (Exception e2) {
                    SingleThreadStage.LOG.log(Level.SEVERE, this.name + " Exception from event handler", e2);
                    throw e2;
                }
            }
        }
    }

    /**
     * Creates a stage named after the handler's class name.
     *
     * @param eventHandler handler invoked for every queued event
     * @param i            capacity of the event queue
     */
    @Inject
    public SingleThreadStage(@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> eventHandler, @Parameter(StageConfiguration.Capacity.class) int i) {
        this(eventHandler.getClass().getName(), eventHandler, i);
    }

    /**
     * Creates the stage, starts its worker thread and registers the stage with the
     * {@code StageManager}.
     *
     * @param str          stage name; also used in the worker thread name and log messages
     * @param eventHandler handler invoked for every queued event
     * @param i            capacity of the event queue, passed to {@link ArrayBlockingQueue}
     */
    @Inject
    public SingleThreadStage(@Parameter(StageConfiguration.StageName.class) String str, @Parameter(StageConfiguration.StageHandler.class) EventHandler<T> eventHandler, @Parameter(StageConfiguration.Capacity.class) int i) {
        super(str);
        // Diamond instead of raw types so the element type is checked by the compiler.
        this.queue = new ArrayBlockingQueue<>(i);
        this.interrupted = new AtomicBoolean(false);
        this.thread = new Thread(
                new Producer<>(str, this.queue, eventHandler, this.interrupted),
                "SingleThreadStage<" + str + ">");
        this.thread.start();
        StageManager.instance().register(this);
    }

    /**
     * Enqueues an event for the worker thread.
     *
     * <p>Uses {@link BlockingQueue#add(Object)}, which does not block: it throws
     * {@link IllegalStateException} when the queue is at capacity.
     *
     * @param t the event to enqueue
     */
    @Override // org.apache.reef.wake.EventHandler
    public void onNext(T t) {
        beforeOnNext();
        this.queue.add(t);
    }

    /**
     * Signals the worker thread to stop. Only the first call has an effect.
     *
     * <p>Sets the close flag and interrupts the worker; it does not wait for the
     * thread to finish.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.closed.compareAndSet(false, true)) {
            // Flag must be set before interrupting so the worker sees it when it wakes up.
            this.interrupted.set(true);
            this.thread.interrupt();
        }
    }
}
