package org.apache.reef.wake.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.AbstractEStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.StageConfiguration;
import org.apache.reef.wake.exception.WakeRuntimeException;

/* loaded from: input_file:org/apache/reef/wake/impl/ThreadPoolStage.class */
/**
 * Stage that hands every incoming event to an {@link ExecutorService}, which
 * then invokes the configured {@link EventHandler} on one of its threads.
 *
 * <p>The stage either creates its own fixed-size thread pool (constructors
 * taking a thread count) or uses an executor supplied by the caller
 * (constructors taking an {@link ExecutorService}). Only a pool created by
 * the stage itself is shut down by {@link #close()}.
 *
 * @param <T> type of the events processed by this stage
 */
public final class ThreadPoolStage<T> extends AbstractEStage<T> {
    private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName());

    /** Time, in milliseconds, {@link #close()} waits for the pool to drain before forcing shutdown. */
    private static final long SHUTDOWN_TIMEOUT_MS = 1000L;

    private final EventHandler<T> handler;
    private final ExecutorService executor;

    /** Size of the internally created pool; 0 when the executor was supplied by the caller. */
    private final int numThreads;

    /** Optional handler for throwables raised by {@link #handler}; may be {@code null}. */
    private final EventHandler<Throwable> errorHandler;

    /**
     * Creates a stage named after the handler's class, backed by a new fixed thread pool.
     *
     * @param handler    handler invoked for every event
     * @param numThreads number of threads in the pool; must be positive
     * @throws WakeRuntimeException if {@code numThreads} is less than or equal to 0
     */
    @Inject
    public ThreadPoolStage(@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler, @Parameter(StageConfiguration.NumberOfThreads.class) int numThreads) {
        this(handler.getClass().getName(), handler, numThreads, (EventHandler<Throwable>) null);
    }

    /**
     * Creates a named stage backed by a new fixed thread pool and registers it
     * with the {@code StageManager}.
     *
     * @param name         stage name, also passed to the pool's thread factory
     * @param handler      handler invoked for every event
     * @param numThreads   number of threads in the pool; must be positive
     * @param errorHandler receives throwables raised by {@code handler}; may be {@code null}
     * @throws WakeRuntimeException if {@code numThreads} is less than or equal to 0
     */
    @Inject
    public ThreadPoolStage(@Parameter(StageConfiguration.StageName.class) String name, @Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler, @Parameter(StageConfiguration.NumberOfThreads.class) int numThreads, @Parameter(StageConfiguration.ErrorHandler.class) EventHandler<Throwable> errorHandler) {
        super(name);
        this.handler = handler;
        this.errorHandler = errorHandler;
        if (numThreads <= 0) {
            throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0");
        }
        this.numThreads = numThreads;
        this.executor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory(name));
        StageManager.instance().register(this);
    }

    /**
     * Creates a named stage backed by a new fixed thread pool, without an error handler.
     *
     * @param name       stage name
     * @param handler    handler invoked for every event
     * @param numThreads number of threads in the pool; must be positive
     * @throws WakeRuntimeException if {@code numThreads} is less than or equal to 0
     */
    @Inject
    public ThreadPoolStage(@Parameter(StageConfiguration.StageName.class) String name, @Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler, @Parameter(StageConfiguration.NumberOfThreads.class) int numThreads) {
        this(name, handler, numThreads, (EventHandler<Throwable>) null);
    }

    /**
     * Creates a stage named after the handler's class that runs on a caller-supplied executor.
     *
     * @param handler         handler invoked for every event
     * @param executorService executor the events are submitted to
     */
    @Inject
    public ThreadPoolStage(@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler, @Parameter(StageConfiguration.StageExecutorService.class) ExecutorService executorService) {
        this(handler.getClass().getName(), handler, executorService);
    }

    /**
     * Creates a stage named after the handler's class that runs on a
     * caller-supplied executor and reports handler failures to {@code errorHandler}.
     *
     * @param handler         handler invoked for every event
     * @param executorService executor the events are submitted to
     * @param errorHandler    receives throwables raised by {@code handler}; may be {@code null}
     */
    @Inject
    public ThreadPoolStage(@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler, @Parameter(StageConfiguration.StageExecutorService.class) ExecutorService executorService, @Parameter(StageConfiguration.ErrorHandler.class) EventHandler<Throwable> errorHandler) {
        this(handler.getClass().getName(), handler, executorService, errorHandler);
    }

    /**
     * Creates a named stage that runs on a caller-supplied executor, without an error handler.
     *
     * @param name            stage name
     * @param handler         handler invoked for every event
     * @param executorService executor the events are submitted to
     */
    @Inject
    public ThreadPoolStage(@Parameter(StageConfiguration.StageName.class) String name, @Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler, @Parameter(StageConfiguration.StageExecutorService.class) ExecutorService executorService) {
        this(name, handler, executorService, (EventHandler<Throwable>) null);
    }

    /**
     * Creates a named stage that runs on a caller-supplied executor and
     * registers it with the {@code StageManager}.
     *
     * <p>{@code numThreads} is recorded as 0, which makes {@link #close()}
     * leave the supplied executor running.
     *
     * @param name            stage name
     * @param handler         handler invoked for every event
     * @param executorService executor the events are submitted to
     * @param errorHandler    receives throwables raised by {@code handler}; may be {@code null}
     */
    @Inject
    public ThreadPoolStage(@Parameter(StageConfiguration.StageName.class) String name, @Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler, @Parameter(StageConfiguration.StageExecutorService.class) ExecutorService executorService, @Parameter(StageConfiguration.ErrorHandler.class) EventHandler<Throwable> errorHandler) {
        super(name);
        this.handler = handler;
        this.errorHandler = errorHandler;
        this.numThreads = 0;
        this.executor = executorService;
        StageManager.instance().register(this);
    }

    /**
     * Submits the event to the executor for processing by the handler.
     *
     * <p>{@code beforeOnNext()} is called before submission and
     * {@code afterOnNext()} exactly once per event: after the handler (and, on
     * failure, the error handler) has run, or immediately if submission fails.
     *
     * @param value the event to process
     * @throws RuntimeException if the executor refuses the task (rethrown unchanged)
     */
    @Override // org.apache.reef.wake.EventHandler
    public void onNext(final T value) {
        beforeOnNext();
        try {
            this.executor.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        ThreadPoolStage.this.handler.onNext(value);
                    } catch (final Throwable th) {
                        if (ThreadPoolStage.this.errorHandler == null) {
                            ThreadPoolStage.LOG.log(Level.SEVERE, ThreadPoolStage.this.name + " Exception from event handler", th);
                            // NOTE(review): with a standard ExecutorService the rethrown throwable is
                            // captured by the Future returned from submit(), which is discarded here.
                            throw th;
                        }
                        ThreadPoolStage.this.errorHandler.onNext(th);
                    } finally {
                        // Single exit point: runs once whether the handler succeeded,
                        // failed, or the error handler itself threw.
                        ThreadPoolStage.this.afterOnNext();
                    }
                }
            });
        } catch (final Exception e) {
            // The task never ran, so balance beforeOnNext() here before propagating.
            LOG.log(Level.SEVERE, "Encountered error when submitting to executor in ThreadPoolStage.", e);
            afterOnNext();
            throw e;
        }
    }

    /**
     * Closes the stage. Only the first call has any effect.
     *
     * <p>When the stage created its own pool ({@code numThreads > 0}) the pool
     * is shut down; if it does not terminate within the shutdown timeout it is
     * shut down forcibly and the number of dropped tasks is logged. A
     * caller-supplied executor is left untouched.
     *
     * @throws Exception if the wait for termination is interrupted
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        final boolean firstClose = this.closed.compareAndSet(false, true);
        if (!firstClose || this.numThreads <= 0) {
            return;
        }
        this.executor.shutdown();
        if (!this.executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            LOG.log(Level.WARNING, "Executor did not terminate in " + SHUTDOWN_TIMEOUT_MS + "ms.");
            final int dropped = this.executor.shutdownNow().size();
            LOG.log(Level.WARNING, "Executor dropped " + dropped + " tasks.");
        }
    }

    /**
     * Returns the number of tasks currently waiting in the executor's queue.
     *
     * @return the size of the executor's work queue
     * @throws ClassCastException if the executor is not a {@link ThreadPoolExecutor},
     *                            which can happen with a caller-supplied executor
     */
    public int getQueueLength() {
        return ((ThreadPoolExecutor) this.executor).getQueue().size();
    }

    /**
     * Returns the difference between the in-meter and out-meter counts,
     * narrowed to {@code int}.
     *
     * <p>NOTE(review): presumably the meters are advanced by
     * {@code beforeOnNext()}/{@code afterOnNext()}, making this the number of
     * events submitted but not yet finished — confirm against AbstractEStage.
     *
     * @return in-meter count minus out-meter count
     */
    public int getActiveCount() {
        return (int) (getInMeter().getCount() - getOutMeter().getCount());
    }
}
