package org.apache.flink.runtime.execution.librarycache;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkUserCodeClassLoader;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.class */
/**
 * {@link LibraryCacheManager} that keeps one user-code class loader per job.
 *
 * <p>The class loader of a job is built from the files returned by the {@link PermanentBlobService}
 * for the job's library BLOB keys plus any additional class path URLs. Each cache entry tracks the
 * set of execution attempts that registered for it; when the last one unregisters, the entry is
 * removed and its class loader is closed.
 *
 * <p>All access to the cache map goes through {@code lockObject}, with the exception of
 * {@link #getNumberOfManagedJobs()}.
 */
public class BlobLibraryCacheManager implements LibraryCacheManager {
    private static final Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);

    /** Attempt id under which job-level registrations ({@link #registerJob}) hold their reference. */
    private static final ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(-1, -1);

    /** Guards {@link #cacheEntries} and the entries stored in it. */
    private final Object lockObject = new Object();

    /** Registered entries, keyed by job. */
    private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<>();

    /** Source of the local files backing the library BLOB keys. */
    private final PermanentBlobService blobService;

    /** Resolve order handed to {@link FlinkUserCodeClassLoaders#create} for every new class loader. */
    private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

    /** Patterns handed to {@link FlinkUserCodeClassLoaders#create} for every new class loader. */
    private final String[] alwaysParentFirstPatterns;

    /** Handler handed to {@link FlinkUserCodeClassLoaders#create} for every new class loader. */
    private final Consumer<Throwable> classLoadingExceptionHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager$LibraryCacheEntry.class */
    /**
     * A single job's cached class loader together with the libraries and class paths it was built
     * from and the set of execution attempts currently referencing it.
     *
     * <p>Instances do no locking of their own; the enclosing manager calls them while holding its
     * lock.
     */
    public static class LibraryCacheEntry {
        private final URLClassLoader classLoader;
        private final Set<ExecutionAttemptID> referenceHolders;
        private final Set<PermanentBlobKey> libraries;

        // Class paths are kept as strings rather than URLs (presumably to avoid the host
        // resolution performed by URL.equals/hashCode -- TODO confirm).
        private final Set<String> classPaths;

        /**
         * Creates the class loader and records the first reference holder.
         *
         * @param collection library BLOB keys this entry is built from
         * @param collection2 additional class path URLs this entry is built from
         * @param urlArr URLs the class loader is created with
         * @param executionAttemptID first reference holder of this entry
         * @param resolveOrder resolve order passed to {@link FlinkUserCodeClassLoaders#create}
         * @param strArr always-parent-first patterns passed to {@link FlinkUserCodeClassLoaders#create}
         * @param consumer class loading exception handler passed to {@link FlinkUserCodeClassLoaders#create}
         */
        LibraryCacheEntry(Collection<PermanentBlobKey> collection, Collection<URL> collection2, URL[] urlArr, ExecutionAttemptID executionAttemptID, FlinkUserCodeClassLoaders.ResolveOrder resolveOrder, String[] strArr, Consumer<Throwable> consumer) {
            this.classLoader = FlinkUserCodeClassLoaders.create(resolveOrder, urlArr, FlinkUserCodeClassLoaders.class.getClassLoader(), strArr, consumer);
            this.classPaths = new HashSet<>(collection2.size());
            for (URL classPath : collection2) {
                this.classPaths.add(classPath.toString());
            }
            this.libraries = new HashSet<>(collection);
            this.referenceHolders = new HashSet<>();
            this.referenceHolders.add(executionAttemptID);
        }

        /** Returns the class loader held by this entry. */
        public ClassLoader getClassLoader() {
            return this.classLoader;
        }

        /** Returns the (internal, mutable) set of library BLOB keys this entry was created with. */
        public Set<PermanentBlobKey> getLibraries() {
            return this.libraries;
        }

        /**
         * Adds another reference holder after verifying that it asks for exactly the libraries and
         * class paths this entry was created with.
         *
         * <p>The comparison is done on sets, so repeated elements in the given collections do not
         * cause a mismatch (the entry itself stores its libraries and class paths de-duplicated).
         *
         * @param executionAttemptID the reference holder to add
         * @param collection library BLOB keys requested by the holder
         * @param collection2 class path URLs requested by the holder
         * @throws IllegalStateException if the libraries or class paths differ from the ones this
         *     entry was created with
         */
        public void register(ExecutionAttemptID executionAttemptID, Collection<PermanentBlobKey> collection, Collection<URL> collection2) {
            Set<PermanentBlobKey> requestedLibraries = new HashSet<>(collection);
            if (!this.libraries.equals(requestedLibraries)) {
                throw new IllegalStateException("The library registration references a different set of library BLOBs than previous registrations for this job:\nold:" + this.libraries.toString() + "\nnew:" + collection.toString());
            }
            Set<String> requestedClassPaths = collection2.stream().map(URL::toString).collect(Collectors.toSet());
            if (!this.classPaths.equals(requestedClassPaths)) {
                throw new IllegalStateException("The library registration references a different set of classpaths than previous registrations for this job:\nold:" + this.classPaths.toString() + "\nnew:" + collection2.toString());
            }
            this.referenceHolders.add(executionAttemptID);
        }

        /**
         * Removes the given reference holder.
         *
         * @param executionAttemptID the reference holder to remove
         * @return {@code true} if no reference holders remain afterwards
         */
        public boolean unregister(ExecutionAttemptID executionAttemptID) {
            this.referenceHolders.remove(executionAttemptID);
            return this.referenceHolders.isEmpty();
        }

        /** Returns the number of distinct reference holders currently registered. */
        int getNumberOfReferenceHolders() {
            return this.referenceHolders.size();
        }

        /**
         * Closes the class loader. A failure to close is logged (including the exception) and
         * otherwise ignored, so releasing is best-effort.
         */
        void releaseClassLoader() {
            try {
                this.classLoader.close();
            } catch (IOException e) {
                // Pass the exception along so the cause of the failure ends up in the log.
                BlobLibraryCacheManager.LOG.warn("Failed to release user code class loader for " + Arrays.toString(this.libraries.toArray()), e);
            }
        }
    }

    /**
     * Creates a manager.
     *
     * @param permanentBlobService service used to obtain the files for library BLOB keys; must not
     *     be null
     * @param resolveOrder resolve order for created class loaders; must not be null
     * @param strArr always-parent-first patterns for created class loaders
     * @param fatalErrorHandler if non-null, notified when a class loading error is a metaspace
     *     out-of-memory error; if null, class loading errors are passed to
     *     {@link FlinkUserCodeClassLoader#NOOP_EXCEPTION_HANDLER}
     */
    public BlobLibraryCacheManager(PermanentBlobService permanentBlobService, FlinkUserCodeClassLoaders.ResolveOrder resolveOrder, String[] strArr, @Nullable FatalErrorHandler fatalErrorHandler) {
        this.blobService = Preconditions.checkNotNull(permanentBlobService);
        this.classLoaderResolveOrder = Preconditions.checkNotNull(resolveOrder);
        this.alwaysParentFirstPatterns = strArr;
        this.classLoadingExceptionHandler = createClassLoadingExceptionHandler(fatalErrorHandler);
    }

    /** Creates a manager without a fatal error handler. */
    @VisibleForTesting
    public BlobLibraryCacheManager(PermanentBlobService permanentBlobService, FlinkUserCodeClassLoaders.ResolveOrder resolveOrder, String[] strArr) {
        this(permanentBlobService, resolveOrder, strArr, null);
    }

    /**
     * Registers a job-level reference; equivalent to {@link #registerTask} with the fixed
     * job attempt id.
     */
    @Override
    public void registerJob(JobID jobID, Collection<PermanentBlobKey> collection, Collection<URL> collection2) throws IOException {
        registerTask(jobID, JOB_ATTEMPT_ID, collection, collection2);
    }

    /**
     * Registers an execution attempt for the given job.
     *
     * <p>If the job has no cache entry yet, the files for all library keys are fetched from the
     * BLOB service and a new entry (and class loader) is created. Otherwise the attempt is added
     * to the existing entry, which verifies that the same libraries and class paths are requested.
     *
     * @param jobID job to register for; must not be null
     * @param executionAttemptID reference holder; must not be null
     * @param collection library BLOB keys, {@code null} is treated as empty
     * @param collection2 class path URLs, {@code null} is treated as empty
     * @throws IOException if creating a new entry fails; non-I/O failures are wrapped
     * @throws IllegalStateException if an existing entry was created with different libraries or
     *     class paths
     */
    @Override
    public void registerTask(JobID jobID, ExecutionAttemptID executionAttemptID, @Nullable Collection<PermanentBlobKey> collection, @Nullable Collection<URL> collection2) throws IOException {
        Preconditions.checkNotNull(jobID, "The JobId must not be null.");
        Preconditions.checkNotNull(executionAttemptID, "The task execution id must not be null.");
        final Collection<PermanentBlobKey> requiredLibraries = collection == null ? Collections.<PermanentBlobKey>emptySet() : collection;
        final Collection<URL> requiredClassPaths = collection2 == null ? Collections.<URL>emptySet() : collection2;

        synchronized (this.lockObject) {
            LibraryCacheEntry existingEntry = this.cacheEntries.get(jobID);
            if (existingEntry != null) {
                existingEntry.register(executionAttemptID, requiredLibraries, requiredClassPaths);
                return;
            }

            try {
                // Library files first, then the plain class path URLs.
                URL[] urls = new URL[requiredLibraries.size() + requiredClassPaths.size()];
                int index = 0;
                for (PermanentBlobKey key : requiredLibraries) {
                    urls[index++] = this.blobService.getFile(jobID, key).toURI().toURL();
                }
                for (URL classPath : requiredClassPaths) {
                    urls[index++] = classPath;
                }
                this.cacheEntries.put(jobID, new LibraryCacheEntry(requiredLibraries, requiredClassPaths, urls, executionAttemptID, this.classLoaderResolveOrder, this.alwaysParentFirstPatterns, this.classLoadingExceptionHandler));
            } catch (Throwable th) {
                // Rethrow I/O failures as they are; everything else is wrapped.
                ExceptionUtils.tryRethrowIOException(th);
                throw new IOException("Library cache could not register the user code libraries.", th);
            }
        }
    }

    /**
     * Removes the job-level reference; equivalent to {@link #unregisterTask} with the fixed
     * job attempt id.
     */
    @Override
    public void unregisterJob(JobID jobID) {
        unregisterTask(jobID, JOB_ATTEMPT_ID);
    }

    /**
     * Removes the given execution attempt from the job's entry. If it was the last reference
     * holder, the entry is removed from the cache and its class loader is closed. Unknown jobs
     * are ignored.
     *
     * @param jobID job to unregister from; must not be null
     * @param executionAttemptID reference holder to remove; must not be null
     */
    @Override
    public void unregisterTask(JobID jobID, ExecutionAttemptID executionAttemptID) {
        Preconditions.checkNotNull(jobID, "The JobId must not be null.");
        Preconditions.checkNotNull(executionAttemptID, "The task execution id must not be null.");
        synchronized (this.lockObject) {
            LibraryCacheEntry entry = this.cacheEntries.get(jobID);
            if (entry != null && entry.unregister(executionAttemptID)) {
                this.cacheEntries.remove(jobID);
                entry.releaseClassLoader();
            }
        }
    }

    /**
     * Returns the class loader registered for the given job.
     *
     * @param jobID job to look up; must not be null
     * @throws IllegalStateException if the job has no cache entry
     */
    @Override
    public ClassLoader getClassLoader(JobID jobID) {
        Preconditions.checkNotNull(jobID, "The JobId must not be null.");
        synchronized (this.lockObject) {
            LibraryCacheEntry entry = this.cacheEntries.get(jobID);
            if (entry == null) {
                throw new IllegalStateException("No libraries are registered for job " + jobID);
            }
            return entry.getClassLoader();
        }
    }

    /** Returns the number of reference holders of the given job's entry, or 0 if there is none. */
    int getNumberOfReferenceHolders(JobID jobID) {
        synchronized (this.lockObject) {
            LibraryCacheEntry entry = this.cacheEntries.get(jobID);
            return entry == null ? 0 : entry.getNumberOfReferenceHolders();
        }
    }

    /**
     * Returns the number of jobs that currently have a cache entry.
     *
     * <p>NOTE: the map is read without holding the lock, so the value may be stale while other
     * threads register or unregister.
     */
    int getNumberOfManagedJobs() {
        return this.cacheEntries.size();
    }

    /**
     * Closes the class loaders of all cached entries.
     *
     * <p>NOTE: entries are not removed from the cache here, so {@link #hasClassLoader} and
     * {@link #getClassLoader} keep answering for them after shutdown.
     */
    @Override
    public void shutdown() {
        synchronized (this.lockObject) {
            for (LibraryCacheEntry entry : this.cacheEntries.values()) {
                entry.releaseClassLoader();
            }
        }
    }

    /** Returns whether the given job currently has a cache entry. */
    @Override
    public boolean hasClassLoader(@Nonnull JobID jobID) {
        synchronized (this.lockObject) {
            return this.cacheEntries.containsKey(jobID);
        }
    }

    /**
     * Builds the handler passed to created class loaders: with a fatal error handler, metaspace
     * out-of-memory errors are forwarded to it and all other errors are ignored; without one, the
     * shared no-op handler is used.
     */
    private static Consumer<Throwable> createClassLoadingExceptionHandler(@Nullable FatalErrorHandler fatalErrorHandler) {
        if (fatalErrorHandler == null) {
            return FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
        }
        return th -> {
            if (ExceptionUtils.isMetaspaceOutOfMemoryError(th)) {
                fatalErrorHandler.onFatalError(th);
            }
        };
    }
}
