/*
 * Decompiled with CFR 0.152.
 */
package io.cellery.observability.k8s.client;

import io.cellery.observability.k8s.client.K8sClientHolder;
import io.cellery.observability.k8s.client.Utils;
import io.cellery.observability.k8s.client.crds.cell.CellImpl;
import io.cellery.observability.k8s.client.crds.cell.CellList;
import io.cellery.observability.k8s.client.crds.cell.DoneableCell;
import io.cellery.observability.k8s.client.crds.composite.CompositeImpl;
import io.cellery.observability.k8s.client.crds.composite.CompositeList;
import io.cellery.observability.k8s.client.crds.composite.DoneableComposite;
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(name="k8s-components", namespace="source", description="This is an event source which emits events upon changes to Cellery components deployed asKubernetes resource", examples={@Example(syntax="@source(type='k8s-components', @map(type='keyvalue', fail.on.missing.attribute='false'))\ndefine stream K8sComponentEventSourceStream (instance string, kind string, component string, creationTimestamp long, ingressTypes string, action string)", description="This will listen for kubernetes component events and emit events upon changes to the components")})
public class ComponentsEventSource
extends Source {
    private static final Logger logger = Logger.getLogger((String)ComponentsEventSource.class.getName());
    private KubernetesClient k8sClient;
    private SourceEventListener sourceEventListener;
    private List<Watch> k8sWatches;

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strings, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.k8sWatches = new ArrayList<Watch>(2);
        this.sourceEventListener = sourceEventListener;
    }

    public Class[] getOutputEventClasses() {
        return new Class[0];
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        this.k8sClient = K8sClientHolder.getK8sClient();
        if (logger.isDebugEnabled()) {
            logger.debug((Object)"Retrieved API server client instance");
        }
        CustomResourceDefinition cellCrd = (CustomResourceDefinition)((Resource)this.k8sClient.customResourceDefinitions().withName("cells.mesh.cellery.io")).get();
        CustomResourceDefinition compositeCrd = (CustomResourceDefinition)((Resource)this.k8sClient.customResourceDefinitions().withName("composites.mesh.cellery.io")).get();
        Watch cellWatch = (Watch)((NonNamespaceOperation)this.k8sClient.customResources(cellCrd, CellImpl.class, CellList.class, DoneableCell.class).inNamespace("default")).watch((Object)new Watcher<CellImpl>(){

            public void eventReceived(Watcher.Action action, CellImpl cell) {
                try {
                    long creationTimestamp = StringUtils.isEmpty((CharSequence)cell.getMetadata().getCreationTimestamp()) ? -1L : new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).parse(cell.getMetadata().getCreationTimestamp()).getTime();
                    Map<String, Set<String>> componentIngressTypes = Utils.getComponentIngressTypes(cell);
                    for (Map.Entry<String, Set<String>> entry : componentIngressTypes.entrySet()) {
                        HashMap<String, Object> attributes = new HashMap<String, Object>();
                        attributes.put("instance", cell.getMetadata().getName());
                        attributes.put("component", entry.getKey());
                        attributes.put("instanceKind", "Cell");
                        attributes.put("creationTimestamp", creationTimestamp);
                        attributes.put("ingressTypes", StringUtils.join((Iterable)entry.getValue(), (String)","));
                        attributes.put("action", action.toString());
                        ComponentsEventSource.this.sourceEventListener.onEvent(attributes, new String[0]);
                    }
                }
                catch (ParseException e) {
                    logger.error((Object)"Ignored cell change due to creation timestamp parse failure", (Throwable)e);
                }
            }

            public void onClose(KubernetesClientException cause) {
                if (cause == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug((Object)"Kubernetes cell watcher closed successfully");
                    }
                } else {
                    logger.error((Object)"Kubernetes cell watcher closed with error", (Throwable)cause);
                }
            }
        });
        this.k8sWatches.add(cellWatch);
        if (logger.isDebugEnabled()) {
            logger.debug((Object)"Created cell watcher");
        }
        Watch compositeWatch = (Watch)((NonNamespaceOperation)this.k8sClient.customResources(compositeCrd, CompositeImpl.class, CompositeList.class, DoneableComposite.class).inNamespace("default")).watch((Object)new Watcher<CompositeImpl>(){

            public void eventReceived(Watcher.Action action, CompositeImpl composite) {
                try {
                    long creationTimestamp = StringUtils.isEmpty((CharSequence)composite.getMetadata().getCreationTimestamp()) ? -1L : new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).parse(composite.getMetadata().getCreationTimestamp()).getTime();
                    Map<String, Set<String>> componentIngressTypes = Utils.getComponentIngressTypes(composite);
                    for (Map.Entry<String, Set<String>> entry : componentIngressTypes.entrySet()) {
                        HashMap<String, Object> attributes = new HashMap<String, Object>();
                        attributes.put("instance", composite.getMetadata().getName());
                        attributes.put("component", entry.getKey());
                        attributes.put("instanceKind", "Composite");
                        attributes.put("creationTimestamp", creationTimestamp);
                        attributes.put("ingressTypes", StringUtils.join((Iterable)entry.getValue(), (String)","));
                        attributes.put("action", action.toString());
                        ComponentsEventSource.this.sourceEventListener.onEvent(attributes, new String[0]);
                    }
                }
                catch (ParseException e) {
                    logger.error((Object)"Ignored composite change due to creation timestamp parse failure", (Throwable)e);
                }
            }

            public void onClose(KubernetesClientException cause) {
                if (cause == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug((Object)"Kubernetes composite watcher closed successfully");
                    }
                } else {
                    logger.error((Object)"Kubernetes composite watcher closed with error", (Throwable)cause);
                }
            }
        });
        this.k8sWatches.add(compositeWatch);
        if (logger.isDebugEnabled()) {
            logger.debug((Object)"Created composite watcher");
        }
    }

    public void disconnect() {
        while (this.k8sWatches.size() > 0) {
            Watch watch = this.k8sWatches.remove(0);
            watch.close();
        }
        if (this.k8sClient != null) {
            this.k8sClient.close();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)"Closed API server client");
            }
        }
    }

    public void destroy() {
    }

    public void pause() {
    }

    public void resume() {
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }
}

