package org.apache.reef.wake.remote.impl;

import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.WakeParameters;
import org.apache.reef.wake.impl.DefaultThreadFactory;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;

/* loaded from: input_file:org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.class */
/**
 * Receiver stage that forwards incoming {@link TransportEvent}s to a push stage,
 * which is wired to a pull stage that delivers to the supplied event handler.
 *
 * <p>Each stage runs on its own cached thread pool. Both pools are owned by this
 * object and are shut down by {@link #close()}.
 *
 * <p>NOTE(review): the per-address ordering implied by the class name is implemented
 * by {@code OrderedPushEventHandler}/{@code OrderedPullEventHandler}, which are not
 * visible here - confirm against those classes.
 */
public class OrderedRemoteReceiverStage implements EStage<TransportEvent> {
    private static final Logger LOG = Logger.getLogger(OrderedRemoteReceiverStage.class.getName());

    /** Maximum time, in milliseconds, to wait for each executor to terminate on close. */
    private final long shutdownTimeout = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT;

    /** Event streams keyed by socket address; shared with the push event handler. */
    private final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap = new ConcurrentHashMap<>();

    private final ExecutorService pushExecutor = Executors.newCachedThreadPool(
            new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Push"));
    private final ExecutorService pullExecutor = Executors.newCachedThreadPool(
            new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Pull"));

    private final ThreadPoolStage<TransportEvent> pushStage;
    private final ThreadPoolStage<OrderedEventStream> pullStage;

    /**
     * Builds the pull stage first, then the push stage that feeds it.
     *
     * @param eventHandler  handler given to the pull event handler for remote events
     * @param eventHandler2 error handler passed to both thread pool stages
     */
    public OrderedRemoteReceiverStage(EventHandler<RemoteEvent<byte[]>> eventHandler, EventHandler<Throwable> eventHandler2) {
        this.pullStage = new ThreadPoolStage<>(new OrderedPullEventHandler(eventHandler), this.pullExecutor, eventHandler2);
        this.pushStage = new ThreadPoolStage<>(new OrderedPushEventHandler(this.streamMap, this.pullStage), this.pushExecutor, eventHandler2);
    }

    /**
     * Logs the event at FINEST level and hands it to the push stage.
     *
     * @param transportEvent the incoming transport event
     */
    @Override
    public void onNext(TransportEvent transportEvent) {
        LOG.log(Level.FINEST, "{0}", transportEvent);
        this.pushStage.onNext(transportEvent);
    }

    /**
     * Shuts down the push executor and then the pull executor, waiting up to
     * {@code shutdownTimeout} milliseconds for each to terminate.
     *
     * <p>The pull executor is shut down even when closing the push executor fails,
     * so an interrupt during the first wait no longer leaks the second pool.
     *
     * @throws RemoteRuntimeException if the calling thread is interrupted while waiting;
     *                                the thread's interrupt status is restored first
     */
    @Override
    public void close() throws Exception {
        LOG.log(Level.FINE, "close");
        try {
            shutdownAndAwait(this.pushExecutor);
        } finally {
            // Runs even if the push shutdown threw. With the interrupt flag restored,
            // the wait below aborts promptly, but shutdown() has still been requested.
            shutdownAndAwait(this.pullExecutor);
        }
    }

    /**
     * Requests an orderly shutdown of one executor and waits for it to terminate.
     * If it does not terminate within the timeout, it is forced down with
     * {@code shutdownNow()} and the number of dropped tasks is logged.
     */
    private void shutdownAndAwait(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
                LOG.log(Level.WARNING, "Executor did not terminate in " + this.shutdownTimeout + "ms.");
                LOG.log(Level.WARNING, "Executor dropped " + executor.shutdownNow().size() + " tasks.");
            }
        } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Close interrupted");
            // Restore the interrupt status so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RemoteRuntimeException(e);
        }
    }
}
