package org.apache.reef.runtime.common.utils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.tang.util.MonotonicHashMap;
import org.apache.reef.util.ExceptionHandlingEventHandler;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.ThreadPoolStage;

@DriverSide
@Private
/* loaded from: input_file:org/apache/reef/runtime/common/utils/DispatchingEStage.class */
public final class DispatchingEStage implements AutoCloseable {
    private final Map<Class<?>, EventHandler<?>> handlers = Collections.synchronizedMap(new MonotonicHashMap());
    private final EventHandler<Throwable> errorHandler;
    private final ThreadPoolStage<DelayedOnNext> stage;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/reef/runtime/common/utils/DispatchingEStage$DelayedOnNext.class */
    public static final class DelayedOnNext {
        private final EventHandler<Object> handler;
        private final Object message;

        <T, U extends T> DelayedOnNext(EventHandler<T> eventHandler, U u) {
            this.handler = eventHandler;
            this.message = u;
        }
    }

    public DispatchingEStage(EventHandler<Throwable> eventHandler, int i, String str) {
        this.errorHandler = eventHandler;
        this.stage = new ThreadPoolStage<>(str, new EventHandler<DelayedOnNext>() { // from class: org.apache.reef.runtime.common.utils.DispatchingEStage.1
            public void onNext(DelayedOnNext delayedOnNext) {
                delayedOnNext.handler.onNext(delayedOnNext.message);
            }
        }, i);
    }

    public DispatchingEStage(DispatchingEStage dispatchingEStage) {
        this.errorHandler = dispatchingEStage.errorHandler;
        this.stage = dispatchingEStage.stage;
    }

    public <T, U extends T> void register(Class<T> cls, Set<EventHandler<U>> set) {
        this.handlers.put(cls, new ExceptionHandlingEventHandler(new BroadCastEventHandler(set), this.errorHandler));
    }

    public <T, U extends T> void onNext(Class<T> cls, U u) {
        this.stage.onNext(new DelayedOnNext(this.handlers.get(cls), u));
    }

    public boolean isEmpty() {
        return this.stage.getQueueLength() + this.stage.getActiveCount() == 0;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.stage.close();
    }
}
