package org.apache.kylin.rest.service;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.client.CoordinatorClientFactory;
import org.apache.kylin.stream.core.client.HttpReceiverAdminClient;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.model.stats.ClusterState;
import org.apache.kylin.stream.core.model.stats.ConsumerStats;
import org.apache.kylin.stream.core.model.stats.CubeRealTimeState;
import org.apache.kylin.stream.core.model.stats.PartitionConsumeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeRealTimeState;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverState;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.model.stats.ReplicaSetState;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;

@Component("streamingServiceV2")
/* loaded from: input_file:org/apache/kylin/rest/service/StreamingV2Service.class */
public class StreamingV2Service extends BasicService {
    private static final Logger logger = LoggerFactory.getLogger(StreamingV2Service.class);
    private Cache<String, ClusterState> clusterStateCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
    private ExecutorService clusterStateExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new NamedThreadFactory("fetch_receiver_state"));
    private StreamMetadataStore streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
    private ReceiverAdminClient receiverAdminClient = new HttpReceiverAdminClient();

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12, types: [java.util.List] */
    public List<StreamingSourceConfig> listAllStreamingConfigs(String str) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        if (StringUtils.isEmpty(str)) {
            newArrayList = getStreamingManagerV2().listAllStreaming();
        } else {
            StreamingSourceConfig config = getStreamingManagerV2().getConfig(str);
            if (config != null) {
                newArrayList.add(config);
            }
        }
        return newArrayList;
    }

    public List<StreamingSourceConfig> getStreamingConfigs(String str, Integer num, Integer num2) throws IOException {
        List<StreamingSourceConfig> listAllStreamingConfigs = listAllStreamingConfigs(str);
        return (num == null || num2 == null) ? listAllStreamingConfigs : listAllStreamingConfigs.size() - num2.intValue() < num.intValue() ? listAllStreamingConfigs.subList(num2.intValue(), listAllStreamingConfigs.size()) : listAllStreamingConfigs.subList(num2.intValue(), num2.intValue() + num.intValue());
    }

    @PreAuthorize("hasRole('ROLE_ADMIN') or hasPermission(#project, 'ADMINISTRATION')")
    public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig streamingSourceConfig, ProjectInstance projectInstance) throws IOException {
        if (getStreamingManagerV2().getConfig(streamingSourceConfig.getName()) != null) {
            throw new InternalErrorException("The streamingSourceConfig named " + streamingSourceConfig.getName() + " already exists");
        }
        return getStreamingManagerV2().saveStreamingConfig(streamingSourceConfig);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
        return getStreamingManagerV2().updateStreamingConfig(streamingSourceConfig);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void dropStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
        getStreamingManagerV2().removeStreamingConfig(streamingSourceConfig);
    }

    public String getParserTemplate(final int i, StreamingSourceConfig streamingSourceConfig) {
        return StreamingSourceFactory.getStreamingSource(new ISourceAware() { // from class: org.apache.kylin.rest.service.StreamingV2Service.1
            public int getSourceType() {
                return i;
            }

            public KylinConfig getConfig() {
                return getConfig();
            }
        }).getMessageTemplate(streamingSourceConfig);
    }

    @PreAuthorize("hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public List<CubeAssignment> getStreamingCubeAssignments(CubeInstance cubeInstance) {
        if (cubeInstance == null) {
            return this.streamMetadataStore.getAllCubeAssignments();
        }
        ArrayList newArrayList = Lists.newArrayList();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(cubeInstance.getName());
        if (assignmentsByCube != null) {
            newArrayList.add(assignmentsByCube);
        }
        return newArrayList;
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public Map<Integer, Map<String, List<Partition>>> getStreamingReplicaSetAssignments(Integer num) {
        if (num == null) {
            return this.streamMetadataStore.getAllReplicaSetAssignments();
        }
        HashMap newHashMap = Maps.newHashMap();
        Map assignmentsByReplicaSet = this.streamMetadataStore.getAssignmentsByReplicaSet(num.intValue());
        if (assignmentsByReplicaSet != null) {
            newHashMap.put(num, assignmentsByReplicaSet);
        }
        return newHashMap;
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public Map<Integer, Map<String, List<Partition>>> reBalancePlan() {
        return getCoordinatorClient().reBalanceRecommend();
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void reBalance(Map<Integer, Map<String, List<Partition>>> map) {
        getCoordinatorClient().reBalance(map);
    }

    public List<String> getStreamingCubes() {
        return this.streamMetadataStore.getCubes();
    }

    public StreamingCubeConsumeState getStreamingCubeConsumeState(String str) {
        return this.streamMetadataStore.getStreamingCubeConsumeState(str);
    }

    @PreAuthorize("hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void assignCube(CubeInstance cubeInstance) {
        getCoordinatorClient().assignCube(cubeInstance.getName());
    }

    @PreAuthorize("hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void unAssignCube(CubeInstance cubeInstance) {
        getCoordinatorClient().unAssignCube(cubeInstance.getName());
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void reAssignCube(String str, CubeAssignment cubeAssignment) {
        validateAssignment(cubeAssignment);
        getCoordinatorClient().reAssignCube(str, cubeAssignment);
    }

    private void validateAssignment(CubeAssignment cubeAssignment) {
        Set<Integer> keySet = cubeAssignment.getAssignments().keySet();
        HashSet newHashSet = Sets.newHashSet(this.streamMetadataStore.getReplicaSetIDs());
        for (Integer num : keySet) {
            if (!newHashSet.contains(num)) {
                throw new IllegalArgumentException("the replica set id:" + num + " does not exist");
            }
        }
    }

    @PreAuthorize("hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void pauseConsumers(CubeInstance cubeInstance) {
        getCoordinatorClient().pauseConsumers(cubeInstance.getName());
    }

    @PreAuthorize("hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void resumeConsumers(CubeInstance cubeInstance) {
        getCoordinatorClient().resumeConsumers(cubeInstance.getName());
    }

    public void onSegmentRemoteStoreComplete(String str, Pair<Long, Long> pair, Node node) {
        logger.info("segment remote store complete signal received for cube:{}, segment:{}, try to find proper segment to build", str, pair);
        getCoordinatorClient().segmentRemoteStoreComplete(node, str, pair);
    }

    public StreamingSourceConfigManager getStreamingManagerV2() {
        return StreamingSourceConfigManager.getInstance(getConfig());
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeCubeAssignment() {
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public List<Node> getReceivers() {
        return this.streamMetadataStore.getReceivers();
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeReceiver(Node node) {
        for (ReplicaSet replicaSet : this.streamMetadataStore.getReplicaSets()) {
            Set nodes = replicaSet.getNodes();
            if (nodes != null && nodes.contains(node)) {
                throw new IllegalStateException("Before remove receiver, it must be firstly removed from replica set:" + replicaSet.getReplicaSetID());
            }
        }
        this.streamMetadataStore.removeReceiver(node);
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void createReplicaSet(ReplicaSet replicaSet) {
        getCoordinatorClient().createReplicaSet(replicaSet);
        this.clusterStateCache.invalidate("cluster_state");
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeReplicaSet(int i) {
        getCoordinatorClient().removeReplicaSet(i);
        this.clusterStateCache.invalidate("cluster_state");
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void addNodeToReplicaSet(Integer num, String str) {
        getCoordinatorClient().addNodeToReplicaSet(num, str);
        this.clusterStateCache.invalidate("cluster_state");
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public void removeNodeFromReplicaSet(Integer num, String str) {
        getCoordinatorClient().removeNodeFromReplicaSet(num, str);
        this.clusterStateCache.invalidate("cluster_state");
    }

    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
    public List<ReplicaSet> getReplicaSets() {
        return this.streamMetadataStore.getReplicaSets();
    }

    public ReceiverStats getReceiverStats(Node node) {
        try {
            return this.receiverAdminClient.getReceiverStats(node);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public ReceiverCubeStats getReceiverCubeStats(Node node, String str) {
        try {
            return this.receiverAdminClient.getReceiverCubeStats(node, str);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @PreAuthorize("hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public CubeRealTimeState getCubeRealTimeState(CubeInstance cubeInstance) {
        CubeRealTimeState cubeRealTimeState = new CubeRealTimeState();
        cubeRealTimeState.setCubeName(cubeInstance.getName());
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(cubeInstance.getName());
        HashMap newHashMap = Maps.newHashMap();
        for (Integer num : assignmentsByCube.getReplicaSetIDs()) {
            ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(num.intValue());
            HashMap newHashMap2 = Maps.newHashMap();
            for (Node node : replicaSet.getNodes()) {
                ReceiverCubeRealTimeState receiverCubeRealTimeState = new ReceiverCubeRealTimeState();
                try {
                    ReceiverCubeStats receiverCubeStats = this.receiverAdminClient.getReceiverCubeStats(node, cubeInstance.getName());
                    receiverCubeRealTimeState.setState(ReceiverState.State.HEALTHY);
                    receiverCubeRealTimeState.setReceiverCubeStats(receiverCubeStats);
                } catch (IOException e) {
                    logger.error("exception when get receiver cube stats", e);
                    if (isReceiverReachable(node)) {
                        receiverCubeRealTimeState.setState(ReceiverState.State.DOWN);
                    } else {
                        receiverCubeRealTimeState.setState(ReceiverState.State.UNREACHABLE);
                    }
                }
                newHashMap2.put(node, receiverCubeRealTimeState);
            }
            newHashMap.put(num, newHashMap2);
        }
        cubeRealTimeState.setReceiverCubeStateMap(newHashMap);
        return cubeRealTimeState;
    }

    public ClusterState getClusterState() {
        ClusterState clusterState = (ClusterState) this.clusterStateCache.getIfPresent("cluster_state");
        if (clusterState != null) {
            return clusterState;
        }
        List<ReplicaSet> replicaSets = this.streamMetadataStore.getReplicaSets();
        List<Node> receivers = this.streamMetadataStore.getReceivers();
        Map allReplicaSetAssignments = this.streamMetadataStore.getAllReplicaSetAssignments();
        HashMap newHashMap = Maps.newHashMap();
        for (final Node node : receivers) {
            newHashMap.put(node, this.clusterStateExecutor.submit(new Callable<ReceiverStats>() { // from class: org.apache.kylin.rest.service.StreamingV2Service.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public ReceiverStats call() throws Exception {
                    return StreamingV2Service.this.receiverAdminClient.getReceiverStats(node);
                }
            }));
        }
        ClusterState clusterState2 = new ClusterState();
        for (ReplicaSet replicaSet : replicaSets) {
            clusterState2.addReplicaSetState(calReplicaSetState(replicaSet, (Map) allReplicaSetAssignments.get(Integer.valueOf(replicaSet.getReplicaSetID())), newHashMap));
            receivers.removeAll(replicaSet.getNodes());
        }
        for (Node node2 : receivers) {
            clusterState2.addAvailableReveiverState(getReceiverStateFromStats(node2, newHashMap.get(node2)));
        }
        clusterState2.setLastUpdateTime(System.currentTimeMillis());
        this.clusterStateCache.put("cluster_state", clusterState2);
        return clusterState2;
    }

    private ReplicaSetState calReplicaSetState(ReplicaSet replicaSet, Map<String, List<Partition>> map, Map<Node, Future<ReceiverStats>> map2) {
        ReplicaSetState replicaSetState = new ReplicaSetState();
        replicaSetState.setRsID(replicaSet.getReplicaSetID());
        replicaSetState.setAssignment(map);
        Set<Node> nodes = replicaSet.getNodes();
        if (nodes == null || nodes.isEmpty()) {
            return replicaSetState;
        }
        Node leader = replicaSet.getLeader();
        replicaSetState.setLead(leader);
        HashMap newHashMap = Maps.newHashMap();
        for (Node node : nodes) {
            try {
                newHashMap.put(node, map2.get(node).get());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e2) {
                replicaSetState.addReveiverState(getReceiverStateFromException(node, e2));
            }
        }
        HashMap newHashMap2 = Maps.newHashMap();
        Iterator it = newHashMap.values().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : ((ReceiverStats) it.next()).getCubeStatsMap().entrySet()) {
                String str = (String) entry.getKey();
                ReceiverCubeStats receiverCubeStats = (ReceiverCubeStats) entry.getValue();
                Long l = (Long) newHashMap2.get(str);
                if (l != null && l.longValue() < receiverCubeStats.getLatestEventTime()) {
                    newHashMap2.put(str, Long.valueOf(receiverCubeStats.getLatestEventTime()));
                } else if (l == null) {
                    newHashMap2.put(str, Long.valueOf(receiverCubeStats.getLatestEventTime()));
                }
            }
        }
        for (Map.Entry entry2 : newHashMap.entrySet()) {
            Node node2 = (Node) entry2.getKey();
            ReceiverStats receiverStats = (ReceiverStats) entry2.getValue();
            ReceiverState receiverState = new ReceiverState();
            receiverState.setReceiver(node2);
            receiverState.setState(ReceiverState.State.HEALTHY);
            if (!assignmentEqual(receiverStats.getAssignments(), map)) {
                receiverState.setState(ReceiverState.State.WARN);
                receiverState.addInfo("assignment is inconsistent");
            }
            if (receiverStats.isLead() && !node2.equals(leader)) {
                receiverState.setState(ReceiverState.State.WARN);
                receiverState.addInfo("lead state is inconsistent");
            }
            for (Map.Entry entry3 : receiverStats.getCubeStatsMap().entrySet()) {
                String str2 = (String) entry3.getKey();
                if (((Long) newHashMap2.get(str2)).longValue() - ((ReceiverCubeStats) entry3.getValue()).getLatestEventTime() >= 300000) {
                    receiverState.setState(ReceiverState.State.WARN);
                    receiverState.addInfo("cube:" + str2 + " consuming is lagged");
                }
            }
            receiverState.setRateInOneMin(calConsumeRate(node2, receiverStats));
            replicaSetState.addReveiverState(receiverState);
        }
        return replicaSetState;
    }

    private boolean assignmentEqual(Map<String, List<Partition>> map, Map<String, List<Partition>> map2) {
        if (emptyMap(map) && emptyMap(map2)) {
            return true;
        }
        if (map != null) {
            for (Map.Entry entry : map.entrySet()) {
                Collections.sort((List) entry.getValue());
                entry.setValue(entry.getValue());
            }
        }
        if (map2 != null) {
            for (Map.Entry entry2 : map2.entrySet()) {
                Collections.sort((List) entry2.getValue());
                entry2.setValue(entry2.getValue());
            }
        }
        return map != null && map.equals(map2);
    }

    private boolean emptyMap(Map map) {
        return map == null || map.isEmpty();
    }

    private ReceiverState getReceiverStateFromException(Node node, ExecutionException executionException) {
        ReceiverState receiverState = new ReceiverState();
        receiverState.setReceiver(node);
        if (isReceiverReachable(node)) {
            receiverState.setState(ReceiverState.State.DOWN);
        } else {
            receiverState.setState(ReceiverState.State.UNREACHABLE);
        }
        return receiverState;
    }

    private boolean isReceiverReachable(Node node) {
        try {
            return InetAddress.getByName(node.getHost()).isReachable(1000);
        } catch (Exception e) {
            logger.error("exception when try ping host:" + node.getHost(), e);
            return false;
        }
    }

    private ReceiverState getReceiverStateFromStats(Node node, Future<ReceiverStats> future) {
        ReceiverState receiverState = new ReceiverState();
        try {
            ReceiverStats receiverStats = future.get();
            receiverState.setReceiver(node);
            receiverState.setState(ReceiverState.State.HEALTHY);
            receiverState.setRateInOneMin(calConsumeRate(node, receiverStats));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            receiverState = getReceiverStateFromException(node, e2);
        }
        return receiverState;
    }

    private double calConsumeRate(Node node, ReceiverStats receiverStats) {
        double d = 0.0d;
        for (Map.Entry entry : receiverStats.getCubeStatsMap().entrySet()) {
            ConsumerStats consumerStats = ((ReceiverCubeStats) entry.getValue()).getConsumerStats();
            if (consumerStats == null) {
                logger.warn("no consumer stats exist for cube:{} in receiver:{}", entry.getKey(), node);
            } else {
                Iterator it = consumerStats.getPartitionConsumeStatsMap().values().iterator();
                while (it.hasNext()) {
                    d += ((PartitionConsumeStats) it.next()).getOneMinRate();
                }
            }
        }
        return d;
    }

    private synchronized CoordinatorClient getCoordinatorClient() {
        return CoordinatorClientFactory.createCoordinatorClient(this.streamMetadataStore);
    }
}
