package io.cellery.observability.k8s.client;

import io.cellery.observability.k8s.client.Constants;
import io.cellery.observability.k8s.client.crds.cell.CellImpl;
import io.cellery.observability.k8s.client.crds.cell.CellList;
import io.cellery.observability.k8s.client.crds.cell.DoneableCell;
import io.cellery.observability.k8s.client.crds.composite.CompositeImpl;
import io.cellery.observability.k8s.client.crds.composite.CompositeList;
import io.cellery.observability.k8s.client.crds.composite.DoneableComposite;
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(name = "k8s-components", namespace = "source", description = "This is an event source which emits events upon changes to Cellery components deployed asKubernetes resource", examples = {@Example(syntax = "@source(type='k8s-components', @map(type='keyvalue', fail.on.missing.attribute='false'))\ndefine stream K8sComponentEventSourceStream (instance string, kind string, component string, creationTimestamp long, ingressTypes string, action string)", description = "This will listen for kubernetes component events and emit events upon changes to the components")})
/* loaded from: input_file:io/cellery/observability/k8s/client/ComponentsEventSource.class */
public class ComponentsEventSource extends Source {
    private static final Logger logger = Logger.getLogger(ComponentsEventSource.class.getName());
    private KubernetesClient k8sClient;
    private SourceEventListener sourceEventListener;
    private List<Watch> k8sWatches;

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.k8sWatches = new ArrayList(2);
        this.sourceEventListener = sourceEventListener;
    }

    public Class[] getOutputEventClasses() {
        return new Class[0];
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        this.k8sClient = K8sClientHolder.getK8sClient();
        if (logger.isDebugEnabled()) {
            logger.debug("Retrieved API server client instance");
        }
        CustomResourceDefinition customResourceDefinition = (CustomResourceDefinition) ((Resource) this.k8sClient.customResourceDefinitions().withName(Constants.CELL_CRD_NAME)).get();
        CustomResourceDefinition customResourceDefinition2 = (CustomResourceDefinition) ((Resource) this.k8sClient.customResourceDefinitions().withName(Constants.COMPOSITE_CRD_NAME)).get();
        this.k8sWatches.add((Watch) ((NonNamespaceOperation) this.k8sClient.customResources(customResourceDefinition, CellImpl.class, CellList.class, DoneableCell.class).inNamespace(Constants.NAMESPACE)).watch(new Watcher<CellImpl>() { // from class: io.cellery.observability.k8s.client.ComponentsEventSource.1
            public void eventReceived(Watcher.Action action, CellImpl cellImpl) {
                try {
                    long time = StringUtils.isEmpty(cellImpl.getMetadata().getCreationTimestamp()) ? -1L : new SimpleDateFormat(Constants.K8S_DATE_FORMAT, Locale.US).parse(cellImpl.getMetadata().getCreationTimestamp()).getTime();
                    for (Map.Entry<String, Set<String>> entry : Utils.getComponentIngressTypes(cellImpl).entrySet()) {
                        HashMap hashMap = new HashMap();
                        hashMap.put(Constants.Attribute.INSTANCE, cellImpl.getMetadata().getName());
                        hashMap.put(Constants.Attribute.COMPONENT, entry.getKey());
                        hashMap.put(Constants.Attribute.INSTANCE_KIND, Constants.CELL_KIND);
                        hashMap.put(Constants.Attribute.CREATION_TIMESTAMP, Long.valueOf(time));
                        hashMap.put(Constants.Attribute.INGRESS_TYPES, StringUtils.join(entry.getValue(), ","));
                        hashMap.put(Constants.Attribute.ACTION, action.toString());
                        ComponentsEventSource.this.sourceEventListener.onEvent(hashMap, new String[0]);
                    }
                } catch (ParseException e) {
                    ComponentsEventSource.logger.error("Ignored cell change due to creation timestamp parse failure", e);
                }
            }

            public void onClose(KubernetesClientException kubernetesClientException) {
                if (kubernetesClientException != null) {
                    ComponentsEventSource.logger.error("Kubernetes cell watcher closed with error", kubernetesClientException);
                } else if (ComponentsEventSource.logger.isDebugEnabled()) {
                    ComponentsEventSource.logger.debug("Kubernetes cell watcher closed successfully");
                }
            }
        }));
        if (logger.isDebugEnabled()) {
            logger.debug("Created cell watcher");
        }
        this.k8sWatches.add((Watch) ((NonNamespaceOperation) this.k8sClient.customResources(customResourceDefinition2, CompositeImpl.class, CompositeList.class, DoneableComposite.class).inNamespace(Constants.NAMESPACE)).watch(new Watcher<CompositeImpl>() { // from class: io.cellery.observability.k8s.client.ComponentsEventSource.2
            public void eventReceived(Watcher.Action action, CompositeImpl compositeImpl) {
                try {
                    long time = StringUtils.isEmpty(compositeImpl.getMetadata().getCreationTimestamp()) ? -1L : new SimpleDateFormat(Constants.K8S_DATE_FORMAT, Locale.US).parse(compositeImpl.getMetadata().getCreationTimestamp()).getTime();
                    for (Map.Entry<String, Set<String>> entry : Utils.getComponentIngressTypes(compositeImpl).entrySet()) {
                        HashMap hashMap = new HashMap();
                        hashMap.put(Constants.Attribute.INSTANCE, compositeImpl.getMetadata().getName());
                        hashMap.put(Constants.Attribute.COMPONENT, entry.getKey());
                        hashMap.put(Constants.Attribute.INSTANCE_KIND, Constants.COMPOSITE_KIND);
                        hashMap.put(Constants.Attribute.CREATION_TIMESTAMP, Long.valueOf(time));
                        hashMap.put(Constants.Attribute.INGRESS_TYPES, StringUtils.join(entry.getValue(), ","));
                        hashMap.put(Constants.Attribute.ACTION, action.toString());
                        ComponentsEventSource.this.sourceEventListener.onEvent(hashMap, new String[0]);
                    }
                } catch (ParseException e) {
                    ComponentsEventSource.logger.error("Ignored composite change due to creation timestamp parse failure", e);
                }
            }

            public void onClose(KubernetesClientException kubernetesClientException) {
                if (kubernetesClientException != null) {
                    ComponentsEventSource.logger.error("Kubernetes composite watcher closed with error", kubernetesClientException);
                } else if (ComponentsEventSource.logger.isDebugEnabled()) {
                    ComponentsEventSource.logger.debug("Kubernetes composite watcher closed successfully");
                }
            }
        }));
        if (logger.isDebugEnabled()) {
            logger.debug("Created composite watcher");
        }
    }

    public void disconnect() {
        while (this.k8sWatches.size() > 0) {
            this.k8sWatches.remove(0).close();
        }
        if (this.k8sClient != null) {
            this.k8sClient.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed API server client");
            }
        }
    }

    public void destroy() {
    }

    public void pause() {
    }

    public void resume() {
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }
}
