/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.function.ThrowingFunction;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobBundleFactory} that hands out {@link StageBundleFactory} instances backed by a
 * cache of SDK harness clients keyed by {@link RunnerApi.Environment}.
 *
 * <p>When built through {@link #create(JobInfo, Map)}, the gRPC servers and the executor are not
 * started up front: they are created the first time an environment has to be loaded into the
 * cache. All environments requested from one factory must share the same URN.
 */
@ThreadSafe
public class DefaultJobBundleFactory
implements JobBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);
    private final IdGenerator stageIdGenerator;
    private final LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> environmentCache;
    // Everything below stays null until checkAndInitialize succeeds (or the test constructor sets it).
    private RunnerApi.Environment environment;
    private ExecutorService executor;
    private GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private GrpcFnServer<GrpcLoggingService> loggingServer;
    private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
    private GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
    private GrpcFnServer<GrpcDataService> dataServer;
    private GrpcFnServer<GrpcStateService> stateServer;
    private MapControlClientPool clientPool;
    private EnvironmentFactory environmentFactory;

    /**
     * Creates a factory whose servers are started lazily on the first environment load.
     *
     * @param jobInfo job information used to build the provisioning service
     * @param environmentFactoryProviderMap environment factory providers keyed by environment URN
     * @return a new factory
     */
    public static DefaultJobBundleFactory create(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
        return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);
    }

    /**
     * Builds a factory that defers server start-up to the first cache load.
     *
     * @param jobInfo job information used to build the provisioning service
     * @param environmentFactoryMap environment factory providers keyed by environment URN
     */
    DefaultJobBundleFactory(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) {
        this.stageIdGenerator = IdGenerators.incrementingLongs();
        this.environmentCache = this.createEnvironmentCache(env -> {
            // Initialization is serialized on this instance; creating the environment is not.
            synchronized (this) {
                this.checkAndInitialize(jobInfo, environmentFactoryMap, env);
            }
            return this.environmentFactory.createEnvironment(env);
        });
    }

    /**
     * Builds a factory around already-created servers and an existing environment factory.
     *
     * @param environmentFactory factory used to create a remote environment on each cache load
     * @param stageIdGenerator generator for process bundle descriptor ids
     * @param controlServer control server, closed by {@link #close()}
     * @param loggingServer logging server, closed by {@link #close()}
     * @param retrievalServer artifact retrieval server, closed by {@link #close()}
     * @param provisioningServer provisioning server, closed by {@link #close()}
     * @param dataServer data server handed to every SDK harness client
     * @param stateServer state server handed to every stage bundle factory
     */
    @VisibleForTesting
    DefaultJobBundleFactory(EnvironmentFactory environmentFactory, IdGenerator stageIdGenerator, GrpcFnServer<FnApiControlClientPoolService> controlServer, GrpcFnServer<GrpcLoggingService> loggingServer, GrpcFnServer<ArtifactRetrievalService> retrievalServer, GrpcFnServer<StaticGrpcProvisionService> provisioningServer, GrpcFnServer<GrpcDataService> dataServer, GrpcFnServer<GrpcStateService> stateServer) throws Exception {
        this.executor = Executors.newCachedThreadPool();
        this.stageIdGenerator = stageIdGenerator;
        this.controlServer = controlServer;
        this.loggingServer = loggingServer;
        this.retrievalServer = retrievalServer;
        this.provisioningServer = provisioningServer;
        this.dataServer = dataServer;
        this.stateServer = stateServer;
        this.environmentCache = this.createEnvironmentCache(env -> environmentFactory.createEnvironment(env));
    }

    /**
     * Builds the environment cache. Entries are loaded by wrapping the environment produced by
     * {@code environmentCreator}; removed entries are closed, and close failures are only logged.
     */
    private LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> createEnvironmentCache(final ThrowingFunction<RunnerApi.Environment, RemoteEnvironment> environmentCreator) {
        return CacheBuilder.newBuilder().removalListener(notification -> {
            LOG.debug("Cleaning up for environment {}", (Object)((RunnerApi.Environment)notification.getKey()).getUrn());
            try {
                ((WrappedSdkHarnessClient)notification.getValue()).close();
            }
            catch (Exception e) {
                LOG.warn(String.format("Error cleaning up environment %s", notification.getKey()), (Throwable)e);
            }
        }).build(new CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient>(){

            @Override
            public WrappedSdkHarnessClient load(RunnerApi.Environment environment) throws Exception {
                return WrappedSdkHarnessClient.wrapping((RemoteEnvironment)environmentCreator.apply(environment), DefaultJobBundleFactory.this.dataServer);
            }
        });
    }

    /**
     * Returns a bundle factory for the given stage, loading the stage's environment into the cache
     * if it is not there yet.
     *
     * @param executableStage the stage to build bundles for
     * @return a factory producing bundles for {@code executableStage}
     * @throws RuntimeException wrapping the {@link IOException} raised while building the process
     *     bundle descriptor
     */
    @Override
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        // The cache lookup runs first: on a lazily-initialized factory it is what starts the servers.
        WrappedSdkHarnessClient wrappedClient = this.environmentCache.getUnchecked(executableStage.getEnvironment());
        try {
            processBundleDescriptor = ProcessBundleDescriptors.fromExecutableStage(this.stageIdGenerator.getId(), executableStage, this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return SimpleStageBundleFactory.create(wrappedClient, processBundleDescriptor, this.stateServer);
    }

    /**
     * Evicts all cached environments, closes every server that was started and shuts down the
     * executor.
     *
     * <p>Safe to call on a factory that never loaded an environment. Every resource is attempted
     * even if an earlier one fails.
     *
     * @throws Exception the first failure encountered, with any later failures attached as
     *     suppressed exceptions
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            this.environmentCache.invalidateAll();
            this.environmentCache.cleanUp();
        }
        catch (RuntimeException e) {
            failure = e;
        }
        failure = DefaultJobBundleFactory.closeAndCollect(this.stateServer, failure);
        failure = DefaultJobBundleFactory.closeAndCollect(this.dataServer, failure);
        failure = DefaultJobBundleFactory.closeAndCollect(this.controlServer, failure);
        failure = DefaultJobBundleFactory.closeAndCollect(this.loggingServer, failure);
        failure = DefaultJobBundleFactory.closeAndCollect(this.retrievalServer, failure);
        failure = DefaultJobBundleFactory.closeAndCollect(this.provisioningServer, failure);
        if (this.executor != null) {
            this.executor.shutdown();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes {@code server} if it is non-null and folds any failure into {@code failure}.
     *
     * @param server the server to close; may be null when it was never started
     * @param failure the failure collected so far, or null
     * @return the failure to carry forward: the earlier one (with the new one suppressed), the new
     *     one, or null when nothing has failed
     */
    private static Exception closeAndCollect(GrpcFnServer<?> server, Exception failure) {
        if (server == null) {
            return failure;
        }
        try {
            server.close();
        }
        catch (Exception e) {
            if (failure == null) {
                return e;
            }
            failure.addSuppressed(e);
        }
        return failure;
    }

    /**
     * Starts the servers and the environment factory for the first environment seen, and for every
     * later environment only verifies that its URN matches the first one.
     *
     * <p>The fields are published only after every server has started. If start-up fails part-way,
     * the servers already started are closed and the executor is shut down, so a later retry does
     * not leak them.
     *
     * @param jobInfo job information used to build the provisioning service
     * @param environmentFactoryProviderMap environment factory providers keyed by environment URN
     * @param environment the environment about to be created
     * @throws IOException if a server cannot be started
     * @throws IllegalArgumentException if the URN differs from the one already in use, or no
     *     provider is registered for it
     */
    @GuardedBy(value="this")
    private void checkAndInitialize(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap, RunnerApi.Environment environment) throws IOException {
        Preconditions.checkNotNull(environment, "Environment can not be null");
        if (this.environment != null) {
            Preconditions.checkArgument(this.environment.getUrn().equals(environment.getUrn()), "Unsupported: Mixing environment types (%s, %s) is not supported for a job.", (Object)this.environment.getUrn(), (Object)environment.getUrn());
            return;
        }
        EnvironmentFactory.Provider environmentFactoryProvider = environmentFactoryProviderMap.get(environment.getUrn());
        Preconditions.checkArgument(environmentFactoryProvider != null, "No environment factory provider registered for environment type %s", (Object)environment.getUrn());
        ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
        MapControlClientPool newClientPool = MapControlClientPool.create();
        ExecutorService newExecutor = Executors.newCachedThreadPool();
        GrpcFnServer<FnApiControlClientPoolService> newControlServer = null;
        GrpcFnServer<GrpcLoggingService> newLoggingServer = null;
        GrpcFnServer<ArtifactRetrievalService> newRetrievalServer = null;
        GrpcFnServer<StaticGrpcProvisionService> newProvisioningServer = null;
        GrpcFnServer<GrpcDataService> newDataServer = null;
        GrpcFnServer<GrpcStateService> newStateServer = null;
        boolean initialized = false;
        try {
            newControlServer = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(newClientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory);
            newLoggingServer = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
            newRetrievalServer = GrpcFnServer.allocatePortAndCreateFor(BeamFileSystemArtifactRetrievalService.create(), serverFactory);
            newProvisioningServer = GrpcFnServer.allocatePortAndCreateFor(StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
            newDataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(newExecutor, OutboundObserverFactory.serverDirect()), serverFactory);
            newStateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
            EnvironmentFactory newEnvironmentFactory = environmentFactoryProvider.createEnvironmentFactory(newControlServer, newLoggingServer, newRetrievalServer, newProvisioningServer, newClientPool, this.stageIdGenerator);
            this.clientPool = newClientPool;
            this.executor = newExecutor;
            this.controlServer = newControlServer;
            this.loggingServer = newLoggingServer;
            this.retrievalServer = newRetrievalServer;
            this.provisioningServer = newProvisioningServer;
            this.dataServer = newDataServer;
            this.stateServer = newStateServer;
            this.environmentFactory = newEnvironmentFactory;
            // Set last: a non-null environment is the marker that initialization completed.
            this.environment = environment;
            initialized = true;
        }
        finally {
            if (!initialized) {
                // Release whatever was started; the original failure keeps propagating.
                Exception cleanupFailure = null;
                cleanupFailure = DefaultJobBundleFactory.closeAndCollect(newStateServer, cleanupFailure);
                cleanupFailure = DefaultJobBundleFactory.closeAndCollect(newDataServer, cleanupFailure);
                cleanupFailure = DefaultJobBundleFactory.closeAndCollect(newControlServer, cleanupFailure);
                cleanupFailure = DefaultJobBundleFactory.closeAndCollect(newLoggingServer, cleanupFailure);
                cleanupFailure = DefaultJobBundleFactory.closeAndCollect(newRetrievalServer, cleanupFailure);
                cleanupFailure = DefaultJobBundleFactory.closeAndCollect(newProvisioningServer, cleanupFailure);
                newExecutor.shutdown();
                if (cleanupFailure != null) {
                    LOG.warn("Error releasing partially initialized servers", (Throwable)cleanupFailure);
                }
            }
        }
    }

    /**
     * Pairs a {@link RemoteEnvironment} with the {@link SdkHarnessClient} that talks to it, so
     * that closing the wrapper closes the environment.
     */
    protected static class WrappedSdkHarnessClient
    implements AutoCloseable {
        private final RemoteEnvironment environment;
        private final SdkHarnessClient client;

        /**
         * Creates a client for {@code environment} that uses the data service of {@code dataServer}.
         *
         * @param environment the remote environment to talk to
         * @param dataServer the server whose data service the client uses
         * @return the wrapper holding both the environment and its client
         */
        static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, GrpcFnServer<GrpcDataService> dataServer) {
            SdkHarnessClient client = SdkHarnessClient.usingFnApiClient(environment.getInstructionRequestHandler(), dataServer.getService());
            return new WrappedSdkHarnessClient(environment, client);
        }

        private WrappedSdkHarnessClient(RemoteEnvironment environment, SdkHarnessClient client) {
            this.environment = environment;
            this.client = client;
        }

        /** Returns the client bound to the wrapped environment. */
        SdkHarnessClient getClient() {
            return this.client;
        }

        /**
         * Closes the wrapped environment.
         *
         * @throws Exception if closing the environment fails
         */
        @Override
        public void close() throws Exception {
            // The environment is never null here: wrapping() dereferences it before construction.
            this.environment.close();
        }
    }

    /**
     * A {@link StageBundleFactory} for a single process bundle descriptor, backed by one
     * {@link SdkHarnessClient.BundleProcessor}.
     */
    protected static class SimpleStageBundleFactory
    implements StageBundleFactory {
        private final SdkHarnessClient.BundleProcessor processor;
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        // Held only as a reference; it is dropped, not closed, by close().
        @SuppressFBWarnings
        private WrappedSdkHarnessClient wrappedClient;

        /**
         * Obtains a bundle processor for the descriptor from the wrapped client and builds the
         * factory around it.
         *
         * @param wrappedClient the client used to obtain the bundle processor
         * @param processBundleDescriptor the descriptor of the stage to execute
         * @param stateServer the server whose state service the processor uses
         * @return a factory producing bundles for the descriptor
         */
        static SimpleStageBundleFactory create(WrappedSdkHarnessClient wrappedClient, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, GrpcFnServer<GrpcStateService> stateServer) {
            SdkHarnessClient.BundleProcessor processor = wrappedClient.getClient().getProcessor(processBundleDescriptor.getProcessBundleDescriptor(), processBundleDescriptor.getRemoteInputDestinations(), stateServer.getService());
            return new SimpleStageBundleFactory(processBundleDescriptor, processor, wrappedClient);
        }

        SimpleStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, SdkHarnessClient.BundleProcessor processor, WrappedSdkHarnessClient wrappedClient) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.processor = processor;
            this.wrappedClient = wrappedClient;
        }

        /**
         * Starts a new bundle, creating one output receiver per output target of the descriptor.
         *
         * @param outputReceiverFactory creates the receiver for each output PCollection id
         * @param stateRequestHandler passed through to the bundle processor
         * @param progressHandler passed through to the bundle processor
         * @return the newly started bundle
         * @throws Exception if the bundle cannot be created
         */
        @Override
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) throws Exception {
            ImmutableMap.Builder<BeamFnApi.Target, RemoteOutputReceiver> outputReceivers = ImmutableMap.builder();
            for (Map.Entry<BeamFnApi.Target, Coder<WindowedValue<?>>> targetCoder : this.processBundleDescriptor.getOutputTargetCoders().entrySet()) {
                BeamFnApi.Target target = targetCoder.getKey();
                Coder<WindowedValue<?>> coder = targetCoder.getValue();
                // The PCollection id is the only input of the transform the target refers to.
                String bundleOutputPCollection = (String)Iterables.getOnlyElement(this.processBundleDescriptor.getProcessBundleDescriptor().getTransformsOrThrow(target.getPrimitiveTransformReference()).getInputsMap().values());
                FnDataReceiver outputReceiver = outputReceiverFactory.create(bundleOutputPCollection);
                outputReceivers.put(target, RemoteOutputReceiver.of(coder, outputReceiver));
            }
            return this.processor.newBundle(outputReceivers.build(), stateRequestHandler, progressHandler);
        }

        /** Returns the descriptor this factory creates bundles for. */
        @Override
        public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
            return this.processBundleDescriptor;
        }

        /** Drops the reference to the wrapped client without closing it. */
        @Override
        public void close() throws Exception {
            this.wrappedClient = null;
        }
    }
}

