/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.net.HostAndPort;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class DockerJobBundleFactory
implements JobBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DockerJobBundleFactory.class);
    private static final int MAC_PORT_START = 8100;
    private static final int MAC_PORT_END = 8200;
    private static final AtomicInteger MAC_PORT = new AtomicInteger(8100);
    public static final AtomicReference<JobBundleFactoryFactory> FACTORY = new AtomicReference<1>(new JobBundleFactoryFactory(){

        @Override
        public DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
            return new DockerJobBundleFactory(jobInfo);
        }
    });
    private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
    private final IdGenerator stageIdGenerator;
    private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private final GrpcFnServer<GrpcLoggingService> loggingServer;
    private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
    private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
    private final LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> environmentCache;

    /**
     * Creates a bundle factory for the given job using whichever
     * {@link JobBundleFactoryFactory} is currently stored in {@link #FACTORY}.
     *
     * @param jobInfo description of the job the factory will serve
     * @return the factory produced by the installed {@link JobBundleFactoryFactory}
     * @throws Exception if the installed factory fails to build the instance
     */
    public static DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
        JobBundleFactoryFactory installedFactory = FACTORY.get();
        return installedFactory.create(jobInfo);
    }

    /**
     * Starts the control, logging, artifact-retrieval and provisioning servers for a job and
     * prepares the per-environment cache.
     *
     * <p>The server factory and the environment factory are obtained through the overridable
     * {@link #getServerFactory()} and {@code getEnvironmentFactory(...)} hooks.
     *
     * @param jobInfo job description; its provision info is served by the provisioning server
     * @throws Exception if any of the servers cannot be created
     */
    protected DockerJobBundleFactory(JobInfo jobInfo) throws Exception {
        ServerFactory serverFactory = this.getServerFactory();
        this.stageIdGenerator = IdGenerators.incrementingLongs();
        MapControlClientPool clientPool = MapControlClientPool.create();

        // The control service feeds connecting clients into the pool's sink; the environment
        // factory below reads them back from the pool's source.
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor(
                FnApiControlClientPoolService.offeringClientsToPool(
                        clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
                serverFactory);
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor(
                GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
        this.retrievalServer = GrpcFnServer.allocatePortAndCreateFor(
                BeamFileSystemArtifactRetrievalService.create(), serverFactory);
        this.provisioningServer = GrpcFnServer.allocatePortAndCreateFor(
                StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);

        EnvironmentFactory environmentFactory = this.getEnvironmentFactory(
                this.controlServer,
                this.loggingServer,
                this.retrievalServer,
                this.provisioningServer,
                clientPool.getSource(),
                IdGenerators.incrementingLongs());
        this.environmentCache = this.createEnvironmentCache(environmentFactory, serverFactory);
    }

    /**
     * Builds a factory from already-constructed collaborators instead of starting servers itself.
     * Marked {@code @VisibleForTesting}.
     *
     * @param environmentFactory creates the remote environment for each cache miss
     * @param serverFactory used to start the per-environment data and state servers
     * @param stageIdGenerator source of ids for process bundle descriptors
     * @param controlServer control server closed by {@link #close()}
     * @param loggingServer logging server closed by {@link #close()}
     * @param retrievalServer artifact retrieval server closed by {@link #close()}
     * @param provisioningServer provisioning server closed by {@link #close()}
     */
    @VisibleForTesting
    DockerJobBundleFactory(
            EnvironmentFactory environmentFactory,
            ServerFactory serverFactory,
            IdGenerator stageIdGenerator,
            GrpcFnServer<FnApiControlClientPoolService> controlServer,
            GrpcFnServer<GrpcLoggingService> loggingServer,
            GrpcFnServer<ArtifactRetrievalService> retrievalServer,
            GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
        // The cache only depends on the two factories, so it can be built up front.
        this.environmentCache = this.createEnvironmentCache(environmentFactory, serverFactory);
        this.stageIdGenerator = stageIdGenerator;
        this.provisioningServer = provisioningServer;
        this.retrievalServer = retrievalServer;
        this.loggingServer = loggingServer;
        this.controlServer = controlServer;
    }

    /**
     * Builds the cache that maps each environment to its wrapped SDK harness client.
     *
     * <p>On a miss the loader asks {@code environmentFactory} for a remote environment and wraps
     * it together with freshly started servers. On removal the wrapped client is closed; a
     * failure to close is logged as a warning and not propagated.
     *
     * @param environmentFactory creates the remote environment for a cache miss
     * @param serverFactory passed to {@code WrappedSdkHarnessClient.wrapping}
     * @return the configured loading cache
     */
    private LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> createEnvironmentCache(final EnvironmentFactory environmentFactory, final ServerFactory serverFactory) {
        CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient> harnessLoader = new CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient>(){

            @Override
            public WrappedSdkHarnessClient load(RunnerApi.Environment environment) throws Exception {
                return WrappedSdkHarnessClient.wrapping(environmentFactory.createEnvironment(environment), serverFactory);
            }
        };
        return CacheBuilder.newBuilder().removalListener(notification -> {
            RunnerApi.Environment removedEnvironment = (RunnerApi.Environment)notification.getKey();
            LOG.debug("Cleaning up for environment {}", (Object)removedEnvironment.getUrl());
            WrappedSdkHarnessClient removedClient = (WrappedSdkHarnessClient)notification.getValue();
            try {
                removedClient.close();
            }
            catch (Exception closeFailure) {
                // Best effort: keep cleaning up the remaining entries.
                LOG.warn(String.format("Error cleaning up environment %s", removedEnvironment), (Throwable)closeFailure);
            }
        }).build(harnessLoader);
    }

    /**
     * Returns a stage bundle factory for the given stage, backed by the cached SDK harness
     * client of the stage's environment (loaded on first use).
     *
     * @param executableStage the stage to execute
     * @return a bundle factory bound to the stage's process bundle descriptor
     * @throws RuntimeException wrapping the {@link IOException} raised while building the
     *     process bundle descriptor
     */
    @Override
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        WrappedSdkHarnessClient harness = this.environmentCache.getUnchecked(executableStage.getEnvironment());
        return SimpleStageBundleFactory.create(harness, this.describeStage(executableStage, harness));
    }

    /** Builds the process bundle descriptor for a stage, converting IOException to unchecked. */
    private ProcessBundleDescriptors.ExecutableProcessBundleDescriptor describeStage(ExecutableStage executableStage, WrappedSdkHarnessClient harness) {
        try {
            return ProcessBundleDescriptors.fromExecutableStage(
                    this.stageIdGenerator.getId(),
                    executableStage,
                    harness.getDataServer().getApiServiceDescriptor(),
                    harness.getStateServer().getApiServiceDescriptor());
        }
        catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Releases every cached environment and then shuts down the shared servers.
     *
     * <p>All four servers are closed even when clearing the cache or closing an earlier server
     * fails: the first failure is rethrown and later ones are attached to it as suppressed
     * exceptions.
     *
     * @throws Exception the first failure encountered while closing
     */
    @Override
    public void close() throws Exception {
        // try-with-resources closes in reverse declaration order, so the servers are listed
        // backwards to keep the close order control -> logging -> retrieval -> provisioning.
        try (GrpcFnServer<StaticGrpcProvisionService> provisioningCloser = this.provisioningServer;
             GrpcFnServer<ArtifactRetrievalService> retrievalCloser = this.retrievalServer;
             GrpcFnServer<GrpcLoggingService> loggingCloser = this.loggingServer;
             GrpcFnServer<FnApiControlClientPoolService> controlCloser = this.controlServer) {
            // Removing the entries triggers the removal listener, which closes each client.
            this.environmentCache.invalidateAll();
            this.environmentCache.cleanUp();
        }
    }

    /**
     * Picks the server factory matching the detected platform.
     *
     * <p>On MAC, server URLs use {@code DOCKER_FOR_MAC_HOST} as the host and ports are taken
     * round-robin from {@code MAC_PORT_START} to {@code MAC_PORT_END} (inclusive). LINUX uses
     * the default factory; any other platform also gets the default, after a warning.
     *
     * @return the server factory to use for all servers of this bundle factory
     */
    protected ServerFactory getServerFactory() {
        Platform platform = DockerJobBundleFactory.getPlatform();
        if (platform == Platform.MAC) {
            return ServerFactory.createWithUrlFactoryAndPortSupplier(
                    (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString(),
                    () -> MAC_PORT.getAndUpdate(current -> current == MAC_PORT_END ? MAC_PORT_START : current + 1));
        }
        if (platform != Platform.LINUX) {
            LOG.warn("Unknown Docker platform. Falling back to default server factory");
        }
        return ServerFactory.createDefault();
    }

    /**
     * Detects the platform from the {@code os.name} system property and the
     * {@code DOCKER_MAC_CONTAINER} environment variable.
     *
     * <p>The OS name is lower-cased with {@link Locale#ROOT} so that detection does not depend
     * on the JVM's default locale (for example Turkish case mapping of {@code I}). A missing
     * {@code os.name} property is treated as an empty name.
     *
     * @return {@code MAC} if the OS name starts with "mac" or {@code DOCKER_MAC_CONTAINER} is
     *     "1"; {@code LINUX} if the OS name starts with "linux"; otherwise {@code OTHER}
     */
    private static Platform getPlatform() {
        String osName = System.getProperty("os.name", "").toLowerCase(Locale.ROOT);
        // The environment variable forces MAC behavior regardless of the reported OS name.
        if (osName.startsWith("mac") || "1".equals(System.getenv("DOCKER_MAC_CONTAINER"))) {
            return Platform.MAC;
        }
        if (osName.startsWith("linux")) {
            return Platform.LINUX;
        }
        return Platform.OTHER;
    }

    /**
     * Creates the environment factory used to populate the environment cache. This
     * implementation returns a {@link DockerEnvironmentFactory} wired to the given servers;
     * subclasses may override it to supply a different factory.
     *
     * @param controlServiceServer server hosting the control client pool service
     * @param loggingServiceServer server hosting the logging service
     * @param retrievalServiceServer server hosting the artifact retrieval service
     * @param provisioningServiceServer server hosting the provisioning service
     * @param clientSource source from which connected control clients are obtained
     * @param idGenerator id generator handed to the environment factory
     * @return the environment factory
     */
    protected EnvironmentFactory getEnvironmentFactory(
            GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
            GrpcFnServer<GrpcLoggingService> loggingServiceServer,
            GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
            GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
            ControlClientPool.Source clientSource,
            IdGenerator idGenerator) {
        EnvironmentFactory dockerFactory = DockerEnvironmentFactory.forServices(
                controlServiceServer,
                loggingServiceServer,
                retrievalServiceServer,
                provisioningServiceServer,
                clientSource,
                idGenerator);
        return dockerFactory;
    }

    private static enum Platform {
        MAC,
        LINUX,
        OTHER;

    }

    /**
     * An {@link SdkHarnessClient} together with the resources created for it: the remote
     * environment, the executor given to the data service, and the data and state servers.
     * Closing the wrapper releases all of them.
     */
    private static class WrappedSdkHarnessClient implements AutoCloseable {
        private final RemoteEnvironment environment;
        private final ExecutorService executor;
        private final GrpcFnServer<GrpcDataService> dataServer;
        private final GrpcFnServer<GrpcStateService> stateServer;
        private final SdkHarnessClient client;

        /**
         * Starts a data server and a state server for the given environment and creates a client
         * that talks to the environment's instruction request handler.
         *
         * @param environment the remote environment to wrap; closed by {@link #close()}
         * @param serverFactory used to allocate ports and start the data and state servers
         * @return the wrapper owning the environment and the newly created resources
         * @throws Exception if either server cannot be created
         */
        static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerFactory serverFactory) throws Exception {
            ExecutorService executor = Executors.newCachedThreadPool();
            GrpcFnServer<GrpcDataService> dataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()), serverFactory);
            GrpcFnServer<GrpcStateService> stateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
            SdkHarnessClient client = SdkHarnessClient.usingFnApiClient(environment.getInstructionRequestHandler(), dataServer.getService());
            return new WrappedSdkHarnessClient(environment, executor, dataServer, stateServer, client);
        }

        private WrappedSdkHarnessClient(RemoteEnvironment environment, ExecutorService executor, GrpcFnServer<GrpcDataService> dataServer, GrpcFnServer<GrpcStateService> stateServer, SdkHarnessClient client) {
            this.executor = executor;
            this.environment = environment;
            this.dataServer = dataServer;
            this.stateServer = stateServer;
            this.client = client;
        }

        /** Returns the client bound to the wrapped environment. */
        SdkHarnessClient getClient() {
            return this.client;
        }

        /** Returns the state server started for this environment. */
        GrpcFnServer<GrpcStateService> getStateServer() {
            return this.stateServer;
        }

        /** Returns the data server started for this environment. */
        GrpcFnServer<GrpcDataService> getDataServer() {
            return this.dataServer;
        }

        /**
         * Shuts down the executor, then closes the environment, the data server and the state
         * server, in that order. Every resource is attempted even if an earlier one fails; the
         * first failure is rethrown with later ones attached as suppressed exceptions.
         *
         * @throws Exception the first failure encountered while closing
         */
        @Override
        public void close() throws Exception {
            // Resources are released in reverse declaration order, so the executor goes first.
            try (GrpcFnServer<GrpcStateService> stateServerCloser = this.stateServer;
                 GrpcFnServer<GrpcDataService> dataServerCloser = this.dataServer;
                 RemoteEnvironment envCloser = this.environment;
                 AutoCloseable executorCloser = this.executor::shutdown) {
                // Intentionally empty: the try-with-resources header does all the work.
            }
        }
    }

    /**
     * Stage bundle factory that creates bundles through a single
     * {@link SdkHarnessClient.BundleProcessor} obtained from a wrapped SDK harness client.
     */
    private static class SimpleStageBundleFactory implements StageBundleFactory {
        private final SdkHarnessClient.BundleProcessor processor;
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        // Not final: the reference is dropped in close().
        private WrappedSdkHarnessClient wrappedClient;

        /**
         * Obtains a bundle processor for the descriptor from the wrapped client and returns a
         * factory built around it.
         *
         * @param wrappedClient provides the SDK harness client and the state service
         * @param processBundleDescriptor descriptor of the stage to process
         * @return the new stage bundle factory
         */
        static SimpleStageBundleFactory create(WrappedSdkHarnessClient wrappedClient, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor) {
            SdkHarnessClient harnessClient = wrappedClient.getClient();
            GrpcFnServer<GrpcStateService> stateServer = wrappedClient.getStateServer();
            SdkHarnessClient.BundleProcessor bundleProcessor = harnessClient.getProcessor(
                    processBundleDescriptor.getProcessBundleDescriptor(),
                    processBundleDescriptor.getRemoteInputDestinations(),
                    stateServer.getService());
            return new SimpleStageBundleFactory(processBundleDescriptor, bundleProcessor, wrappedClient);
        }

        SimpleStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, SdkHarnessClient.BundleProcessor processor, WrappedSdkHarnessClient wrappedClient) {
            this.wrappedClient = wrappedClient;
            this.processor = processor;
            this.processBundleDescriptor = processBundleDescriptor;
        }

        /**
         * Starts a new bundle whose outputs are routed to receivers from the given factory.
         *
         * <p>For every output target of the descriptor, the single input PCollection of the
         * target's transform is looked up and a receiver is requested for that PCollection id.
         *
         * @param outputReceiverFactory creates one receiver per output PCollection id
         * @param stateRequestHandler forwarded to the bundle processor
         * @param progressHandler forwarded to the bundle processor
         * @return the bundle created by the processor
         * @throws Exception if the bundle cannot be created
         */
        @Override
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) throws Exception {
            // NOTE(review): the raw RemoteOutputReceiver / FnDataReceiver types look like
            // decompiler output; confirm against the real signatures before adding type arguments.
            ImmutableMap.Builder<BeamFnApi.Target, RemoteOutputReceiver> receiversByTarget = ImmutableMap.builder();
            for (Map.Entry<BeamFnApi.Target, Coder<WindowedValue<?>>> entry : this.processBundleDescriptor.getOutputTargetCoders().entrySet()) {
                BeamFnApi.Target target = entry.getKey();
                String transformId = target.getPrimitiveTransformReference();
                // getOnlyElement fails unless the transform has exactly one input.
                String pCollectionId = (String)Iterables.getOnlyElement(this.processBundleDescriptor.getProcessBundleDescriptor().getTransformsOrThrow(transformId).getInputsMap().values());
                FnDataReceiver receiver = outputReceiverFactory.create(pCollectionId);
                receiversByTarget.put(target, RemoteOutputReceiver.of(entry.getValue(), receiver));
            }
            return this.processor.newBundle(receiversByTarget.build(), stateRequestHandler, progressHandler);
        }

        /** Drops the reference to the wrapped client; the client itself is not closed here. */
        @Override
        public void close() throws Exception {
            this.wrappedClient = null;
        }
    }

    /** Strategy for constructing a {@link DockerJobBundleFactory} for a given job. */
    public interface JobBundleFactoryFactory {
        /**
         * Creates the bundle factory for a job.
         *
         * @param jobInfo description of the job
         * @return the bundle factory for that job
         * @throws Exception if the factory cannot be created
         */
        DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
    }
}

