package io.cellery.observability.k8s.client;

import io.cellery.observability.k8s.client.cells.Cell;
import io.cellery.observability.k8s.client.cells.CellList;
import io.cellery.observability.k8s.client.cells.DoneableCell;
import io.cellery.observability.k8s.client.cells.model.HTTP;
import io.cellery.observability.k8s.client.cells.model.ServicesTemplate;
import io.cellery.observability.k8s.client.cells.model.TCP;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition;
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinitionList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.internal.KubernetesDeserializer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(name = "k8s-components", namespace = "source", description = "This is an event source which emits events upon changes to Cellery cells deployed asKubernetes resource", examples = {@Example(syntax = "@source(type='k8s-components', @map(type='keyvalue', fail.on.missing.attribute='false'))\ndefine stream K8sComponentEventSourceStream (cell string, component string, creationTimestamp long, lastKnownActiveTimestamp long, ingressTypes string, action string)", description = "This will listen for kubernetes cell events and emit events upon changes to the cells")})
/* loaded from: input_file:io/cellery/observability/k8s/client/ComponentsEventSource.class */
public class ComponentsEventSource extends Source {
    private static final Logger logger = Logger.getLogger(ComponentsEventSource.class.getName());
    private static final String CELL_CRD_GROUP = "mesh.cellery.io";
    private static final String CELL_CRD_NAME = "cells.mesh.cellery.io";
    private static final String INGRESS_TYPES = "ingressTypes";
    private static final String CELL_CRD_VERSION = "v1alpha1";
    private static final String DEFAULT_NAMESPACE = "default";
    private static final String CELL = "Cell";
    private Watch cellWatcher;
    private KubernetesClient k8sClient;
    private SourceEventListener sourceEventListener;

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
    }

    public Class[] getOutputEventClasses() {
        return new Class[0];
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        this.k8sClient = K8sClientHolder.getK8sClient();
        if (logger.isDebugEnabled()) {
            logger.debug("Retrieved API server client instance");
        }
        KubernetesDeserializer.registerCustomKind("mesh.cellery.io/v1alpha1", CELL, Cell.class);
        this.cellWatcher = (Watch) ((NonNamespaceOperation) this.k8sClient.customResources(getCellCRD(), Cell.class, CellList.class, DoneableCell.class).inNamespace("default")).watch(new Watcher<Cell>() { // from class: io.cellery.observability.k8s.client.ComponentsEventSource.1
            public void eventReceived(Watcher.Action action, Cell cell) {
                List<HTTP> http = cell.getSpec().getGatewayTemplate().getSpec().getHttp();
                List<TCP> tcp = cell.getSpec().getGatewayTemplate().getSpec().getTcp();
                boolean z = !StringUtils.isEmpty(cell.getSpec().getGatewayTemplate().getSpec().getHost());
                try {
                    HashMap hashMap = new HashMap();
                    hashMap.put(Constants.ATTRIBUTE_CELL, cell.getMetadata().getName());
                    hashMap.put(Constants.ATTRIBUTE_CREATION_TIMESTAMP, Long.valueOf(cell.getMetadata().getCreationTimestamp() == null ? -1L : new SimpleDateFormat(Constants.K8S_DATE_FORMAT, Locale.US).parse(cell.getMetadata().getCreationTimestamp()).getTime()));
                    hashMap.put(Constants.ATTRIBUTE_ACTION, action.toString());
                    hashMap.put(Constants.ATTRIBUTE_LAST_KNOWN_ACTIVE_TIMESTAMP, 0L);
                    ComponentsEventSource.this.addComponentsInfo(cell, http, tcp, hashMap, z);
                } catch (ParseException e) {
                    ComponentsEventSource.logger.error("Ignored cell change due to creation timestamp parse failure", e);
                }
            }

            public void onClose(KubernetesClientException kubernetesClientException) {
                if (kubernetesClientException != null) {
                    ComponentsEventSource.logger.error("Kubernetes cell watcher closed with error", kubernetesClientException);
                } else if (ComponentsEventSource.logger.isDebugEnabled()) {
                    ComponentsEventSource.logger.debug("Kubernetes cell watcher closed successfully");
                }
            }
        });
        if (logger.isDebugEnabled()) {
            logger.debug("Created cell watcher");
        }
    }

    public void disconnect() {
        if (this.cellWatcher != null) {
            this.cellWatcher.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed cell watcher");
            }
        }
        if (this.k8sClient != null) {
            this.k8sClient.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed API server client");
            }
        }
    }

    public void destroy() {
    }

    public void pause() {
    }

    public void resume() {
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }

    private void addHttpType(List<HTTP> list, String str, List<String> list2, boolean z) {
        if (list != null) {
            boolean anyMatch = list.stream().anyMatch(http -> {
                return http.getBackend().equals(str);
            });
            if (anyMatch && z) {
                list2.add(Constants.INGRESS_TYPE_WEB);
            } else if (anyMatch) {
                list2.add(Constants.INGRESS_TYPE_HTTP);
            }
        }
    }

    private void addTcpType(List<TCP> list, String str, List<String> list2) {
        if (list == null || !list.stream().anyMatch(tcp -> {
            return tcp.getBackendHost().equals(str);
        })) {
            return;
        }
        list2.add(Constants.INGRESS_TYPE_TCP);
    }

    private Set<String> getComponentsList(Cell cell) {
        HashSet hashSet = new HashSet();
        Iterator<ServicesTemplate> it = cell.getSpec().getServicesTemplates().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getMetadata().getName());
        }
        return hashSet;
    }

    private CustomResourceDefinition getCellCRD() {
        for (CustomResourceDefinition customResourceDefinition : ((CustomResourceDefinitionList) this.k8sClient.customResourceDefinitions().list()).getItems()) {
            ObjectMeta metadata = customResourceDefinition.getMetadata();
            if (metadata != null && CELL_CRD_NAME.equalsIgnoreCase(metadata.getName())) {
                return customResourceDefinition;
            }
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addComponentsInfo(Cell cell, List<HTTP> list, List<TCP> list2, Map<String, Object> map, boolean z) {
        for (String str : getComponentsList(cell)) {
            ArrayList arrayList = new ArrayList();
            addHttpType(list, str, arrayList, z);
            addTcpType(list2, str, arrayList);
            map.put(Constants.ATTRIBUTE_COMPONENT, str);
            map.put(INGRESS_TYPES, StringUtils.join(arrayList, ','));
            this.sourceEventListener.onEvent(map, new String[0]);
        }
    }
}
