/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.proxy.internal.net;

import io.kroxylicious.proxy.internal.net.Endpoint;
import io.kroxylicious.proxy.internal.net.EndpointBindingException;
import io.kroxylicious.proxy.internal.net.EndpointReconciler;
import io.kroxylicious.proxy.internal.net.EndpointResolutionException;
import io.kroxylicious.proxy.internal.net.NetworkBindRequest;
import io.kroxylicious.proxy.internal.net.NetworkBindingOperationProcessor;
import io.kroxylicious.proxy.internal.net.NetworkUnbindRequest;
import io.kroxylicious.proxy.internal.net.VirtualClusterBinding;
import io.kroxylicious.proxy.internal.net.VirtualClusterBindingResolver;
import io.kroxylicious.proxy.internal.net.VirtualClusterBootstrapBinding;
import io.kroxylicious.proxy.internal.net.VirtualClusterBrokerBinding;
import io.kroxylicious.proxy.model.VirtualCluster;
import io.kroxylicious.proxy.service.HostPort;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EndpointRegistry
implements EndpointReconciler,
VirtualClusterBindingResolver,
AutoCloseable {
    /** Logger for this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(EndpointRegistry.class);
    /** Message prefix used by {@code resolve} when a listening channel has no binding for the requested endpoint/SNI name. */
    public static final String NO_CHANNEL_BINDINGS_MESSAGE = "No channel bindings found for";
    /** Message used by the null checks on {@code virtualCluster} arguments. */
    public static final String VIRTUAL_CLUSTER_CANNOT_BE_NULL_MESSAGE = "virtualCluster cannot be null";
    /** Processor on which this registry enqueues its network bind and unbind requests. */
    private final NetworkBindingOperationProcessor bindingOperationProcessor;
    /** Channel attribute under which each listening channel keeps its routing-key to binding map. */
    protected static final AttributeKey<Map<RoutingKey, VirtualClusterBinding>> CHANNEL_BINDINGS = AttributeKey.newInstance((String)"channelBindings");
    /** Registration, reconciliation and deregistration state, keyed by virtual cluster. */
    private final Map<VirtualCluster, VirtualClusterRecord> registeredVirtualClusters = new ConcurrentHashMap<VirtualCluster, VirtualClusterRecord>();
    /** Bind/unbind state of each listening channel, keyed by the endpoint it listens on. */
    private final Map<Endpoint, ListeningChannelRecord> listeningChannels = new ConcurrentHashMap<Endpoint, ListeningChannelRecord>();

    /**
     * Creates a registry that submits its bind and unbind requests to the given processor.
     *
     * @param bindingOperationProcessor processor on which network binding events are enqueued
     */
    public EndpointRegistry(NetworkBindingOperationProcessor bindingOperationProcessor) {
        this.bindingOperationProcessor = bindingOperationProcessor;
    }

    /**
     * Registers a virtual cluster: binds its bootstrap address and one broker binding per entry of its
     * discovery address map.
     * <p>
     * If the cluster is already registered the existing registration stage is returned. If it is currently
     * being deregistered, registration is retried once that deregistration has completed.
     *
     * @param virtualCluster cluster to register, must not be {@code null}
     * @return stage completed with the bootstrap endpoint once all bindings are in place, or completed
     *         exceptionally (after the cluster's bindings have been rolled back) if any binding fails
     */
    public CompletionStage<Endpoint> registerVirtualCluster(VirtualCluster virtualCluster) {
        Objects.requireNonNull(virtualCluster, VIRTUAL_CLUSTER_CANNOT_BE_NULL_MESSAGE);

        CompletableFuture<Endpoint> registration = new CompletableFuture<>();
        VirtualClusterRecord candidate = VirtualClusterRecord.create(registration);
        VirtualClusterRecord existing = this.registeredVirtualClusters.putIfAbsent(virtualCluster, candidate);
        if (existing != null) {
            CompletionStage<Void> pendingDeregistration = existing.deregistrationStage().get();
            if (pendingDeregistration == null) {
                // Already registered (or registering): share the outcome of that registration.
                return existing.registrationStage();
            }
            // Wait for the in-flight deregistration, then start again from scratch.
            return pendingDeregistration.thenCompose(ignored -> this.registerVirtualCluster(virtualCluster));
        }

        // Bootstrap binding.
        Endpoint bootstrapEndpoint = Endpoint.createEndpoint(virtualCluster.getBindAddress(), virtualCluster.getClusterBootstrapAddress().port(), virtualCluster.isUseTls());
        HostPort upstreamBootstrap = virtualCluster.targetCluster().bootstrapServersList().get(0);
        String bootstrapHost = virtualCluster.getClusterBootstrapAddress().host();
        VirtualClusterBootstrapBinding bootstrapBinding = new VirtualClusterBootstrapBinding(virtualCluster, upstreamBootstrap);
        CompletableFuture<Endpoint> bootstrapBound = this.registerBinding(bootstrapEndpoint, bootstrapHost, bootstrapBinding).toCompletableFuture();

        // From here on reconcile() finds a (still empty) reconciliation record for this cluster.
        candidate.reconciliationRecord().set(ReconciliationRecord.createEmptyReconcileRecord());

        // One broker binding per discovery address, created in node id order.
        var discoveryAddresses = Optional.ofNullable(virtualCluster.discoveryAddressMap()).orElse(Map.of());
        Stream<CompletionStage<Endpoint>> discoveryBindings = discoveryAddresses.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(entry -> {
                    Integer nodeId = entry.getKey();
                    HostPort brokerAddress = entry.getValue();
                    Endpoint brokerEndpoint = new Endpoint(virtualCluster.getBindAddress(), brokerAddress.port(), virtualCluster.isUseTls());
                    return this.registerBinding(brokerEndpoint, brokerAddress.host(), new VirtualClusterBrokerBinding(virtualCluster, upstreamBootstrap, nodeId, true));
                });
        CompletableFuture<Void> discoveryBound = EndpointRegistry.allOfStage(discoveryBindings);

        bootstrapBound.thenCombine(discoveryBound, (endpoint, unused) -> endpoint)
                .whenComplete((endpoint, failure) -> {
                    if (failure == null) {
                        EndpointRegistry.handleSuccessfulBinding(bootstrapBound, registration);
                    }
                    else {
                        this.rollbackRelatedBindings(virtualCluster, failure, registration);
                    }
                });
        return candidate.registrationStage();
    }

    /**
     * Transfers the outcome of the bootstrap binding onto the registration future.
     *
     * @param bootstrapEndpointFuture future holding the bound bootstrap endpoint
     * @param future registration future to complete with that endpoint, or exceptionally if it cannot be obtained
     */
    private static void handleSuccessfulBinding(CompletableFuture<Endpoint> bootstrapEndpointFuture, CompletableFuture<Endpoint> future) {
        final Endpoint bootstrapEndpoint;
        try {
            bootstrapEndpoint = bootstrapEndpointFuture.get();
        }
        catch (ExecutionException failed) {
            // Report the underlying failure rather than the ExecutionException wrapper.
            future.completeExceptionally(failed.getCause());
            return;
        }
        catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            future.completeExceptionally(interrupted);
            return;
        }
        catch (RuntimeException unexpected) {
            future.completeExceptionally(unexpected);
            return;
        }
        future.complete(bootstrapEndpoint);
    }

    /**
     * Undoes a failed registration: removes every binding belonging to the virtual cluster, forgets the
     * cluster, and then fails the registration future with the original error.
     *
     * @param virtualCluster cluster whose registration failed
     * @param originalFailure error that caused the registration to fail
     * @param future registration future to complete exceptionally once the rollback has finished
     */
    private void rollbackRelatedBindings(VirtualCluster virtualCluster, Throwable originalFailure, CompletableFuture<Endpoint> future) {
        LOGGER.warn("Registration error", originalFailure);
        Predicate<VirtualClusterBinding> ownedByCluster = binding -> binding.virtualCluster().equals(virtualCluster);
        this.deregisterBinding(virtualCluster, ownedByCluster).whenComplete((ignored, rollbackFailure) -> {
            if (rollbackFailure != null) {
                // The rollback error is only logged; callers are told about the original failure.
                LOGGER.warn("Secondary error occurred whilst handling a previous registration error: {}", originalFailure.getMessage(), rollbackFailure);
            }
            this.registeredVirtualClusters.remove(virtualCluster);
            future.completeExceptionally(originalFailure);
        });
    }

    /**
     * Deregisters a virtual cluster, removing all of its bindings once its registration has finished.
     * <p>
     * Deregistering a cluster that is not registered completes immediately. If a deregistration is already
     * in progress, the returned stage mirrors the outcome of that one.
     * <p>
     * If the registration being waited on completes exceptionally there is nothing left for this method to
     * unbind, so the returned stage completes normally. Previously the cleanup was chained with
     * {@code thenCompose}, which is skipped on exceptional completion, so the returned stage (and anything
     * waiting on it, e.g. {@link #shutdown()}) never completed.
     *
     * @param virtualCluster cluster to deregister, must not be {@code null}
     * @return stage completed once the cluster's bindings have been removed, or completed exceptionally if
     *         removing the bindings failed
     */
    public CompletionStage<Void> deregisterVirtualCluster(VirtualCluster virtualCluster) {
        Objects.requireNonNull(virtualCluster, VIRTUAL_CLUSTER_CANNOT_BE_NULL_MESSAGE);
        VirtualClusterRecord vcr = this.registeredVirtualClusters.get(virtualCluster);
        if (vcr == null) {
            // Not registered: nothing to do.
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> deregisterFuture = new CompletableFuture<>();
        boolean updated = vcr.deregistrationStage().compareAndSet(null, deregisterFuture);
        if (!updated) {
            // Another caller owns the deregistration; mirror its outcome onto our own future.
            vcr.deregistrationStage().get().whenComplete((u, t) -> {
                if (t != null) {
                    deregisterFuture.completeExceptionally(t);
                }
                else {
                    deregisterFuture.complete(u);
                }
            });
            return deregisterFuture;
        }

        vcr.registrationStage().whenComplete((endpoint, registrationFailure) -> {
            if (registrationFailure != null) {
                // The failed registration rolls back its own bindings and removes the record; only drop
                // the entry if it is still ours so that a newer registration is never discarded.
                this.registeredVirtualClusters.remove(virtualCluster, vcr);
                deregisterFuture.complete(null);
                return;
            }
            this.deregisterBinding(virtualCluster, binding -> binding.virtualCluster().equals(virtualCluster))
                    .whenComplete((unused, t) -> {
                        this.registeredVirtualClusters.remove(virtualCluster);
                        if (t != null) {
                            deregisterFuture.completeExceptionally(t);
                        }
                        else {
                            deregisterFuture.complete(null);
                        }
                    });
        });
        return deregisterFuture;
    }

    /**
     * Brings the broker bindings of a registered virtual cluster in line with the given upstream nodes.
     * <p>
     * A request for the node map that is already recorded returns the recorded reconciliation stage.
     * Otherwise the new reconciliation starts after the recorded one has completed; if another caller
     * installs a different record first, this call either joins it (same node map) or retries.
     *
     * @param virtualCluster cluster to reconcile, must not be {@code null}
     * @param upstreamNodes upstream node id to address map, must not be {@code null}
     * @return stage completed when reconciliation has finished; failed with {@link IllegalStateException}
     *         if the cluster is unknown or has no reconciliation record; already complete if the cluster
     *         is being deregistered
     */
    @Override
    public CompletionStage<Void> reconcile(VirtualCluster virtualCluster, Map<Integer, HostPort> upstreamNodes) {
        Objects.requireNonNull(virtualCluster, VIRTUAL_CLUSTER_CANNOT_BE_NULL_MESSAGE);
        Objects.requireNonNull(upstreamNodes, "upstreamNodes cannot be null");

        VirtualClusterRecord clusterRecord = this.registeredVirtualClusters.get(virtualCluster);
        if (clusterRecord == null) {
            IllegalStateException notRegistered = new IllegalStateException("virtual cluster %s not registered or is being deregistered".formatted(virtualCluster));
            return CompletableFuture.failedStage(notRegistered);
        }
        if (clusterRecord.deregistrationStage().get() != null) {
            // Deregistration is under way: there is nothing to reconcile.
            return CompletableFuture.completedStage(null);
        }

        AtomicReference<ReconciliationRecord> recordRef = clusterRecord.reconciliationRecord();
        ReconciliationRecord current = recordRef.get();
        if (current == null) {
            IllegalStateException unexpectedState = new IllegalStateException("virtual cluster %s in unexpected state".formatted(virtualCluster));
            return CompletableFuture.failedStage(unexpectedState);
        }

        CompletionStage<Void> currentStage = current.reconciliationStage();
        if (current.upstreamNodeMap().equals(upstreamNodes)) {
            return currentStage;
        }

        return currentStage.thenCompose(ignored -> {
            ReconciliationRecord replacement = ReconciliationRecord.createReconcileRecord(upstreamNodes, new CompletableFuture<>());
            if (!recordRef.compareAndSet(current, replacement)) {
                // Somebody else replaced the record first: join them if they want the same node map,
                // otherwise queue up behind them.
                ReconciliationRecord winner = recordRef.get();
                return winner.upstreamNodeMap().equals(upstreamNodes)
                        ? winner.reconciliationStage()
                        : this.reconcile(virtualCluster, upstreamNodes);
            }
            this.doReconcile(virtualCluster, upstreamNodes, replacement.reconciliationStage().toCompletableFuture(), clusterRecord);
            return replacement.reconciliationStage();
        });
    }

    /**
     * Performs one reconciliation: removes broker bindings whose node id is neither a discovery node nor
     * an upstream node, then registers the broker bindings that are not already present.
     * <p>
     * The locals are fully typed; with raw {@code Set}/{@code Attribute}/{@code Map} the lambdas over the
     * channel bindings operate on {@code Object} and do not type-check.
     *
     * @param virtualCluster cluster being reconciled
     * @param upstreamNodes upstream node id to address map to reconcile against
     * @param future future completed when the reconciliation finishes, exceptionally on failure
     * @param vcr record of the cluster; on failure its reconciliation record is reset to the empty record
     */
    private void doReconcile(VirtualCluster virtualCluster, Map<Integer, HostPort> upstreamNodes, CompletableFuture<Void> future, VirtualClusterRecord vcr) {
        Optional<String> bindingAddress = virtualCluster.getBindAddress();
        Set<Integer> discoveryBrokerIds = Optional.ofNullable(virtualCluster.discoveryAddressMap()).map(Map::keySet).orElse(Set.of());
        Set<Integer> allBrokerIds = Stream.concat(discoveryBrokerIds.stream(), upstreamNodes.keySet().stream()).collect(Collectors.toUnmodifiableSet());
        Set<VirtualClusterBrokerBinding> creations = this.constructPossibleBindingsToCreate(virtualCluster, upstreamNodes);

        // Visit every listening channel that is not being unbound and drop this cluster's stale broker bindings.
        Stream<CompletionStage<Void>> channelCleanups = this.listeningChannels.values().stream()
                .filter(lcr -> lcr.unbindingStage().get() == null)
                .map(lcr -> lcr.bindingStage().thenCompose(acceptorChannel -> {
                    Attribute<Map<RoutingKey, VirtualClusterBinding>> bindings = acceptorChannel.attr(CHANNEL_BINDINGS);
                    if (bindings == null || bindings.get() == null) {
                        // Nothing is bound to this channel.
                        return CompletableFuture.completedStage(null);
                    }
                    Map<RoutingKey, VirtualClusterBinding> bindingMap = bindings.get();
                    return EndpointRegistry.allOfStage(bindingMap.values().stream()
                            .filter(vcb -> vcb.virtualCluster().equals(virtualCluster))
                            .filter(VirtualClusterBrokerBinding.class::isInstance)
                            .map(VirtualClusterBrokerBinding.class::cast)
                            // A binding that already exists on a channel does not need to be created again.
                            .peek(creations::remove)
                            .filter(vcbb -> !allBrokerIds.contains(vcbb.nodeId()))
                            .map(vcbb -> this.deregisterBinding(virtualCluster, vcbb::equals)));
                }));
        CompletableFuture<Void> deregs = EndpointRegistry.allOfStage(channelCleanups);

        // Once the removals are done, register whatever is still missing.
        deregs.thenCompose(u1 -> EndpointRegistry.allOfStage(creations.stream().map(vcbb -> {
            HostPort brokerAddress = virtualCluster.getBrokerAddress(vcbb.nodeId());
            Endpoint endpoint = Endpoint.createEndpoint(bindingAddress, brokerAddress.port(), virtualCluster.isUseTls());
            return this.registerBinding(endpoint, brokerAddress.host(), vcbb);
        }))).whenComplete((u2, t) -> {
            if (t != null) {
                // Forget the node map of the failed attempt so that a later reconcile with the same map is not skipped.
                vcr.reconciliationRecord().set(ReconciliationRecord.createEmptyReconcileRecord());
                future.completeExceptionally(t);
            }
            else {
                future.complete(null);
            }
        });
    }

    /**
     * Builds the set of broker bindings the cluster should have for the given upstream nodes: one binding
     * per upstream node targeting that node's address, plus one binding per discovery node id that has no
     * upstream node, targeting the first upstream bootstrap server.
     *
     * @param virtualCluster cluster the bindings belong to
     * @param upstreamNodes upstream node id to address map
     * @return a mutable, concurrent set of candidate bindings
     */
    private Set<VirtualClusterBrokerBinding> constructPossibleBindingsToCreate(VirtualCluster virtualCluster, Map<Integer, HostPort> upstreamNodes) {
        HostPort upstreamBootstrap = virtualCluster.targetCluster().bootstrapServersList().get(0);
        Map<Integer, HostPort> discoveryAddresses = Optional.ofNullable(virtualCluster.discoveryAddressMap()).orElse(Map.of());

        Set<VirtualClusterBrokerBinding> candidates = ConcurrentHashMap.newKeySet();
        // Known upstream nodes are bound to their own address.
        upstreamNodes.forEach((nodeId, upstreamAddress) -> candidates.add(new VirtualClusterBrokerBinding(virtualCluster, upstreamAddress, nodeId, false)));
        // Discovery-only node ids fall back to the upstream bootstrap address.
        for (Integer nodeId : discoveryAddresses.keySet()) {
            if (!upstreamNodes.containsKey(nodeId)) {
                candidates.add(new VirtualClusterBrokerBinding(virtualCluster, upstreamBootstrap, nodeId, true));
            }
        }
        return candidates;
    }

    /**
     * Tells whether the registry currently holds a record for the given virtual cluster.
     *
     * @param virtualCluster cluster to look up
     * @return {@code true} if a record exists for the cluster
     */
    boolean isRegistered(VirtualCluster virtualCluster) {
        return this.registeredVirtualClusters.get(virtualCluster) != null;
    }

    /**
     * Reports how many listening channel records the registry currently tracks.
     *
     * @return number of entries in the listening channel map
     */
    int listeningChannelCount() {
        final int tracked = this.listeningChannels.size();
        return tracked;
    }

    /**
     * Binds a virtual cluster binding to the listening channel of the given endpoint, requesting the
     * network bind first if no channel record exists for that endpoint yet.
     * <p>
     * The binding map attached to the channel is updated while holding the monitor of the channel's
     * {@code ListeningChannelRecord}, the same monitor {@code deregisterBinding} holds while it removes
     * bindings. The decompiler had reduced that {@code synchronized} block to comments, which left the
     * update unguarded.
     *
     * @param key endpoint to bind on, must not be {@code null}
     * @param host host name used as the routing key when the virtual cluster requires TLS
     * @param virtualClusterBinding binding to attach, must not be {@code null}
     * @return stage completed with {@code key} once the binding is attached; completed exceptionally with
     *         {@link EndpointBindingException} if the routing key is already taken by a different binding
     */
    private CompletionStage<Endpoint> registerBinding(Endpoint key, String host, VirtualClusterBinding virtualClusterBinding) {
        Objects.requireNonNull(key, "key cannot be null");
        Objects.requireNonNull(virtualClusterBinding, "virtualClusterBinding cannot be null");
        VirtualCluster virtualCluster = virtualClusterBinding.virtualCluster();

        ListeningChannelRecord lcr = this.listeningChannels.computeIfAbsent(key, k -> {
            CompletableFuture<Channel> future = new CompletableFuture<>();
            ListeningChannelRecord r = ListeningChannelRecord.create(future.exceptionally(t -> {
                // The bind failed: forget the channel record, then propagate the failure.
                this.listeningChannels.remove(key);
                if (t instanceof RuntimeException re) {
                    throw re;
                }
                throw new RuntimeException(t);
            }));
            this.bindingOperationProcessor.enqueueNetworkBindingEvent(new NetworkBindRequest(future, Endpoint.createEndpoint(key.bindingAddress(), key.port(), virtualCluster.isUseTls())));
            return r;
        });

        CompletionStage<Void> unbinding = lcr.unbindingStage().get();
        if (unbinding != null) {
            // The channel is being torn down; retry once the unbind has finished.
            return unbinding.thenCompose(u -> this.registerBinding(key, host, virtualClusterBinding));
        }

        return lcr.bindingStage().thenApply(acceptorChannel -> {
            synchronized (lcr) {
                Attribute<Map<RoutingKey, VirtualClusterBinding>> bindings = acceptorChannel.attr(CHANNEL_BINDINGS);
                bindings.setIfAbsent(new ConcurrentHashMap<>());
                Map<RoutingKey, VirtualClusterBinding> bindingMap = bindings.get();
                RoutingKey bindingKey = virtualCluster.requiresTls() ? RoutingKey.createBindingKey(host) : RoutingKey.NULL_ROUTING_KEY;

                VirtualClusterBinding existing = bindingMap.putIfAbsent(bindingKey, virtualClusterBinding);
                if (existing instanceof VirtualClusterBrokerBinding existingVcbb
                        && virtualClusterBinding instanceof VirtualClusterBrokerBinding vcbb
                        && existingVcbb.refersToSameVirtualClusterAndNode(vcbb)) {
                    // Same virtual cluster and node: replace the existing broker binding with the new one.
                    bindingMap.put(bindingKey, virtualClusterBinding);
                }
                else if (existing != null) {
                    throw new EndpointBindingException("Endpoint %s cannot be bound with key %s binding %s, that key is already bound to %s".formatted(key, bindingKey, virtualClusterBinding, existing));
                }
                return key;
            }
        });
    }

    /**
     * Removes, from every listening channel, the bindings that match the predicate. A channel whose last
     * binding is removed by this call is unbound and its record is dropped once the unbind completes.
     * <p>
     * The binding map is modified while holding the monitor of the channel's {@code ListeningChannelRecord}.
     * A channel that carries no binding map is skipped, matching the null checks made by {@code doReconcile}
     * and {@code resolve}; previously this case failed with a {@link NullPointerException}.
     *
     * @param virtualCluster cluster on whose behalf bindings are removed, must not be {@code null}
     * @param predicate selects the bindings to remove, must not be {@code null}
     * @return stage completed when all removals, and any unbinds they triggered, have completed
     */
    private CompletionStage<Void> deregisterBinding(VirtualCluster virtualCluster, Predicate<VirtualClusterBinding> predicate) {
        Objects.requireNonNull(virtualCluster, VIRTUAL_CLUSTER_CANNOT_BE_NULL_MESSAGE);
        Objects.requireNonNull(predicate, "predicate cannot be null");

        Stream<CompletionStage<Void>> unbindStages = this.listeningChannels.entrySet().stream().map(e -> {
            Endpoint endpoint = e.getKey();
            ListeningChannelRecord lcr = e.getValue();
            return lcr.bindingStage().thenCompose(acceptorChannel -> {
                synchronized (lcr) {
                    Map<RoutingKey, VirtualClusterBinding> bindingMap = acceptorChannel.attr(CHANNEL_BINDINGS).get();
                    if (bindingMap == null) {
                        // No bindings have been attached to this channel, so there is nothing to remove.
                        return CompletableFuture.completedStage(null);
                    }
                    Set<Map.Entry<RoutingKey, VirtualClusterBinding>> allEntries = bindingMap.entrySet();
                    Set<Map.Entry<RoutingKey, VirtualClusterBinding>> toRemove = allEntries.stream()
                            .filter(be -> predicate.test(be.getValue()))
                            .collect(Collectors.toSet());
                    if (allEntries.removeAll(toRemove) && bindingMap.isEmpty()) {
                        // This call removed the channel's last binding: unbind the channel.
                        CompletableFuture<Void> unbindFuture = new CompletableFuture<>();
                        CompletableFuture<Void> afterUnbind = unbindFuture.whenComplete((u, t) -> this.listeningChannels.remove(endpoint));
                        if (lcr.unbindingStage().compareAndSet(null, afterUnbind)) {
                            this.bindingOperationProcessor.enqueueNetworkBindingEvent(new NetworkUnbindRequest(virtualCluster.isUseTls(), acceptorChannel, unbindFuture));
                            return afterUnbind;
                        }
                        // An unbind is already in flight; wait for that one instead.
                        return lcr.unbindingStage().get();
                    }
                    return CompletableFuture.completedStage(null);
                }
            });
        });
        return EndpointRegistry.allOfStage(unbindStages);
    }

    /**
     * Resolves the virtual cluster binding for a connection accepted on the given endpoint.
     * <p>
     * The binding registered under the SNI host name is preferred, falling back to the binding registered
     * under the null routing key. If neither exists and an SNI host name was supplied, the bootstrap
     * bindings on the channel are asked whether the name matches one of their broker addresses; exactly one
     * match yields a new broker binding targeting that bootstrap binding's upstream.
     * <p>
     * The channel's binding map is read through its declared generic type; with a raw {@code Map} the
     * lookup result is an {@code Object} and cannot be assigned to {@code VirtualClusterBinding}.
     *
     * @param endpoint endpoint the connection arrived on
     * @param sniHostname SNI host name presented by the client, may be {@code null}
     * @return stage completed with the binding, or completed exceptionally with
     *         {@link EndpointResolutionException} if no channel or no binding matches
     */
    @Override
    public CompletionStage<VirtualClusterBinding> resolve(Endpoint endpoint, String sniHostname) {
        ListeningChannelRecord lcr = this.listeningChannels.get(endpoint);
        if (lcr == null || lcr.unbindingStage().get() != null) {
            // Unknown endpoint, or its channel is being unbound.
            return CompletableFuture.failedStage(this.buildEndpointResolutionException("Failed to find channel matching", endpoint, sniHostname));
        }
        return lcr.bindingStage().thenApply(acceptorChannel -> {
            Attribute<Map<RoutingKey, VirtualClusterBinding>> bindings = acceptorChannel.attr(CHANNEL_BINDINGS);
            Map<RoutingKey, VirtualClusterBinding> bindingMap = bindings == null ? null : bindings.get();
            if (bindingMap == null) {
                throw this.buildEndpointResolutionException(NO_CHANNEL_BINDINGS_MESSAGE, endpoint, sniHostname);
            }
            VirtualClusterBinding binding = bindingMap.getOrDefault(RoutingKey.createBindingKey(sniHostname), bindingMap.get(RoutingKey.NULL_ROUTING_KEY));
            if (binding != null) {
                return binding;
            }
            if (sniHostname != null) {
                // No direct match: see whether the SNI name matches a broker address of a bootstrap binding.
                HashMap<VirtualClusterBootstrapBinding, Integer> bootstrapToBrokerId = this.findBootstrapBindings(endpoint, sniHostname, bindings);
                int size = bootstrapToBrokerId.size();
                if (size > 1) {
                    throw new EndpointResolutionException("Failed to generate an unbound broker binding from SNI as it matches the broker address pattern of more than one virtual cluster", this.buildEndpointResolutionException(NO_CHANNEL_BINDINGS_MESSAGE, endpoint, sniHostname));
                }
                if (size == 1) {
                    return EndpointRegistry.buildBootstrapBinding(bootstrapToBrokerId);
                }
            }
            throw this.buildEndpointResolutionException(NO_CHANNEL_BINDINGS_MESSAGE, endpoint, sniHostname);
        });
    }

    /**
     * Creates a broker binding from the first entry of the given map, reusing the bootstrap binding's
     * virtual cluster and upstream target together with the mapped node id.
     *
     * @param bootstrapToBrokerId bootstrap binding to node id map; must contain at least one entry
     * @return the new broker binding
     */
    private static VirtualClusterBrokerBinding buildBootstrapBinding(Map<VirtualClusterBootstrapBinding, Integer> bootstrapToBrokerId) {
        var first = bootstrapToBrokerId.entrySet().iterator().next();
        VirtualClusterBootstrapBinding bootstrap = first.getKey();
        Integer brokerId = first.getValue();
        return new VirtualClusterBrokerBinding(bootstrap.virtualCluster(), bootstrap.upstreamTarget(), brokerId, true);
    }

    /**
     * Finds the bootstrap bindings on a channel whose virtual cluster maps the address formed from the SNI
     * host name and the endpoint port to a broker id.
     *
     * @param endpoint endpoint supplying the port of the candidate broker address
     * @param sniHostname host name of the candidate broker address
     * @param bindings channel attribute holding the channel's bindings
     * @return map from each matching bootstrap binding to the broker id its virtual cluster reported
     */
    private HashMap<VirtualClusterBootstrapBinding, Integer> findBootstrapBindings(Endpoint endpoint, String sniHostname, Attribute<Map<RoutingKey, VirtualClusterBinding>> bindings) {
        Collection<VirtualClusterBinding> channelBindings = bindings.get().values();
        HostPort candidateAddress = new HostPort(sniHostname, endpoint.port());

        HashMap<VirtualClusterBootstrapBinding, Integer> matches = new HashMap<>();
        for (VirtualClusterBootstrapBinding bootstrap : this.getAllBootstrapBindings(channelBindings)) {
            Integer brokerId = bootstrap.virtualCluster().getBrokerIdFromBrokerAddress(candidateAddress);
            if (brokerId != null) {
                matches.put(bootstrap, brokerId);
            }
        }
        return matches;
    }

    /**
     * Selects the bootstrap bindings from a collection of bindings.
     *
     * @param allBindingsForPort bindings to filter
     * @return unmodifiable list of the bootstrap bindings, in encounter order
     */
    private List<VirtualClusterBootstrapBinding> getAllBootstrapBindings(Collection<VirtualClusterBinding> allBindingsForPort) {
        Stream<VirtualClusterBootstrapBinding> bootstrapOnly = allBindingsForPort.stream()
                .filter(candidate -> candidate instanceof VirtualClusterBootstrapBinding)
                .map(candidate -> (VirtualClusterBootstrapBinding) candidate);
        return bootstrapOnly.toList();
    }

    /**
     * Builds a resolution exception whose message describes the endpoint and SNI host name involved.
     *
     * @param prefix leading text of the message
     * @param endpoint endpoint being resolved; an absent binding address is rendered as {@code <any>}
     * @param sniHostname SNI host name being resolved; {@code null} is rendered as {@code <none>}
     * @return the exception, ready to be thrown or used to fail a stage
     */
    private EndpointResolutionException buildEndpointResolutionException(String prefix, Endpoint endpoint, String sniHostname) {
        String address = endpoint.bindingAddress().orElse("<any>");
        String sni = sniHostname != null ? sniHostname : "<none>";
        String message = "%s binding address: %s, port: %d, sniHostname: %s, tls: %b".formatted(prefix, address, endpoint.port(), sni, endpoint.tls());
        return new EndpointResolutionException(message);
    }

    /**
     * Deregisters every virtual cluster currently known to the registry.
     *
     * @return stage completed once all of those deregistrations have completed
     */
    public CompletionStage<Void> shutdown() {
        Stream<CompletionStage<Void>> deregistrations = this.registeredVirtualClusters.keySet().stream()
                .map(cluster -> this.deregisterVirtualCluster(cluster));
        return EndpointRegistry.allOfStage(deregistrations);
    }

    /**
     * Shuts the registry down and blocks until the shutdown has completed.
     *
     * @throws Exception if the shutdown fails or the waiting thread is interrupted
     */
    @Override
    public void close() throws Exception {
        CompletableFuture<Void> shutdownDone = this.shutdown().toCompletableFuture();
        shutdownDone.get();
    }

    /**
     * Combines a stream of stages into a single future that completes when all of them have completed.
     *
     * @param stageStream stages to wait for
     * @param <T> result type of the stages (the results themselves are discarded)
     * @return future completed when every stage has completed
     */
    private static <T> CompletableFuture<Void> allOfStage(Stream<CompletionStage<T>> stageStream) {
        Stream<CompletableFuture<T>> futures = stageStream.map(stage -> stage.toCompletableFuture());
        return EndpointRegistry.allOfFutures(futures);
    }

    /**
     * Combines a stream of futures into a single future via {@link CompletableFuture#allOf}.
     *
     * @param futureStream futures to wait for
     * @param <T> result type of the futures (the results themselves are discarded)
     * @return future completed when every given future has completed
     */
    private static <T> CompletableFuture<Void> allOfFutures(Stream<CompletableFuture<T>> futureStream) {
        CompletableFuture<?>[] pending = futureStream.toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(pending);
    }

    /**
     * Per-cluster state: the stage of its registration, the holder of its latest reconciliation record
     * and the holder of its deregistration stage (empty until a deregistration has been requested).
     */
    private record VirtualClusterRecord(CompletionStage<Endpoint> registrationStage,
                                        AtomicReference<ReconciliationRecord> reconciliationRecord,
                                        AtomicReference<CompletionStage<Void>> deregistrationStage) {
        /** Rejects {@code null} components. */
        private VirtualClusterRecord {
            Objects.requireNonNull(registrationStage);
            Objects.requireNonNull(reconciliationRecord);
            Objects.requireNonNull(deregistrationStage);
        }

        /** Creates a record for the given registration stage with both holders initially empty. */
        private static VirtualClusterRecord create(CompletionStage<Endpoint> stage) {
            AtomicReference<ReconciliationRecord> noReconciliation = new AtomicReference<>();
            AtomicReference<CompletionStage<Void>> noDeregistration = new AtomicReference<>();
            return new VirtualClusterRecord(stage, noReconciliation, noDeregistration);
        }
    }

    /** One reconciliation: the upstream node map it was requested for and the stage tracking its progress. */
    private record ReconciliationRecord(Map<Integer, HostPort> upstreamNodeMap, CompletionStage<Void> reconciliationStage) {
        /** Rejects {@code null} components. */
        private ReconciliationRecord {
            Objects.requireNonNull(upstreamNodeMap);
            Objects.requireNonNull(reconciliationStage);
        }

        /** Creates a record with an empty node map and an already completed stage. */
        public static ReconciliationRecord createEmptyReconcileRecord() {
            CompletionStage<Void> alreadyDone = CompletableFuture.completedStage(null);
            return ReconciliationRecord.createReconcileRecord(Map.of(), alreadyDone);
        }

        /** Creates a record for the given node map and stage. */
        private static ReconciliationRecord createReconcileRecord(Map<Integer, HostPort> upstreamNodeMap, CompletionStage<Void> future) {
            return new ReconciliationRecord(upstreamNodeMap, future);
        }
    }

    /**
     * State of one listening channel: the stage of its bind and the holder of its unbind stage (empty
     * until an unbind has been requested).
     */
    private record ListeningChannelRecord(CompletionStage<Channel> bindingStage, AtomicReference<CompletionStage<Void>> unbindingStage) {
        /** Rejects {@code null} components. */
        private ListeningChannelRecord {
            Objects.requireNonNull(bindingStage);
            Objects.requireNonNull(unbindingStage);
        }

        /** Creates a record for the given bind stage with no unbind requested yet. */
        public static ListeningChannelRecord create(CompletionStage<Channel> stage) {
            AtomicReference<CompletionStage<Void>> noUnbind = new AtomicReference<>();
            return new ListeningChannelRecord(stage, noUnbind);
        }
    }

    /** Key under which a binding is stored in a channel's binding map. */
    static interface RoutingKey {
        /** Key used when there is no SNI host name to route on. */
        public static final RoutingKey NULL_ROUTING_KEY = new NullRoutingKey();

        /**
         * Returns the routing key for an SNI host name.
         *
         * @param sniHostname SNI host name, may be {@code null} or empty
         * @return {@link #NULL_ROUTING_KEY} for a {@code null} or empty name, otherwise a key for that name
         */
        public static RoutingKey createBindingKey(String sniHostname) {
            boolean hasSniName = sniHostname != null && !sniHostname.isEmpty();
            return hasSniName ? new SniRoutingKey(sniHostname) : NULL_ROUTING_KEY;
        }
    }

    /** Routing key for an SNI host name; the name is stored lower-cased using {@link Locale#ROOT}. */
    private record SniRoutingKey(String sniHostname) implements RoutingKey {
        /** Rejects a {@code null} name and normalises the name to lower case. */
        private SniRoutingKey {
            Objects.requireNonNull(sniHostname);
            sniHostname = sniHostname.toLowerCase(Locale.ROOT);
        }
    }

    /** Routing key type used for {@code RoutingKey.NULL_ROUTING_KEY}. */
    private static class NullRoutingKey implements RoutingKey {
        /** Private so that only the enclosing class can create instances. */
        private NullRoutingKey() {
        }

        /** Returns a fixed, record-style description. */
        @Override
        public String toString() {
            return "NullRoutingKey[]";
        }
    }
}

