/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.RemovalListener;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.ExecutionDriver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver;
import org.apache.beam.runners.direct.BundleProcessor;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CompletionCallback;
import org.apache.beam.runners.direct.DirectGraph;
import org.apache.beam.runners.direct.DirectTransformExecutor;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ModelEnforcementFactory;
import org.apache.beam.runners.direct.PipelineExecutor;
import org.apache.beam.runners.direct.QuiescenceDriver;
import org.apache.beam.runners.direct.RootProviderRegistry;
import org.apache.beam.runners.direct.StepAndKey;
import org.apache.beam.runners.direct.TransformEvaluatorRegistry;
import org.apache.beam.runners.direct.TransformExecutor;
import org.apache.beam.runners.direct.TransformExecutorFactory;
import org.apache.beam.runners.direct.TransformExecutorService;
import org.apache.beam.runners.direct.TransformExecutorServices;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PipelineExecutor} that evaluates bundles on a fixed-size thread pool.
 *
 * <p>Bundles whose input {@link PCollection} is reported as keyed by the {@link EvaluationContext}
 * are scheduled on a serial {@link TransformExecutorService} dedicated to their
 * {@link StepAndKey}; every other bundle is scheduled on one shared parallel service. Both kinds
 * of service are backed by the same worker pool.
 *
 * <p>Progress that callers must observe (failures, cancellation, completion) is published through
 * an internal queue and consumed by {@link #waitUntilFinish(Duration)}.
 */
final class ExecutorServiceParallelExecutor
implements PipelineExecutor,
BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>> {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);

    /** Size of the worker pool; also used to pick the number of initial root splits. */
    private final int targetParallelism;
    /** Worker pool shared by the driver task and all transform executor services. */
    private final ExecutorService executorService;
    private final TransformEvaluatorRegistry registry;
    private final EvaluationContext evaluationContext;
    private final TransformExecutorFactory executorFactory;
    /** Service used for bundles that are not keyed. */
    private final TransformExecutorService parallelExecutorService;
    /** Per-(step, key) serial services, held weakly and shut down on removal from the cache. */
    private final LoadingCache<StepAndKey, TransformExecutorService> serialExecutorServices;
    /** Queue of updates surfaced to {@link #waitUntilFinish(Duration)}. */
    private final QueueMessageReceiver visibleUpdates;
    /** Externally supplied executor; it is shut down together with this executor. */
    private final ExecutorService metricsExecutor;
    /** Current pipeline state; starts as RUNNING and is moved to a terminal state at most once. */
    private final AtomicReference<PipelineResult.State> pipelineState = new AtomicReference<>(PipelineResult.State.RUNNING);

    /**
     * Creates a new executor.
     *
     * @param targetParallelism number of worker threads in the pool
     * @param registry registry of transform evaluators; cleaned up when the executor shuts down
     * @param transformEnforcements model enforcements, passed to the transform executor factory
     * @param context evaluation context shared with the transform executors
     * @param metricsExecutor executor that is shut down when this executor shuts down
     * @return the new executor
     */
    public static ExecutorServiceParallelExecutor create(int targetParallelism, TransformEvaluatorRegistry registry, Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context, ExecutorService metricsExecutor) {
        return new ExecutorServiceParallelExecutor(targetParallelism, registry, transformEnforcements, context, metricsExecutor);
    }

    private ExecutorServiceParallelExecutor(int targetParallelism, TransformEvaluatorRegistry registry, Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context, ExecutorService metricsExecutor) {
        this.targetParallelism = targetParallelism;
        this.metricsExecutor = metricsExecutor;
        this.executorService = Executors.newFixedThreadPool(
            targetParallelism,
            new ThreadFactoryBuilder()
                .setThreadFactory(MoreExecutors.platformThreadFactory())
                .setNameFormat("direct-runner-worker")
                .build());
        this.registry = registry;
        this.evaluationContext = context;
        // Values are held weakly; the removal listener shuts a service down once it leaves the cache.
        this.serialExecutorServices = CacheBuilder.newBuilder()
            .weakValues()
            .removalListener(this.shutdownExecutorServiceListener())
            .build(this.serialTransformExecutorServiceCacheLoader());
        this.visibleUpdates = new QueueMessageReceiver();
        this.parallelExecutorService = TransformExecutorServices.parallel(this.executorService);
        this.executorFactory = new DirectTransformExecutor.Factory(context, registry, transformEnforcements);
    }

    /** Returns a loader that creates a new serial service on the shared worker pool for each key. */
    private CacheLoader<StepAndKey, TransformExecutorService> serialTransformExecutorServiceCacheLoader() {
        return new CacheLoader<StepAndKey, TransformExecutorService>(){

            @Override
            public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
                return TransformExecutorServices.serial(ExecutorServiceParallelExecutor.this.executorService);
            }
        };
    }

    /** Returns a listener that shuts down a removed service if its value is still available. */
    private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorServiceListener() {
        return notification -> {
            TransformExecutorService service = notification.getValue();
            if (service != null) {
                service.shutdown();
            }
        };
    }

    /**
     * Gathers the initial bundles of every root transform and starts the driver task.
     *
     * <p>Each root is asked for {@code max(3, targetParallelism)} splits. The driver task calls
     * {@code drive()} once per run and resubmits itself to the worker pool until the driver reports
     * a terminal state, at which point the executor is shut down.
     *
     * @param graph graph whose root transforms provide the initial inputs
     * @param rootProviderRegistry source of the initial inputs for each root transform
     */
    @Override
    public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) {
        int numTargetSplits = Math.max(3, this.targetParallelism);
        ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootBundles = ImmutableMap.builder();
        for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
            ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
            try {
                Collection<CommittedBundle<?>> initialInputs = rootProviderRegistry.getInitialInputs(root, numTargetSplits);
                pending.addAll(initialInputs);
            }
            catch (Exception e) {
                throw UserCodeException.wrap(e);
            }
            pendingRootBundles.put(root, pending);
        }
        this.evaluationContext.initialize(pendingRootBundles.build());
        final ExecutionDriver executionDriver = QuiescenceDriver.create(this.evaluationContext, graph, this, this.visibleUpdates, pendingRootBundles.build());
        this.executorService.submit(new Runnable(){

            @Override
            public void run() {
                ExecutionDriver.DriverState drive = executionDriver.drive();
                if (drive.isTermainal()) {
                    PipelineResult.State newPipelineState = PipelineResult.State.UNKNOWN;
                    switch (drive) {
                        case FAILED: {
                            newPipelineState = PipelineResult.State.FAILED;
                            break;
                        }
                        case SHUTDOWN: {
                            newPipelineState = PipelineResult.State.DONE;
                            break;
                        }
                        case CONTINUE: {
                            throw new IllegalStateException(String.format("%s should not be a terminal state", ExecutionDriver.DriverState.CONTINUE));
                        }
                        default: {
                            throw new IllegalArgumentException(String.format("Unknown %s %s", ExecutionDriver.DriverState.class.getSimpleName(), drive));
                        }
                    }
                    ExecutorServiceParallelExecutor.this.shutdownIfNecessary(newPipelineState);
                } else {
                    // Not finished yet: queue this same task again so the driver keeps making progress.
                    ExecutorServiceParallelExecutor.this.executorService.submit(this);
                }
            }
        });
    }

    /**
     * Schedules {@code bundle} for evaluation by {@code consumer}.
     *
     * @param bundle the bundle to evaluate
     * @param consumer the transform that consumes the bundle
     * @param onComplete callback handed to the created transform executor
     */
    @Override
    public void process(CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?> consumer, CompletionCallback onComplete) {
        this.evaluateBundle(consumer, bundle, onComplete);
    }

    /**
     * Picks the executor service for the bundle (serial per step-and-key when the bundle's
     * PCollection is keyed, parallel otherwise) and schedules the evaluation unless the pipeline
     * state is already terminal.
     */
    private <T> void evaluateBundle(AppliedPTransform<?, ?, ?> transform, CommittedBundle<T> bundle, CompletionCallback onComplete) {
        TransformExecutorService transformExecutor;
        if (this.isKeyed(bundle.getPCollection())) {
            StepAndKey stepAndKey = StepAndKey.of(transform, bundle.getKey());
            transformExecutor = this.serialExecutorServices.getUnchecked(stepAndKey);
        } else {
            transformExecutor = this.parallelExecutorService;
        }
        TransformExecutor callable = this.executorFactory.create(bundle, transform, onComplete, transformExecutor);
        if (!this.pipelineState.get().isTerminal()) {
            transformExecutor.schedule(callable);
        }
    }

    /** Delegates to the evaluation context to decide whether {@code pvalue} is keyed. */
    private boolean isKeyed(PValue pvalue) {
        return this.evaluationContext.isKeyed(pvalue);
    }

    /**
     * Waits for the pipeline to reach a terminal state or for {@code duration} to elapse.
     *
     * <p>A duration equal to {@link Duration#ZERO} means "no deadline". Updates are polled in 25 ms
     * slices. If an update carries a throwable it is rethrown to the caller.
     *
     * @param duration maximum time to wait, or {@link Duration#ZERO} to wait without a deadline
     * @return the pipeline state observed when waiting stopped
     * @throws Exception the exception carried by a failure update, an {@code InterruptedException}
     *     if interrupted while polling, or a wrapper for a throwable that is neither an
     *     {@code Exception} nor an {@code Error}
     */
    @Override
    public PipelineResult.State waitUntilFinish(Duration duration) throws Exception {
        Instant completionTime = duration.equals(Duration.ZERO)
            ? new Instant(Long.MAX_VALUE)
            : Instant.now().plus(duration);
        VisibleExecutorUpdate update = null;
        while (Instant.now().isBefore(completionTime) && (update == null || this.isTerminalStateUpdate(update))) {
            // Bounded poll so the deadline and the pipeline state are re-checked regularly.
            update = this.visibleUpdates.tryNext(Duration.millis(25L));
            if (update == null && this.pipelineState.get().isTerminal()) {
                // Nothing queued and the pipeline state is already terminal: stop waiting.
                return this.pipelineState.get();
            }
            if (update != null && update.thrown.isPresent()) {
                Throwable thrown = update.thrown.get();
                if (thrown instanceof Exception) {
                    throw (Exception)thrown;
                }
                if (thrown instanceof Error) {
                    throw (Error)thrown;
                }
                throw new Exception("Unknown Type of Throwable", thrown);
            }
        }
        return this.pipelineState.get();
    }

    /** Returns the current pipeline state. */
    @Override
    public PipelineResult.State getPipelineState() {
        return this.pipelineState.get();
    }

    /**
     * Returns whether {@code update} carries a terminal pipeline state.
     *
     * <p>Updates without a state (for example those built from an exception) are not terminal
     * state updates; the null check must come first so such updates do not cause a
     * {@code NullPointerException}.
     */
    private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) {
        PipelineResult.State newState = update.getNewState();
        return newState != null && newState.isTerminal();
    }

    /** Shuts the executor down with state CANCELLED and publishes a cancellation update. */
    @Override
    public void stop() {
        this.shutdownIfNecessary(PipelineResult.State.CANCELLED);
        this.visibleUpdates.cancelled();
    }

    /**
     * Shuts down all owned resources if {@code newState} is terminal; otherwise does nothing.
     *
     * <p>Every shutdown step is attempted even if an earlier one fails. The pipeline state is moved
     * from RUNNING to {@code newState} after the steps have run. If any step failed, a single
     * {@link IllegalStateException} summarising the failures is published as a failure update and
     * thrown; the individual failures are attached as suppressed exceptions.
     *
     * @param newState the state the pipeline is transitioning to
     * @throws IllegalStateException if one or more shutdown steps failed
     */
    private void shutdownIfNecessary(PipelineResult.State newState) {
        if (!newState.isTerminal()) {
            return;
        }
        LOG.debug("Pipeline has terminated. Shutting down.");
        Collection<Exception> errors = new ArrayList<>();
        try {
            this.serialExecutorServices.invalidateAll();
        }
        catch (RuntimeException re) {
            errors.add(re);
        }
        try {
            this.serialExecutorServices.cleanUp();
        }
        catch (RuntimeException re) {
            errors.add(re);
        }
        try {
            this.parallelExecutorService.shutdown();
        }
        catch (RuntimeException re) {
            errors.add(re);
        }
        try {
            this.executorService.shutdown();
        }
        catch (RuntimeException re) {
            errors.add(re);
        }
        try {
            this.metricsExecutor.shutdown();
        }
        catch (RuntimeException re) {
            errors.add(re);
        }
        try {
            this.registry.cleanup();
        }
        catch (Exception e) {
            errors.add(e);
        }
        // Only the first terminal transition wins; later calls leave the recorded state untouched.
        this.pipelineState.compareAndSet(PipelineResult.State.RUNNING, newState);
        if (!errors.isEmpty()) {
            IllegalStateException exception = new IllegalStateException("Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n" + errors.stream().map(Throwable::getMessage).collect(Collectors.joining("\n- ", "- ", "")));
            // Keep the original failures (and their stack traces) reachable from the summary.
            errors.forEach(exception::addSuppressed);
            this.visibleUpdates.failed(exception);
            throw exception;
        }
    }

    /** A {@link PipelineMessageReceiver} that stores every message in an unbounded queue. */
    private static class QueueMessageReceiver
    implements PipelineMessageReceiver {
        private final BlockingQueue<VisibleExecutorUpdate> updates = new LinkedBlockingQueue<>();

        private QueueMessageReceiver() {
        }

        @Override
        public void failed(Exception e) {
            this.updates.offer(VisibleExecutorUpdate.fromException(e));
        }

        @Override
        public void failed(Error e) {
            this.updates.offer(VisibleExecutorUpdate.fromError(e));
        }

        @Override
        public void cancelled() {
            this.updates.offer(VisibleExecutorUpdate.cancelled());
        }

        @Override
        public void completed() {
            this.updates.offer(VisibleExecutorUpdate.finished());
        }

        /**
         * Waits up to {@code timeout} for the next update.
         *
         * @return the next update, or {@code null} if none arrived within the timeout
         * @throws InterruptedException if interrupted while waiting
         */
        @Nullable
        private VisibleExecutorUpdate tryNext(Duration timeout) throws InterruptedException {
            return this.updates.poll(timeout.getMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /** An update visible to waiters: an optional throwable plus an optional new pipeline state. */
    private static class VisibleExecutorUpdate {
        private final Optional<? extends Throwable> thrown;
        @Nullable
        private final PipelineResult.State newState;

        /** Creates an update that carries {@code e} and no pipeline state. */
        public static VisibleExecutorUpdate fromException(Exception e) {
            return new VisibleExecutorUpdate(null, e);
        }

        /** Creates an update that carries {@code err} and the FAILED state. */
        public static VisibleExecutorUpdate fromError(Error err) {
            return new VisibleExecutorUpdate(PipelineResult.State.FAILED, err);
        }

        /** Creates an update with the DONE state and no throwable. */
        public static VisibleExecutorUpdate finished() {
            return new VisibleExecutorUpdate(PipelineResult.State.DONE, null);
        }

        /** Creates an update with the CANCELLED state and no throwable. */
        public static VisibleExecutorUpdate cancelled() {
            return new VisibleExecutorUpdate(PipelineResult.State.CANCELLED, null);
        }

        private VisibleExecutorUpdate(@Nullable PipelineResult.State newState, @Nullable Throwable exception) {
            this.thrown = Optional.fromNullable(exception);
            this.newState = newState;
        }

        /** Returns the new pipeline state, or {@code null} if this update carries none. */
        @Nullable
        PipelineResult.State getNewState() {
            return this.newState;
        }
    }
}

