/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.containers;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.ZeebeClientBuilder;
import io.camunda.zeebe.client.api.response.BrokerInfo;
import io.camunda.zeebe.client.api.response.PartitionInfo;
import io.camunda.zeebe.client.api.response.Topology;
import io.zeebe.containers.ZeebePort;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apiguardian.api.API;
import org.rnorth.ducttape.TimeoutException;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;

@API(status=API.Status.STABLE)
public class ZeebeTopologyWaitStrategy
extends AbstractWaitStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZeebeTopologyWaitStrategy.class);
    private int brokersCount;
    private int replicationFactor;
    private int partitionsCount;
    private int gatewayPort;
    private Supplier<ZeebeClientBuilder> clientBuilderProvider;

    public ZeebeTopologyWaitStrategy() {
        this(1);
    }

    public ZeebeTopologyWaitStrategy(int brokersCount) {
        this(brokersCount, 1);
    }

    public ZeebeTopologyWaitStrategy(int brokersCount, int replicationFactor) {
        this(brokersCount, replicationFactor, 1);
    }

    public ZeebeTopologyWaitStrategy(int brokersCount, int replicationFactor, int partitionsCount) {
        this(brokersCount, replicationFactor, partitionsCount, ZeebePort.GATEWAY.getPort());
    }

    public ZeebeTopologyWaitStrategy(int brokersCount, int replicationFactor, int partitionsCount, int gatewayPort) {
        this.brokersCount = brokersCount;
        this.replicationFactor = replicationFactor;
        this.partitionsCount = partitionsCount;
        this.gatewayPort = gatewayPort;
        this.clientBuilderProvider = () -> ZeebeClient.newClientBuilder().usePlaintext();
    }

    public ZeebeTopologyWaitStrategy forBrokersCount(int brokersCount) {
        this.brokersCount = brokersCount;
        return this;
    }

    public ZeebeTopologyWaitStrategy forReplicationFactor(int replicationFactor) {
        this.replicationFactor = replicationFactor;
        return this;
    }

    public ZeebeTopologyWaitStrategy forPartitionsCount(int partitionsCount) {
        this.partitionsCount = partitionsCount;
        return this;
    }

    public ZeebeTopologyWaitStrategy forGatewayPort(int gatewayPort) {
        this.gatewayPort = gatewayPort;
        return this;
    }

    @API(status=API.Status.EXPERIMENTAL)
    public ZeebeTopologyWaitStrategy forBuilder(Supplier<ZeebeClientBuilder> clientBuilderProvider) {
        this.clientBuilderProvider = clientBuilderProvider;
        return this;
    }

    protected void waitUntilReady() {
        TopologyHolder latestTopology = new TopologyHolder();
        try (ZeebeClient client = this.newZeebeClient(this.waitStrategyTarget);){
            String containerName = this.waitStrategyTarget.getContainerInfo().getName();
            LOGGER.info("{}: Waiting for {} for topology to have at least {} brokers, {} partitions with {} replicas, and each partition to have a leader", new Object[]{containerName, this.startupTimeout, this.brokersCount, this.partitionsCount, this.replicationFactor});
            Unreliables.retryUntilTrue((int)((int)this.startupTimeout.toMillis()), (TimeUnit)TimeUnit.MILLISECONDS, () -> (Boolean)this.getRateLimiter().getWhenReady(() -> {
                latestTopology.topology = this.getTopology(client);
                LOGGER.trace("{}: Topology: {}", (Object)containerName, (Object)latestTopology.topology);
                return this.isTopologyComplete(latestTopology.topology, containerName);
            }));
        }
        catch (TimeoutException e) {
            throw new ContainerLaunchException(String.format("Timed out waiting for gateway topology to be complete; latest known topology: %s", latestTopology));
        }
    }

    private ZeebeClient newZeebeClient(WaitStrategyTarget waitStrategyTarget) {
        String gatewayHost = waitStrategyTarget.getHost();
        int exposedGatewayPort = waitStrategyTarget.getMappedPort(this.gatewayPort);
        return this.clientBuilderProvider.get().gatewayAddress(gatewayHost + ":" + exposedGatewayPort).build();
    }

    private boolean isTopologyComplete(Topology topology, String containerName) {
        int actualBrokersCount = topology.getBrokers().size();
        if (actualBrokersCount < this.brokersCount) {
            return false;
        }
        Map<Integer, Partition> partitions = this.buildPartitionsMap(topology);
        if (partitions.size() < this.partitionsCount) {
            LOGGER.trace("{}: expected {} partitions, but found only {}", new Object[]{containerName, this.partitionsCount, partitions.size()});
            return false;
        }
        for (Partition partition : partitions.values()) {
            int leadersCount = partition.leaderIds.size();
            int followersCount = partition.followerIds.size();
            int expectedFollowersCount = this.replicationFactor - 1;
            if (leadersCount != 1) {
                LOGGER.trace("{}: expected exactly one leader, but got {} ({})", new Object[]{containerName, leadersCount, partition.leaderIds});
                return false;
            }
            if (followersCount >= expectedFollowersCount) continue;
            LOGGER.trace("{}: expected at least {} followers, but got {} ({})", new Object[]{containerName, expectedFollowersCount, followersCount, partition.followerIds});
            return false;
        }
        return true;
    }

    private Map<Integer, Partition> buildPartitionsMap(Topology topology) {
        HashMap<Integer, Partition> partitions = new HashMap<Integer, Partition>();
        for (BrokerInfo broker : topology.getBrokers()) {
            int nodeId = broker.getNodeId();
            for (PartitionInfo partitionInfo : broker.getPartitions()) {
                int partitionId = partitionInfo.getPartitionId();
                partitions.putIfAbsent(partitionId, new Partition());
                Partition partition = (Partition)partitions.get(partitionId);
                if (partitionInfo.isLeader()) {
                    partition.leaderIds.add(nodeId);
                    continue;
                }
                partition.followerIds.add(nodeId);
            }
        }
        return partitions;
    }

    private Topology getTopology(ZeebeClient client) {
        return (Topology)client.newTopologyRequest().send().join(this.startupTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    private static final class TopologyHolder {
        private Topology topology;

        private TopologyHolder() {
        }

        public String toString() {
            return this.topology.toString();
        }
    }

    private static final class Partition {
        private final Set<Integer> followerIds = new HashSet<Integer>();
        private final Set<Integer> leaderIds = new HashSet<Integer>();

        private Partition() {
        }
    }
}

