package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.function.ThrowingFunction;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DefaultJobBundleFactory.class */
public class DefaultJobBundleFactory implements JobBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);
    // Supplies the id passed to ProcessBundleDescriptors.fromExecutableStage for each stage.
    private final IdGenerator stageIdGenerator;
    // One wrapped SDK harness client per environment; evicted entries are closed by the removal listener.
    private final LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> environmentCache;
    // First environment seen by checkAndInitialize; stays null until that method has completed once.
    private RunnerApi.Environment environment;
    // The members below are assigned either by the testing constructor or lazily by
    // checkAndInitialize, so they may still be null on a factory that has not served any stage.
    private ExecutorService executor;
    private GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private GrpcFnServer<GrpcLoggingService> loggingServer;
    private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
    private GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
    private GrpcFnServer<GrpcDataService> dataServer;
    private GrpcFnServer<GrpcStateService> stateServer;
    // Only assigned by checkAndInitialize (the testing constructor leaves these two null).
    private MapControlClientPool clientPool;
    private EnvironmentFactory environmentFactory;

    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DefaultJobBundleFactory$SimpleStageBundleFactory.class */
    /**
     * A {@link StageBundleFactory} for a single executable stage. It pairs the stage's process
     * bundle descriptor with the {@code BundleProcessor} obtained from the SDK harness client and
     * creates a new {@link RemoteBundle} on every {@link #getBundle} call.
     */
    protected static class SimpleStageBundleFactory implements StageBundleFactory {
        private final SdkHarnessClient.BundleProcessor processor;
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;

        // Never read inside this class: assigned by the constructor and cleared by close().
        // NOTE(review): presumably held only to keep the cached client strongly reachable while
        // this factory is in use - confirm against the cache configuration.
        @SuppressFBWarnings
        private WrappedSdkHarnessClient wrappedClient;

        /**
         * Builds a factory for the given descriptor, registering the descriptor with the wrapped
         * client to obtain its bundle processor.
         *
         * @param wrappedSdkHarnessClient client used to obtain the bundle processor
         * @param executableProcessBundleDescriptor descriptor of the stage to execute
         * @param grpcFnServer state server whose service is handed to the processor
         */
        static SimpleStageBundleFactory create(WrappedSdkHarnessClient wrappedSdkHarnessClient, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, GrpcFnServer<GrpcStateService> grpcFnServer) {
            SdkHarnessClient.BundleProcessor bundleProcessor = wrappedSdkHarnessClient.getClient().getProcessor(
                    executableProcessBundleDescriptor.getProcessBundleDescriptor(),
                    executableProcessBundleDescriptor.getRemoteInputDestinations(),
                    grpcFnServer.getService());
            return new SimpleStageBundleFactory(executableProcessBundleDescriptor, bundleProcessor, wrappedSdkHarnessClient);
        }

        SimpleStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, SdkHarnessClient.BundleProcessor bundleProcessor, WrappedSdkHarnessClient wrappedSdkHarnessClient) {
            this.processBundleDescriptor = executableProcessBundleDescriptor;
            this.processor = bundleProcessor;
            this.wrappedClient = wrappedSdkHarnessClient;
        }

        /**
         * Creates a new bundle, wiring one remote output receiver per output target of the stage.
         *
         * @param outputReceiverFactory creates the receiver for each output PCollection id
         * @param stateRequestHandler forwarded unchanged to the bundle processor
         * @param bundleProgressHandler forwarded unchanged to the bundle processor
         * @return the bundle returned by the processor
         * @throws Exception declared by the {@link StageBundleFactory} contract
         */
        @Override
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler) throws Exception {
            // Parameterised (instead of a raw Builder) so the compiler checks every put().
            ImmutableMap.Builder<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers = ImmutableMap.builder();
            for (Map.Entry<BeamFnApi.Target, Coder<WindowedValue<?>>> targetCoder : this.processBundleDescriptor.getOutputTargetCoders().entrySet()) {
                BeamFnApi.Target target = targetCoder.getKey();
                Coder<WindowedValue<?>> coder = targetCoder.getValue();
                // The transform referenced by the target must have exactly one input;
                // getOnlyElement throws otherwise. That input is the PCollection id to receive.
                String outputPCollectionId = Iterables.getOnlyElement(
                        this.processBundleDescriptor
                                .getProcessBundleDescriptor()
                                .getTransformsOrThrow(target.getPrimitiveTransformReference())
                                .getInputsMap()
                                .values());
                outputReceivers.put(target, RemoteOutputReceiver.of(coder, outputReceiverFactory.create(outputPCollectionId)));
            }
            return this.processor.newBundle(outputReceivers.build(), stateRequestHandler, bundleProgressHandler);
        }

        /** Drops the reference to the wrapped client; the client itself is not closed here. */
        @Override
        public void close() throws Exception {
            this.wrappedClient = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DefaultJobBundleFactory$WrappedSdkHarnessClient.class */
    /**
     * Couples a {@link RemoteEnvironment} with the {@link SdkHarnessClient} talking to it, so
     * that closing this wrapper closes the underlying environment.
     */
    public static class WrappedSdkHarnessClient implements AutoCloseable {
        private final RemoteEnvironment environment;
        private final SdkHarnessClient client;

        /**
         * Creates a wrapper whose client uses the environment's instruction request handler and
         * the data service of the given server.
         *
         * @param remoteEnvironment environment to wrap; closed by {@link #close()}
         * @param grpcFnServer data server whose service the client uses
         */
        static WrappedSdkHarnessClient wrapping(RemoteEnvironment remoteEnvironment, GrpcFnServer<GrpcDataService> grpcFnServer) {
            SdkHarnessClient sdkHarnessClient = SdkHarnessClient.usingFnApiClient(
                    remoteEnvironment.getInstructionRequestHandler(), grpcFnServer.getService());
            return new WrappedSdkHarnessClient(remoteEnvironment, sdkHarnessClient);
        }

        private WrappedSdkHarnessClient(RemoteEnvironment remoteEnvironment, SdkHarnessClient sdkHarnessClient) {
            this.environment = remoteEnvironment;
            this.client = sdkHarnessClient;
        }

        /** Returns the SDK harness client bound to the wrapped environment. */
        SdkHarnessClient getClient() {
            return this.client;
        }

        /**
         * Closes the wrapped environment, if there is one.
         *
         * @throws Exception whatever the environment's {@code close()} throws
         */
        @Override
        public void close() throws Exception {
            // The previous body carried an unreachable branch (left over from a decompiled
            // try-with-resources) that would have dereferenced a null Throwable; only the
            // null-guarded close was ever executed.
            if (this.environment != null) {
                this.environment.close();
            }
        }
    }

    /**
     * Creates a bundle factory for the given job.
     *
     * @param jobInfo job information, later used to build the provisioning service
     * @param map environment factory providers keyed by environment URN
     * @return a new factory; its servers are only started when the first environment is requested
     */
    public static DefaultJobBundleFactory create(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> map) {
        final DefaultJobBundleFactory factory = new DefaultJobBundleFactory(jobInfo, map);
        return factory;
    }

    DefaultJobBundleFactory(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> map) {
        this.stageIdGenerator = IdGenerators.incrementingLongs();
        this.environmentCache = createEnvironmentCache(environment -> {
            synchronized (this) {
                checkAndInitialize(jobInfo, map, environment);
            }
            return this.environmentFactory.createEnvironment(environment);
        });
    }

    /**
     * Testing constructor: takes pre-built servers and an environment factory instead of
     * creating them lazily. The {@code environmentFactory} and {@code clientPool} fields are
     * not assigned on this path; the supplied factory is captured by the cache loader instead.
     */
    @VisibleForTesting
    DefaultJobBundleFactory(EnvironmentFactory environmentFactory, IdGenerator idGenerator, GrpcFnServer<FnApiControlClientPoolService> grpcFnServer, GrpcFnServer<GrpcLoggingService> grpcFnServer2, GrpcFnServer<ArtifactRetrievalService> grpcFnServer3, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer4, GrpcFnServer<GrpcDataService> grpcFnServer5, GrpcFnServer<GrpcStateService> grpcFnServer6) throws Exception {
        this.stageIdGenerator = idGenerator;
        this.executor = Executors.newCachedThreadPool();

        // Servers, in constructor-argument order.
        this.controlServer = grpcFnServer;
        this.loggingServer = grpcFnServer2;
        this.retrievalServer = grpcFnServer3;
        this.provisioningServer = grpcFnServer4;
        this.dataServer = grpcFnServer5;
        this.stateServer = grpcFnServer6;

        this.environmentCache = createEnvironmentCache(requested -> environmentFactory.createEnvironment(requested));
    }

    /**
     * Builds the environment cache. Entries are loaded by applying {@code throwingFunction} and
     * wrapping the resulting environment together with the data server; removed entries are
     * closed, with close failures logged rather than propagated.
     *
     * @param throwingFunction creates the remote environment for a requested environment
     * @return a loading cache from environment to wrapped SDK harness client
     */
    private LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> createEnvironmentCache(final ThrowingFunction<RunnerApi.Environment, RemoteEnvironment> throwingFunction) {
        CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient> loader =
                new CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient>() {
                    @Override
                    public WrappedSdkHarnessClient load(RunnerApi.Environment environment) throws Exception {
                        RemoteEnvironment remoteEnvironment = throwingFunction.apply(environment);
                        // dataServer is read when an entry is loaded, not when the cache is built.
                        return WrappedSdkHarnessClient.wrapping(remoteEnvironment, DefaultJobBundleFactory.this.dataServer);
                    }
                };
        return CacheBuilder.newBuilder()
                .removalListener(notification -> {
                    RunnerApi.Environment removed = (RunnerApi.Environment) notification.getKey();
                    LOG.debug("Cleaning up for environment {}", removed.getUrn());
                    try {
                        ((WrappedSdkHarnessClient) notification.getValue()).close();
                    } catch (Exception e) {
                        LOG.warn(String.format("Error cleaning up environment %s", notification.getKey()), e);
                    }
                })
                .build(loader);
    }

    /**
     * Returns a stage bundle factory for the given stage, loading (or reusing) the SDK harness
     * client cached for the stage's environment.
     *
     * @param executableStage the stage to create bundles for
     * @return a new {@link StageBundleFactory} for the stage
     * @throws RuntimeException wrapping any {@link IOException} raised while preparing the stage
     */
    @Override
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        try {
            // The cache lookup comes first: on the lazy path it is what starts the servers
            // whose descriptors are read immediately afterwards.
            WrappedSdkHarnessClient harnessClient = this.environmentCache.getUnchecked(executableStage.getEnvironment());
            ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor =
                    ProcessBundleDescriptors.fromExecutableStage(
                            this.stageIdGenerator.getId(),
                            executableStage,
                            this.dataServer.getApiServiceDescriptor(),
                            this.stateServer.getApiServiceDescriptor());
            return SimpleStageBundleFactory.create(harnessClient, descriptor, this.stateServer);
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Releases everything owned by this factory: evicts all cached environments (which closes
     * them through the cache's removal listener), closes every gRPC server that was started and
     * shuts the executor down.
     *
     * <p>Safe to call on a factory that never served a stage: servers and executor that were
     * never created are skipped. A failure closing one server does not stop the remaining ones
     * from being closed.
     *
     * @throws Exception the first failure raised while closing a server; later failures are
     *     attached to it as suppressed exceptions
     */
    @Override
    public void close() throws Exception {
        this.environmentCache.invalidateAll();
        this.environmentCache.cleanUp();

        // Same closing order as before: state, data, control, logging, retrieval, provisioning.
        Exception failure = null;
        failure = closeServer(this.stateServer, failure);
        failure = closeServer(this.dataServer, failure);
        failure = closeServer(this.controlServer, failure);
        failure = closeServer(this.loggingServer, failure);
        failure = closeServer(this.retrievalServer, failure);
        failure = closeServer(this.provisioningServer, failure);

        if (this.executor != null) {
            this.executor.shutdown();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes one server if it exists, folding any failure into {@code priorFailure}.
     *
     * @return {@code priorFailure} (possibly with a new suppressed exception), or the new
     *     failure when there was no prior one
     */
    private static Exception closeServer(GrpcFnServer<?> server, Exception priorFailure) {
        if (server == null) {
            return priorFailure;
        }
        try {
            server.close();
        } catch (Exception e) {
            if (priorFailure == null) {
                return e;
            }
            // addSuppressed rejects self-suppression, so guard against the same instance.
            if (e != priorFailure) {
                priorFailure.addSuppressed(e);
            }
        }
        return priorFailure;
    }

    /**
     * Starts the shared gRPC servers and creates the environment factory the first time an
     * environment is requested; on later calls only verifies that the requested environment has
     * the same URN as the first one.
     *
     * @param jobInfo source of the provisioning information served to the SDK harness
     * @param map environment factory providers keyed by environment URN
     * @param environment the environment being requested; must not be null
     * @throws IOException if a server cannot be created
     * @throws NullPointerException if {@code environment} is null or no provider is registered
     *     for its URN
     * @throws IllegalArgumentException if the URN differs from the one this factory was
     *     initialised with
     */
    @GuardedBy("this")
    private void checkAndInitialize(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> map, RunnerApi.Environment environment) throws IOException {
        Preconditions.checkNotNull(environment, "Environment can not be null");
        if (this.environment != null) {
            Preconditions.checkArgument(this.environment.getUrn().equals(environment.getUrn()), "Unsupported: Mixing environment types (%s, %s) is not supported for a job.", this.environment.getUrn(), environment.getUrn());
            return;
        }
        // Fail with a descriptive message, before touching any state, when the URN is unknown.
        // This used to surface as a bare NullPointerException on the next line.
        EnvironmentFactory.Provider provider = Preconditions.checkNotNull(map.get(environment.getUrn()), "No environment factory provider registered for environment URN %s", environment.getUrn());
        ServerFactory serverFactory = provider.getServerFactory();
        // NOTE(review): if one of the allocations below throws, the servers already assigned
        // stay open and this.environment stays null, so a retry allocates a fresh set without
        // closing the old one - consider building into locals and cleaning up on failure.
        this.clientPool = MapControlClientPool.create();
        this.executor = Executors.newCachedThreadPool();
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(this.clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory);
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
        this.retrievalServer = GrpcFnServer.allocatePortAndCreateFor(BeamFileSystemArtifactRetrievalService.create(), serverFactory);
        this.provisioningServer = GrpcFnServer.allocatePortAndCreateFor(StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(this.executor, OutboundObserverFactory.serverDirect()), serverFactory);
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
        this.environmentFactory = provider.createEnvironmentFactory(this.controlServer, this.loggingServer, this.retrievalServer, this.provisioningServer, this.clientPool, this.stageIdGenerator);
        // Assigned last: a non-null environment marks initialisation as complete.
        this.environment = environment;
    }
}
