/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.connect.runtime.distributed.ConnectAssignor;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

public class EagerAssignor
implements ConnectAssignor {
    private final Logger log;

    public EagerAssignor(LogContext logContext) {
        this.log = logContext.logger(EagerAssignor.class);
    }

    @Override
    public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, WorkerCoordinator coordinator) {
        this.log.debug("Performing task assignment");
        HashMap<String, ExtendedWorkerState> memberConfigs = new HashMap<String, ExtendedWorkerState>();
        for (JoinGroupResponseData.JoinGroupResponseMember member : allMemberMetadata) {
            memberConfigs.put(member.memberId(), IncrementalCooperativeConnectProtocol.deserializeMetadata(ByteBuffer.wrap(member.metadata())));
        }
        long maxOffset = this.findMaxMemberConfigOffset(memberConfigs, coordinator);
        Long leaderOffset = this.ensureLeaderConfig(maxOffset, coordinator);
        if (leaderOffset == null) {
            return this.fillAssignmentsAndSerialize(memberConfigs.keySet(), (short)1, leaderId, ((ExtendedWorkerState)memberConfigs.get(leaderId)).url(), maxOffset, new HashMap<String, Collection<String>>(), new HashMap<String, Collection<ConnectorTaskId>>());
        }
        return this.performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator);
    }

    private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
        if (coordinator.configSnapshot().offset() < maxOffset) {
            ClusterConfigState updatedSnapshot = coordinator.configFreshSnapshot();
            if (updatedSnapshot.offset() < maxOffset) {
                this.log.info("Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync.");
                return null;
            }
            coordinator.configSnapshot(updatedSnapshot);
            return updatedSnapshot.offset();
        }
        return maxOffset;
    }

    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ExtendedWorkerState> memberConfigs, WorkerCoordinator coordinator) {
        HashMap<String, Collection<String>> connectorAssignments = new HashMap<String, Collection<String>>();
        HashMap<String, Collection<ConnectorTaskId>> taskAssignments = new HashMap<String, Collection<ConnectorTaskId>>();
        List<String> connectorsSorted = EagerAssignor.sorted(coordinator.configSnapshot().connectors());
        CircularIterator<String> memberIt = new CircularIterator<String>(EagerAssignor.sorted(memberConfigs.keySet()));
        for (String connectorId : connectorsSorted) {
            String connectorAssignedTo = memberIt.next();
            this.log.trace("Assigning connector {} to {}", (Object)connectorId, (Object)connectorAssignedTo);
            Collection memberConnectors = connectorAssignments.computeIfAbsent(connectorAssignedTo, k -> new ArrayList());
            memberConnectors.add(connectorId);
        }
        for (String connectorId : connectorsSorted) {
            for (ConnectorTaskId taskId : EagerAssignor.sorted(coordinator.configSnapshot().tasks(connectorId))) {
                String taskAssignedTo = memberIt.next();
                this.log.trace("Assigning task {} to {}", (Object)taskId, (Object)taskAssignedTo);
                Collection memberTasks = taskAssignments.computeIfAbsent(taskAssignedTo, k -> new ArrayList());
                memberTasks.add(taskId);
            }
        }
        coordinator.leaderState(new WorkerCoordinator.LeaderState(memberConfigs, connectorAssignments, taskAssignments));
        return this.fillAssignmentsAndSerialize(memberConfigs.keySet(), (short)0, leaderId, memberConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
    }

    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members, short error, String leaderId, String leaderUrl, long maxOffset, Map<String, Collection<String>> connectorAssignments, Map<String, Collection<ConnectorTaskId>> taskAssignments) {
        HashMap<String, ByteBuffer> groupAssignment = new HashMap<String, ByteBuffer>();
        for (String member : members) {
            Collection<ConnectorTaskId> tasks;
            Collection<String> connectors = connectorAssignments.get(member);
            if (connectors == null) {
                connectors = Collections.emptyList();
            }
            if ((tasks = taskAssignments.get(member)) == null) {
                tasks = Collections.emptyList();
            }
            ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
            this.log.debug("Assignment: {} -> {}", (Object)member, (Object)assignment);
            groupAssignment.put(member, ConnectProtocol.serializeAssignment(assignment));
        }
        this.log.debug("Finished assignment");
        return groupAssignment;
    }

    private long findMaxMemberConfigOffset(Map<String, ExtendedWorkerState> memberConfigs, WorkerCoordinator coordinator) {
        Long maxOffset = null;
        for (Map.Entry<String, ExtendedWorkerState> stateEntry : memberConfigs.entrySet()) {
            long memberRootOffset = stateEntry.getValue().offset();
            if (maxOffset == null) {
                maxOffset = memberRootOffset;
                continue;
            }
            maxOffset = Math.max(maxOffset, memberRootOffset);
        }
        this.log.debug("Max config offset root: {}, local snapshot config offsets root: {}", maxOffset, (Object)coordinator.configSnapshot().offset());
        return maxOffset;
    }

    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
        ArrayList<T> res = new ArrayList<T>(members);
        Collections.sort(res);
        return res;
    }
}

