package com.mpush.client.gateway.connection;

import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.net.HostAndPort;
import com.mpush.api.connection.Connection;
import com.mpush.api.event.ConnectionConnectEvent;
import com.mpush.api.service.Listener;
import com.mpush.api.spi.common.ServiceDiscoveryFactory;
import com.mpush.api.srd.ServiceDiscovery;
import com.mpush.api.srd.ServiceNode;
import com.mpush.client.gateway.GatewayClient;
import com.mpush.common.message.BaseMessage;
import com.mpush.tools.config.CC;
import com.mpush.tools.event.EventBus;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:com/mpush/client/gateway/connection/GatewayTCPConnectionFactory.class */
/**
 * {@link GatewayConnectionFactory} that keeps, per gateway server, a pool of TCP
 * connections opened through a {@link GatewayClient}.
 *
 * <p>Pools are keyed by {@code "host:port"} (see {@link #getHostAndPort(String, int)}).
 * A connection is only placed into its pool once a {@link ConnectionConnectEvent}
 * for it is delivered to {@link #on(ConnectionConnectEvent)}; the methods that open
 * connections never touch the pools directly.
 *
 * <p>The pool map is a concurrent map and each pool is a {@link CopyOnWriteArrayList},
 * so pools can be read (send/broadcast) while connect events and discovery callbacks
 * add or remove entries.
 */
public class GatewayTCPConnectionFactory extends GatewayConnectionFactory {
    /** Service-discovery path under which gateway servers are looked up and watched. */
    private static final String GATEWAY_SERVICE_PATH = "/cluster/gs";

    /** Channel attribute carrying the "host:port" pool key from connect time to the connect event. */
    private final AttributeKey<String> attrKey = AttributeKey.valueOf("host_port");

    /** Established connections grouped by "host:port". */
    private final Map<String, List<Connection>> connections = Maps.newConcurrentMap();

    private GatewayClient client;

    /**
     * Registers on the event bus, starts the gateway client, subscribes to gateway
     * node changes and opens connections to every gateway currently known, waiting
     * for each connect attempt to finish before reporting success.
     *
     * @param listener notified via {@code onSuccess} once the initial connects were attempted
     * @throws Throwable whatever the client start or the discovery calls throw
     */
    protected void doStart(Listener listener) throws Throwable {
        EventBus.I.register(this);
        this.client = new GatewayClient();
        this.client.start().join();
        ServiceDiscovery discovery = ServiceDiscoveryFactory.create();
        discovery.subscribe(GATEWAY_SERVICE_PATH, this);
        discovery.lookup(GATEWAY_SERVICE_PATH).forEach(this::addConnection);
        listener.onSuccess(new Object[0]);
    }

    /** A gateway node appeared: open its connections without blocking the caller. */
    public void onServiceAdded(String str, ServiceNode serviceNode) {
        asyncAddConnection(serviceNode);
    }

    /** A gateway node changed: drop its current connections and open fresh ones. */
    public void onServiceUpdated(String str, ServiceNode serviceNode) {
        removeClient(serviceNode);
        asyncAddConnection(serviceNode);
    }

    /** A gateway node disappeared: close and forget its connections. */
    public void onServiceRemoved(String str, ServiceNode serviceNode) {
        removeClient(serviceNode);
        this.logger.warn("Gateway Server zkNode={} was removed.", serviceNode);
    }

    /**
     * Closes every pooled connection, stops the gateway client and unsubscribes from
     * gateway node changes.
     *
     * @param listener not invoked by this method
     * @throws Throwable whatever the client stop or the discovery call throws
     */
    public void doStop(Listener listener) throws Throwable {
        this.connections.values().forEach(pool -> pool.forEach(Connection::close));
        // Forget the closed connections; otherwise a later getConnection() would find
        // them dead and try to reconnect through the client that is stopped below.
        this.connections.clear();
        if (this.client != null) {
            this.client.stop().join();
        }
        ServiceDiscoveryFactory.create().unsubscribe(GATEWAY_SERVICE_PATH, this);
        // NOTE(review): unlike doStart, the listener is never notified here and this
        // instance stays registered on the event bus - confirm both are intended.
    }

    /**
     * Returns a live connection to the given gateway, chosen at random from its pool.
     *
     * <p>Dead connections that are picked are removed, closed and replaced
     * asynchronously, and another pick is made from what remains.
     *
     * @param str pool key in {@code "host:port"} form
     * @return a connected connection, or {@code null} when the gateway is unknown or
     *         its pool currently holds no connection
     */
    @Override // com.mpush.client.gateway.connection.GatewayConnectionFactory
    public Connection getConnection(String str) {
        while (true) {
            List<Connection> pool = this.connections.get(str);
            if (pool == null) {
                return null;
            }
            // Work on a snapshot so the size and the indexed read cannot disagree
            // when the pool is modified concurrently.
            Connection[] snapshot = pool.toArray(new Connection[0]);
            if (snapshot.length == 0) {
                return null;
            }
            Connection candidate = snapshot.length == 1
                    ? snapshot[0]
                    : snapshot[ThreadLocalRandom.current().nextInt(snapshot.length)];
            if (candidate.isConnected()) {
                return candidate;
            }
            // Each pass evicts one dead connection, so the loop ends once a live one
            // is picked or the pool is drained.
            reconnect(candidate, str);
        }
    }

    /**
     * Builds a message on a connection to the given gateway and hands it to the sender.
     *
     * @param str      pool key in {@code "host:port"} form
     * @param function creates the message for the chosen connection
     * @param consumer receives the created message
     * @return {@code false} when no connection is available, {@code true} otherwise
     */
    @Override // com.mpush.client.gateway.connection.GatewayConnectionFactory
    public <M extends BaseMessage> boolean send(String str, Function<Connection, M> function, Consumer<M> consumer) {
        Connection connection = getConnection(str);
        if (connection == null) {
            return false;
        }
        consumer.accept(function.apply(connection));
        return true;
    }

    /**
     * Builds and hands off one message per gateway, using the first connection of
     * each non-empty pool (no connectivity check is made).
     *
     * @param function creates the message for a connection
     * @param consumer receives each created message
     * @return {@code false} when no gateway is known at all, {@code true} otherwise
     */
    @Override // com.mpush.client.gateway.connection.GatewayConnectionFactory
    public <M extends BaseMessage> boolean broadcast(Function<Connection, M> function, Consumer<M> consumer) {
        if (this.connections.isEmpty()) {
            return false;
        }
        for (List<Connection> pool : this.connections.values()) {
            // findFirst() on the stream avoids the size()-then-get(0) race.
            pool.stream().findFirst().ifPresent(first -> consumer.accept(function.apply(first)));
        }
        return true;
    }

    /**
     * Evicts and closes a dead connection and asynchronously opens a replacement.
     * The replacement is only opened by the caller that actually removed the
     * connection from its pool, so concurrent callers do not over-fill the pool,
     * and nothing is opened when the gateway's pool no longer exists.
     */
    private void reconnect(Connection connection, String str) {
        HostAndPort target = HostAndPort.fromString(str);
        List<Connection> pool = this.connections.get(str);
        boolean evicted = pool != null && pool.remove(connection);
        connection.close();
        if (evicted) {
            addConnection(target.getHost(), target.getPort(), false);
        }
    }

    /** Removes the pool of the given node (if any) and closes all of its connections. */
    private void removeClient(ServiceNode serviceNode) {
        if (serviceNode == null) {
            return;
        }
        String key = getHostAndPort(serviceNode.getHost(), serviceNode.getPort());
        List<Connection> removed = this.connections.remove(key);
        if (removed != null) {
            removed.forEach(Connection::close);
        }
    }

    /** Starts {@code gateway_client_num} connect attempts to the node without waiting for them. */
    private void asyncAddConnection(ServiceNode serviceNode) {
        for (int i = 0; i < CC.mp.net.gateway_client_num; i++) {
            addConnection(serviceNode.getHost(), serviceNode.getPort(), false);
        }
    }

    /** Starts {@code gateway_client_num} connect attempts to the node, waiting for each to finish. */
    private void addConnection(ServiceNode serviceNode) {
        for (int i = 0; i < CC.mp.net.gateway_client_num; i++) {
            addConnection(serviceNode.getHost(), serviceNode.getPort(), true);
        }
    }

    /**
     * Starts one connect attempt and tags its channel with the pool key.
     *
     * @param str host to connect to
     * @param i   port to connect to
     * @param z   when {@code true}, block until the attempt has completed
     */
    private void addConnection(String str, int i, boolean z) {
        ChannelFuture connect = this.client.connect(str, i);
        // The key is attached after connect() was issued; on(...) falls back to the
        // channel's remote address when it does not find the attribute.
        connect.channel().attr(this.attrKey).set(getHostAndPort(str, i));
        connect.addListener(future -> {
            if (!future.isSuccess()) {
                this.logger.error("create gateway connection ex, host={}, port={}", new Object[]{str, Integer.valueOf(i), future.cause()});
            }
        });
        if (z) {
            connect.awaitUninterruptibly();
        }
    }

    /**
     * Adds a newly connected connection to the pool of its gateway. The pool key is
     * taken (and cleared) from the channel attribute, or derived from the channel's
     * remote address when the attribute is absent.
     */
    @Subscribe
    void on(ConnectionConnectEvent connectionConnectEvent) {
        Connection connection = connectionConnectEvent.connection;
        String key = connection.getChannel().attr(this.attrKey).getAndSet(null);
        if (key == null) {
            InetSocketAddress remote = (InetSocketAddress) connection.getChannel().remoteAddress();
            key = getHostAndPort(remote.getAddress().getHostAddress(), remote.getPort());
        }
        this.connections.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(connection);
        this.logger.info("one gateway client connect success, hostAndPort={}, conn={}", key, connection);
    }

    /** Formats the pool key for a host and port as {@code "host:port"}. */
    private static String getHostAndPort(String str, int i) {
        return str + ":" + i;
    }
}
