package org.apache.flink.runtime.state;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/SharedStateRegistry.class */
/**
 * Registry that maps {@link SharedStateRegistryKey}s to shared state handles and counts how many
 * references are currently held on each of them.
 *
 * <p>The first registration under a key stores the handle with a reference count of one; further
 * registrations under the same key increase the count. Unregistering decreases the count, and once
 * it drops to zero the entry is removed and its handle is handed to the disposal executor.
 *
 * <p>All reads and writes of the internal map, of the reference counts, and of the open/closed flag
 * happen while holding the monitor of the internal map. State disposal is always triggered outside
 * of that monitor.
 */
public class SharedStateRegistry implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);

    /** Factory that creates registries through {@link #SharedStateRegistry(Executor)}. */
    public static final SharedStateRegistryFactory DEFAULT_FACTORY = SharedStateRegistry::new;

    /** All registered entries by key. Doubles as the lock object guarding this registry. */
    private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;

    /** Whether new registrations are still accepted. Guarded by {@link #registeredStates}. */
    private boolean open;

    /** Executor on which state objects that are no longer needed get discarded. */
    private final Executor asyncDisposalExecutor;

    /** Task that discards a single state object and logs, rather than propagates, any failure. */
    public static final class AsyncDisposalRunnable implements Runnable {
        private final StateObject toDispose;

        /**
         * @param stateObject the state object to discard when this task runs; must not be null
         */
        public AsyncDisposalRunnable(StateObject stateObject) {
            this.toDispose = Preconditions.checkNotNull(stateObject);
        }

        @Override
        public void run() {
            try {
                this.toDispose.discardState();
            } catch (Exception e) {
                // Disposal is best effort: a failure is reported but must not kill the executing thread.
                SharedStateRegistry.LOG.warn("A problem occurred during asynchronous disposal of a shared state object: {}", this.toDispose, e);
            }
        }
    }

    /**
     * Immutable outcome of a register/unregister call: the handle that the registry holds for the
     * key (possibly {@code null}) and the reference count at the time of the call.
     */
    public static class Result {
        private final StreamStateHandle reference;
        private final int referenceCount;

        /** Snapshots the given entry. Callers must hold the registry lock while calling this. */
        private Result(SharedStateEntry sharedStateEntry) {
            this.reference = sharedStateEntry.getStateHandle();
            this.referenceCount = sharedStateEntry.getReferenceCount();
        }

        /**
         * @param streamStateHandle the handle to report; may be null
         * @param i the reference count to report; must not be negative
         */
        public Result(StreamStateHandle streamStateHandle, int i) {
            Preconditions.checkArgument(i >= 0);
            this.reference = streamStateHandle;
            this.referenceCount = i;
        }

        /** Returns the reported state handle, or {@code null} if none was recorded. */
        public StreamStateHandle getReference() {
            return this.reference;
        }

        /** Returns the reported reference count. */
        public int getReferenceCount() {
            return this.referenceCount;
        }

        @Override
        public String toString() {
            return "Result{reference=" + this.reference + ", referenceCount=" + this.referenceCount + '}';
        }
    }

    /**
     * A registered state handle together with its mutable reference count, which starts at one.
     * The count itself is not synchronized; the registry only touches it under its own lock.
     */
    public static class SharedStateEntry {
        private final StreamStateHandle stateHandle;
        private int referenceCount = 1;

        SharedStateEntry(StreamStateHandle streamStateHandle) {
            this.stateHandle = streamStateHandle;
        }

        StreamStateHandle getStateHandle() {
            return this.stateHandle;
        }

        int getReferenceCount() {
            return this.referenceCount;
        }

        void increaseReferenceCount() {
            this.referenceCount++;
        }

        void decreaseReferenceCount() {
            this.referenceCount--;
        }

        @Override
        public String toString() {
            return "SharedStateEntry{stateHandle=" + this.stateHandle + ", referenceCount=" + this.referenceCount + '}';
        }
    }

    /**
     * Creates a registry that disposes state through {@code Executors.directExecutor()}
     * (presumably on the calling thread -- confirm against that executor's contract).
     */
    public SharedStateRegistry() {
        this(Executors.directExecutor());
    }

    /**
     * Creates an open, empty registry.
     *
     * @param executor executor used to discard state that is no longer referenced; must not be null
     */
    public SharedStateRegistry(Executor executor) {
        this.registeredStates = new HashMap<>();
        this.asyncDisposalExecutor = Preconditions.checkNotNull(executor);
        this.open = true;
    }

    /**
     * Registers a reference to the given shared state under the given key.
     *
     * <p>If the key is unknown, the handle is stored with a reference count of one. If the key is
     * already known, the existing handle is kept and its count is increased; a passed handle that
     * is not equal to the existing one is treated as a redundant copy and scheduled for deletion
     * (placeholder handles are never deleted).
     *
     * @param sharedStateRegistryKey key to register under; must not be null, because an entry stored
     *     under a null key could never be removed again via {@link #unregisterReference}
     * @param streamStateHandle the state handle to register; must not be null
     * @return the handle that is registered under the key after this call, together with the
     *     reference count produced by this call
     * @throws NullPointerException if the key or the handle is null
     * @throws IllegalStateException if the registry is closed, or if a placeholder handle is
     *     registered under a key that is not known yet
     */
    public Result registerReference(SharedStateRegistryKey sharedStateRegistryKey, StreamStateHandle streamStateHandle) {
        Preconditions.checkNotNull(sharedStateRegistryKey);
        Preconditions.checkNotNull(streamStateHandle);

        StreamStateHandle redundantCopy = null;
        final SharedStateEntry entry;
        final Result result;

        synchronized (this.registeredStates) {
            Preconditions.checkState(this.open, "Attempt to register state to closed SharedStateRegistry.");

            SharedStateEntry existing = this.registeredStates.get(sharedStateRegistryKey);
            if (existing == null) {
                // A placeholder only stands in for state that must already be registered.
                Preconditions.checkState(!isPlaceholder(streamStateHandle), "Attempt to reference unknown state: %s", sharedStateRegistryKey);
                entry = new SharedStateEntry(streamStateHandle);
                this.registeredStates.put(sharedStateRegistryKey, entry);
            } else {
                entry = existing;
                if (!Objects.equals(streamStateHandle, entry.stateHandle)) {
                    redundantCopy = streamStateHandle;
                    LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to be an unnecessary copy of existing state {} and will be dropped.", sharedStateRegistryKey, streamStateHandle, entry.stateHandle);
                }
                entry.increaseReferenceCount();
            }

            // Snapshot the entry while still holding the lock: the reference count is mutable and
            // unsynchronized, so reading it after leaving the monitor could observe the effect of
            // a concurrent register/unregister call instead of the count produced by this call.
            result = new Result(entry);
        }

        // Disposal runs outside of the lock so that it cannot block other registry operations.
        scheduleAsyncDelete(redundantCopy);
        LOG.trace("Registered shared state {} under key {}.", entry, sharedStateRegistryKey);
        return result;
    }

    /**
     * Releases one reference to the state registered under the given key. When the reference count
     * reaches zero, the entry is removed and its handle is scheduled for deletion.
     *
     * @param sharedStateRegistryKey key of the state to release; must not be null
     * @return the remaining handle and reference count, or a result with a {@code null} handle and
     *     a count of zero if the entry was removed
     * @throws NullPointerException if the key is null
     * @throws IllegalStateException if nothing is registered under the key
     */
    public Result unregisterReference(SharedStateRegistryKey sharedStateRegistryKey) {
        Preconditions.checkNotNull(sharedStateRegistryKey);

        final SharedStateEntry entry;
        final StreamStateHandle unreferencedHandle;
        final Result result;

        synchronized (this.registeredStates) {
            entry = this.registeredStates.get(sharedStateRegistryKey);
            Preconditions.checkState(entry != null, "Cannot unregister a state that is not registered: %s", sharedStateRegistryKey);

            entry.decreaseReferenceCount();
            if (entry.getReferenceCount() <= 0) {
                // Last reference is gone: drop the entry and remember its handle for disposal.
                this.registeredStates.remove(sharedStateRegistryKey);
                unreferencedHandle = entry.getStateHandle();
                result = new Result(null, 0);
            } else {
                unreferencedHandle = null;
                result = new Result(entry);
            }
        }

        LOG.trace("Unregistered shared state {} under key {}.", entry, sharedStateRegistryKey);
        // Disposal runs outside of the lock; a null handle is ignored by scheduleAsyncDelete.
        scheduleAsyncDelete(unreferencedHandle);
        return result;
    }

    /**
     * Lets every given composite state handle register its shared states with this registry. The
     * registry lock is held for the whole iteration.
     *
     * @param iterable the handles to register; {@code null} is treated as "nothing to register"
     */
    public void registerAll(Iterable<? extends CompositeStateHandle> iterable) {
        if (iterable == null) {
            return;
        }
        synchronized (this.registeredStates) {
            for (CompositeStateHandle compositeStateHandle : iterable) {
                compositeStateHandle.registerSharedStates(this);
            }
        }
    }

    @Override
    public String toString() {
        // Rendered under the lock so that the map is not modified while it is being printed.
        synchronized (this.registeredStates) {
            return "SharedStateRegistry{registeredStates=" + this.registeredStates + '}';
        }
    }

    /**
     * Hands the given handle to the disposal executor. Does nothing for {@code null} and for
     * placeholder handles.
     */
    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
        if (streamStateHandle == null || isPlaceholder(streamStateHandle)) {
            return;
        }
        LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
        AsyncDisposalRunnable disposalTask = new AsyncDisposalRunnable(streamStateHandle);
        try {
            this.asyncDisposalExecutor.execute(disposalTask);
        } catch (RejectedExecutionException e) {
            // The executor refused the task; discard on the calling thread instead so that the
            // state is not silently left behind.
            disposalTask.run();
        }
    }

    /** Returns whether the given handle is a {@link PlaceholderStreamStateHandle}. */
    private static boolean isPlaceholder(StreamStateHandle streamStateHandle) {
        return streamStateHandle instanceof PlaceholderStreamStateHandle;
    }

    /**
     * Marks this registry as closed. Later {@link #registerReference} calls fail; already
     * registered entries are left untouched and nothing is disposed here.
     */
    @Override
    public void close() {
        synchronized (this.registeredStates) {
            this.open = false;
        }
    }
}
