/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.protocol;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.protocol.ProtocolHandlerDefinitions;
import org.apache.pulsar.broker.protocol.ProtocolHandlerMetadata;
import org.apache.pulsar.broker.protocol.ProtocolHandlerUtils;
import org.apache.pulsar.broker.protocol.ProtocolHandlerWithClassLoader;
import org.apache.pulsar.broker.service.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtocolHandlers
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ProtocolHandlers.class);
    private final Map<String, ProtocolHandlerWithClassLoader> handlers;

    public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException {
        ProtocolHandlerDefinitions definitions = ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory());
        ImmutableMap.Builder handlersBuilder = ImmutableMap.builder();
        conf.getMessagingProtocols().forEach(protocol -> {
            ProtocolHandlerWithClassLoader handler;
            ProtocolHandlerMetadata definition = definitions.handlers().get(protocol);
            if (null == definition) {
                throw new RuntimeException("No protocol handler is found for protocol `" + protocol + "`. Available protocols are : " + definitions.handlers());
            }
            try {
                handler = ProtocolHandlerUtils.load(definition);
            }
            catch (IOException e) {
                log.error("Failed to load the protocol handler for protocol `" + protocol + "`", (Throwable)e);
                throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`");
            }
            if (!handler.accept((String)protocol)) {
                handler.close();
                log.error("Malformed protocol handler found for protocol `" + protocol + "`");
                throw new RuntimeException("Malformed protocol handler found for protocol `" + protocol + "`");
            }
            handlersBuilder.put(protocol, (Object)handler);
            log.info("Successfully loaded protocol handler for protocol `{}`", protocol);
        });
        return new ProtocolHandlers((Map<String, ProtocolHandlerWithClassLoader>)handlersBuilder.build());
    }

    ProtocolHandlers(Map<String, ProtocolHandlerWithClassLoader> handlers) {
        this.handlers = handlers;
    }

    public ProtocolHandler protocol(String protocol) {
        ProtocolHandlerWithClassLoader h = this.handlers.get(protocol);
        if (null == h) {
            return null;
        }
        return h.getHandler();
    }

    public void initialize(ServiceConfiguration conf) throws Exception {
        for (ProtocolHandler protocolHandler : this.handlers.values()) {
            protocolHandler.initialize(conf);
        }
    }

    public Map<String, String> getProtocolDataToAdvertise() {
        return this.handlers.entrySet().stream().collect(Collectors.toMap(e -> (String)e.getKey(), e -> ((ProtocolHandlerWithClassLoader)e.getValue()).getProtocolDataToAdvertise()));
    }

    public Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> newChannelInitializers() {
        HashMap channelInitializers = Maps.newHashMap();
        HashSet addresses = Sets.newHashSet();
        for (Map.Entry<String, ProtocolHandlerWithClassLoader> handler : this.handlers.entrySet()) {
            Map<InetSocketAddress, ChannelInitializer<SocketChannel>> initializers = handler.getValue().newChannelInitializers();
            initializers.forEach((address, initializer) -> {
                if (!addresses.add(address)) {
                    log.error("Protocol handler for `{}` attempts to use {} for its listening port. But it is already occupied by other message protocols.", handler.getKey(), address);
                    throw new RuntimeException("Protocol handler for `" + (String)handler.getKey() + "` attempts to use " + address + " for its listening port. But it is already occupied by other messaging protocols");
                }
                channelInitializers.put(handler.getKey(), initializers);
            });
        }
        return channelInitializers;
    }

    public void start(BrokerService service) {
        this.handlers.values().forEach(handler -> handler.start(service));
    }

    @Override
    public void close() {
        this.handlers.values().forEach(ProtocolHandler::close);
    }
}

