/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils;

/**
 * Keeps a record of which splits were handed to which subtask, so that the splits can be
 * returned for re-assignment when a subtask fails.
 *
 * <p>Assignments are held in two places: a map of assignments recorded since the last call to
 * {@link #snapshotState}, and a sorted map of assignments that were captured by a snapshot and
 * are retained until {@link #onCheckpointComplete(long)} discards them.
 *
 * <p>This class contains no synchronization of its own.
 *
 * @param <SplitT> the type of split being tracked
 */
@Internal
public class SplitAssignmentTracker<SplitT extends SourceSplit> {
    /** Snapshotted assignments: checkpoint id -> (subtask id -> splits in assignment order). */
    private final SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId = new TreeMap<>();

    /** Assignments recorded since the last snapshot: subtask id -> splits in assignment order. */
    private Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments = new HashMap<>();

    /**
     * Files the assignments recorded since the previous snapshot under {@code checkpointId},
     * starts a fresh (empty) uncheckpointed map, and then writes all retained per-checkpoint
     * assignments to {@code out} via {@link SourceCoordinatorSerdeUtils}.
     *
     * <p>Note that the in-memory bookkeeping is updated before the write happens, so it stays
     * updated even if the write throws.
     *
     * @param checkpointId the id under which the pending assignments are stored
     * @param splitSerializer the serializer handed to the serde utility for the splits
     * @param out the stream the retained assignments are written to
     * @throws Exception if writing the assignments fails
     */
    public void snapshotState(long checkpointId, SimpleVersionedSerializer<SplitT> splitSerializer, DataOutputStream out) throws Exception {
        assignmentsByCheckpointId.put(checkpointId, uncheckpointedAssignments);
        uncheckpointedAssignments = new HashMap<>();
        SourceCoordinatorSerdeUtils.writeAssignmentsByCheckpointId(assignmentsByCheckpointId, splitSerializer, out);
    }

    /**
     * Reads per-checkpoint assignments from {@code in} via {@link SourceCoordinatorSerdeUtils}
     * and merges them into the retained assignments. An entry already present for the same
     * checkpoint id is replaced; the uncheckpointed assignments are left untouched.
     *
     * @param splitSerializer the serializer handed to the serde utility for the splits
     * @param in the stream the assignments are read from
     * @throws Exception if reading the assignments fails
     */
    public void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, DataInputStream in) throws Exception {
        Map<Long, Map<Integer, LinkedHashSet<SplitT>>> deserializedAssignments = SourceCoordinatorSerdeUtils.readAssignmentsByCheckpointId(in, splitSerializer);
        assignmentsByCheckpointId.putAll(deserializedAssignments);
    }

    /**
     * Discards the retained assignments of every checkpoint whose id is less than or equal to
     * {@code checkpointId}.
     *
     * @param checkpointId the id of the completed checkpoint
     */
    public void onCheckpointComplete(long checkpointId) {
        assignmentsByCheckpointId.entrySet().removeIf(entry -> entry.getKey() <= checkpointId);
    }

    /**
     * Adds the given assignment to the uncheckpointed assignments.
     *
     * @param splitsAssignment the assignment to record
     */
    public void recordSplitAssignment(SplitsAssignment<SplitT> splitsAssignment) {
        addSplitAssignment(splitsAssignment, uncheckpointedAssignments);
    }

    /**
     * Removes every split tracked for {@code failedSubtaskId} - both from all retained
     * per-checkpoint assignments and from the uncheckpointed assignments - and returns them.
     *
     * <p>Splits are returned in ascending checkpoint-id order (the iteration order of the sorted
     * map), followed by the uncheckpointed ones.
     *
     * @param failedSubtaskId the id of the subtask whose splits are taken back
     * @return the removed splits; empty (never {@code null}) if nothing was tracked
     */
    public List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
        List<SplitT> splits = new ArrayList<>();
        assignmentsByCheckpointId.values().forEach(assignments -> removeFromAssignment(failedSubtaskId, assignments, splits));
        removeFromAssignment(failedSubtaskId, uncheckpointedAssignments, splits);
        return splits;
    }

    /** Returns the live (not copied) map of retained per-checkpoint assignments. */
    @VisibleForTesting
    SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> assignmentsByCheckpointId() {
        return assignmentsByCheckpointId;
    }

    /**
     * Returns the live (not copied) assignments retained for {@code checkpointId}, or
     * {@code null} if none are retained for that id.
     */
    @VisibleForTesting
    Map<Integer, LinkedHashSet<SplitT>> assignmentsByCheckpointId(long checkpointId) {
        return assignmentsByCheckpointId.get(checkpointId);
    }

    /** Returns the live (not copied) map of assignments recorded since the last snapshot. */
    @VisibleForTesting
    Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments() {
        return uncheckpointedAssignments;
    }

    /**
     * Removes the entry for {@code subtaskId} from {@code assignments} and, if one existed,
     * appends its splits to {@code toPutBack}.
     */
    private void removeFromAssignment(int subtaskId, Map<Integer, LinkedHashSet<SplitT>> assignments, List<SplitT> toPutBack) {
        Set<SplitT> splitForSubtask = assignments.remove(subtaskId);
        if (splitForSubtask != null) {
            toPutBack.addAll(splitForSubtask);
        }
    }

    /**
     * Merges {@code additionalAssignment} into {@code assignments}, creating the per-subtask
     * set on first use. The set is a {@link LinkedHashSet}, so duplicates are dropped and
     * insertion order is kept.
     */
    private void addSplitAssignment(SplitsAssignment<SplitT> additionalAssignment, Map<Integer, LinkedHashSet<SplitT>> assignments) {
        additionalAssignment.assignment().forEach((id, splits) -> assignments.computeIfAbsent(id, ignored -> new LinkedHashSet<>()).addAll(splits));
    }
}

