package org.apache.zeppelin.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.MembershipConfig;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.primitive.PrimitiveState;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Serializer;
import java.io.File;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.cluster.event.ClusterEventListener;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/cluster/ClusterManagerServer.class */
public class ClusterManagerServer extends ClusterManager {
    protected RaftServer raftServer;
    protected MessagingService messagingService;
    private List<ClusterEventListener> clusterIntpEventListeners;
    private List<ClusterEventListener> clusterNoteEventListeners;
    private List<ClusterEventListener> clusterAuthEventListeners;
    private List<ClusterEventListener> clusterIntpSettingEventListeners;
    private BiFunction<Address, byte[], byte[]> subscribeClusterIntpEvent;
    private BiFunction<Address, byte[], byte[]> subscribeClusterNoteEvent;
    private BiFunction<Address, byte[], byte[]> subscribeClusterAuthEvent;
    private BiFunction<Address, byte[], byte[]> subscribeIntpSettingEvent;
    private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerServer.class);
    private static ClusterManagerServer instance = null;
    public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC";
    public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC";
    public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC";
    public static String CLUSTER_INTP_SETTING_EVENT_TOPIC = "CLUSTER_INTP_SETTING_EVENT_TOPIC";

    private ClusterManagerServer(ZeppelinConfiguration zeppelinConfiguration) {
        super(zeppelinConfiguration);
        this.raftServer = null;
        this.messagingService = null;
        this.clusterIntpEventListeners = new ArrayList();
        this.clusterNoteEventListeners = new ArrayList();
        this.clusterAuthEventListeners = new ArrayList();
        this.clusterIntpSettingEventListeners = new ArrayList();
        this.subscribeClusterIntpEvent = (address, bArr) -> {
            String str = new String(bArr);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("subscribeClusterIntpEvent() {}", str);
            }
            Iterator<ClusterEventListener> it = this.clusterIntpEventListeners.iterator();
            while (it.hasNext()) {
                it.next().onClusterEvent(str);
            }
            return null;
        };
        this.subscribeClusterNoteEvent = (address2, bArr2) -> {
            String str = new String(bArr2);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("subscribeClusterNoteEvent() {}", str);
            }
            Iterator<ClusterEventListener> it = this.clusterNoteEventListeners.iterator();
            while (it.hasNext()) {
                it.next().onClusterEvent(str);
            }
            return null;
        };
        this.subscribeClusterAuthEvent = (address3, bArr3) -> {
            String str = new String(bArr3);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("subscribeClusterAuthEvent() {}", str);
            }
            Iterator<ClusterEventListener> it = this.clusterAuthEventListeners.iterator();
            while (it.hasNext()) {
                it.next().onClusterEvent(str);
            }
            return null;
        };
        this.subscribeIntpSettingEvent = (address4, bArr4) -> {
            String str = new String(bArr4);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("subscribeIntpSettingEvent() {}", str);
            }
            Iterator<ClusterEventListener> it = this.clusterIntpSettingEventListeners.iterator();
            while (it.hasNext()) {
                it.next().onClusterEvent(str);
            }
            return null;
        };
    }

    public static ClusterManagerServer getInstance(ZeppelinConfiguration zeppelinConfiguration) {
        ClusterManagerServer clusterManagerServer;
        synchronized (ClusterManagerServer.class) {
            if (instance == null) {
                instance = new ClusterManagerServer(zeppelinConfiguration);
            }
            clusterManagerServer = instance;
        }
        return clusterManagerServer;
    }

    @Override // org.apache.zeppelin.cluster.ClusterManager
    public void start() {
        if (this.zConf.isClusterMode()) {
            initThread();
            String clusterNodeName = getClusterNodeName();
            this.clusterMonitor = new ClusterMonitor(this);
            this.clusterMonitor.start(ClusterMetaType.SERVER_META, clusterNodeName);
            super.start();
        }
    }

    @VisibleForTesting
    public void initTestCluster(String str, String str2, int i) {
        this.isTest = true;
        this.zeplServerHost = str2;
        this.raftServerPort = i;
        this.clusterNodes.clear();
        this.raftAddressMap.clear();
        this.clusterMemberIds.clear();
        for (String str3 : str.split(",")) {
            String[] split = str3.split(":");
            String str4 = split[0];
            int intValue = Integer.valueOf(split[1]).intValue();
            String str5 = str4 + ":" + intValue;
            Address from = Address.from(str4, intValue);
            this.clusterNodes.add(Node.builder().withId(str5).withAddress(from).build());
            this.raftAddressMap.put(MemberId.from(str5), from);
            this.clusterMemberIds.add(MemberId.from(str5));
        }
    }

    @Override // org.apache.zeppelin.cluster.ClusterManager
    public boolean raftInitialized() {
        return (null == this.raftServer || !this.raftServer.isRunning() || null == this.raftClient || null == this.raftSessionClient || this.raftSessionClient.getState() != PrimitiveState.CONNECTED) ? false : true;
    }

    @Override // org.apache.zeppelin.cluster.ClusterManager
    public boolean isClusterLeader() {
        return null != this.raftServer && this.raftServer.isRunning() && this.raftServer.isLeader();
    }

    private void initThread() {
        new Thread(new Runnable() { // from class: org.apache.zeppelin.cluster.ClusterManagerServer.1
            @Override // java.lang.Runnable
            public void run() {
                ClusterManagerServer.LOGGER.info("RaftServer run() >>>");
                Address from = Address.from(ClusterManagerServer.this.zeplServerHost, ClusterManagerServer.this.raftServerPort);
                Member build = Member.builder(MemberId.from(ClusterManagerServer.this.zeplServerHost + ":" + ClusterManagerServer.this.raftServerPort)).withAddress(from).build();
                ClusterManagerServer.this.messagingService = (MessagingService) NettyMessagingService.builder().withAddress(from).build().start().join();
                MessagingService messagingService = ClusterManagerServer.this.messagingService;
                Serializer serializer = ClusterManager.protocolSerializer;
                Map<MemberId, Address> map = ClusterManagerServer.this.raftAddressMap;
                map.getClass();
                RaftServerMessagingProtocol raftServerMessagingProtocol = new RaftServerMessagingProtocol(messagingService, serializer, (v1) -> {
                    return r4.get(v1);
                });
                BootstrapService bootstrapService = new BootstrapService() { // from class: org.apache.zeppelin.cluster.ClusterManagerServer.1.1
                    public MessagingService getMessagingService() {
                        return ClusterManagerServer.this.messagingService;
                    }

                    public BroadcastService getBroadcastService() {
                        return new BroadcastServiceAdapter();
                    }
                };
                DefaultClusterMembershipService defaultClusterMembershipService = new DefaultClusterMembershipService(build, new DefaultNodeDiscoveryService(bootstrapService, build, new BootstrapDiscoveryProvider(ClusterManagerServer.this.clusterNodes)), bootstrapService, new MembershipConfig());
                File createTempDir = Files.createTempDir();
                createTempDir.deleteOnExit();
                ClusterManagerServer.this.raftServer = (RaftServer) RaftServer.builder(build.id()).withMembershipService(defaultClusterMembershipService).withProtocol(raftServerMessagingProtocol).withStorage(RaftStorage.builder().withStorageLevel(StorageLevel.MEMORY).withDirectory(createTempDir).withSerializer(ClusterManager.storageSerializer).withMaxSegmentSize(1048576).build()).build();
                ClusterManagerServer.this.raftServer.bootstrap(ClusterManagerServer.this.clusterMemberIds);
                ClusterManagerServer.this.messagingService.registerHandler(ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, ClusterManagerServer.this.subscribeClusterIntpEvent, MoreExecutors.directExecutor());
                ClusterManagerServer.this.messagingService.registerHandler(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, ClusterManagerServer.this.subscribeClusterNoteEvent, MoreExecutors.directExecutor());
                ClusterManagerServer.this.messagingService.registerHandler(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, ClusterManagerServer.this.subscribeClusterAuthEvent, MoreExecutors.directExecutor());
                ClusterManagerServer.this.messagingService.registerHandler(ClusterManagerServer.CLUSTER_INTP_SETTING_EVENT_TOPIC, ClusterManagerServer.this.subscribeIntpSettingEvent, MoreExecutors.directExecutor());
                HashMap<String, Object> hashMap = new HashMap<>();
                String clusterNodeName = ClusterManagerServer.this.getClusterNodeName();
                hashMap.put(ClusterMeta.NODE_NAME, clusterNodeName);
                hashMap.put(ClusterMeta.SERVER_HOST, ClusterManagerServer.this.zeplServerHost);
                hashMap.put(ClusterMeta.SERVER_PORT, Integer.valueOf(ClusterManagerServer.this.raftServerPort));
                hashMap.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now());
                ClusterManagerServer.this.putClusterMeta(ClusterMetaType.SERVER_META, clusterNodeName, hashMap);
                ClusterManagerServer.LOGGER.info("RaftServer run() <<<");
            }
        }).start();
    }

    @Override // org.apache.zeppelin.cluster.ClusterManager
    public void shutdown() {
        if (this.zConf.isClusterMode()) {
            try {
                deleteClusterMeta(ClusterMetaType.SERVER_META, getClusterNodeName());
                Thread.sleep(300L);
                if (this.clusterMonitor != null) {
                    this.clusterMonitor.shutdown();
                }
                Thread.sleep(300L);
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            if (null != this.raftServer && this.raftServer.isRunning()) {
                try {
                    this.raftServer.shutdown().get(3L, TimeUnit.SECONDS);
                } catch (InterruptedException e2) {
                    LOGGER.error(e2.getMessage(), e2);
                } catch (ExecutionException e3) {
                    LOGGER.error(e3.getMessage(), e3);
                } catch (TimeoutException e4) {
                    LOGGER.error(e4.getMessage(), e4);
                }
            }
            super.shutdown();
            instance = null;
        }
    }

    public HashMap<String, Object> getIdleNodeMeta() {
        HashMap<String, Object> hashMap = null;
        long j = 0;
        Iterator<Map.Entry<String, HashMap<String, Object>>> it = getClusterMeta(ClusterMetaType.SERVER_META, "").entrySet().iterator();
        while (it.hasNext()) {
            HashMap<String, Object> value = it.next().getValue();
            String str = (String) value.get(ClusterMeta.STATUS);
            if (null != str && !StringUtils.isEmpty(str) && !str.equals(ClusterMeta.OFFLINE_STATUS)) {
                long longValue = ((Long) value.get(ClusterMeta.MEMORY_CAPACITY)).longValue() - ((Long) value.get(ClusterMeta.MEMORY_USED)).longValue();
                if (longValue > j) {
                    j = longValue;
                    hashMap = value;
                }
            }
        }
        return hashMap;
    }

    public void unicastClusterEvent(String str, int i, String str2, String str3) {
        LOGGER.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}", new Object[]{str, Integer.valueOf(i), str2, str3});
        this.messagingService.sendAndReceive(Address.from(str, i), str2, str3.getBytes(), Duration.ofSeconds(2L)).whenComplete((bArr, th) -> {
            if (null == th) {
                LOGGER.error(th.getMessage(), th);
            }
        });
    }

    public void broadcastClusterEvent(String str, String str2) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("send broadcastClusterEvent message {}", str2);
        }
        for (Node node : this.clusterNodes) {
            if (!StringUtils.equals(node.address().host(), this.zeplServerHost) || node.address().port() != this.raftServerPort) {
                this.messagingService.sendAndReceive(node.address(), str, str2.getBytes(), Duration.ofSeconds(2L)).whenComplete((bArr, th) -> {
                    if (null == th) {
                        LOGGER.error(th.getMessage(), th);
                    } else {
                        LOGGER.info("broadcastClusterNoteEvent success! {}", str2);
                    }
                });
            }
        }
    }

    public void addClusterEventListeners(String str, ClusterEventListener clusterEventListener) {
        if (StringUtils.equals(str, CLUSTER_INTP_EVENT_TOPIC)) {
            this.clusterIntpEventListeners.add(clusterEventListener);
            return;
        }
        if (StringUtils.equals(str, CLUSTER_NOTE_EVENT_TOPIC)) {
            this.clusterNoteEventListeners.add(clusterEventListener);
            return;
        }
        if (StringUtils.equals(str, CLUSTER_AUTH_EVENT_TOPIC)) {
            this.clusterAuthEventListeners.add(clusterEventListener);
        } else if (StringUtils.equals(str, CLUSTER_INTP_SETTING_EVENT_TOPIC)) {
            this.clusterIntpSettingEventListeners.add(clusterEventListener);
        } else {
            LOGGER.error("Unknow cluster event topic : {}", str);
        }
    }
}
