/*
 * Decompiled with CFR 0.152.
 */
package io.cellery.observability.k8s.client;

import io.cellery.observability.k8s.client.K8sClientHolder;
import io.cellery.observability.k8s.client.Utils;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(name="k8s-component-pods", namespace="source", description="This is an event source which emits events upon changes to Cellery Components deployed asKubernetes Pods", examples={@Example(syntax="@source(type='k8s-component-pods', @map(type='keyvalue', fail.on.missing.attribute='false'))\ndefine stream K8sPodEvents (instance string, kind string, component string, podName string, creationTimestamp long, deletionTimestamp long, nodeName string, status string, action string)", description="This will listen for kubernetes pod events and emit events upon changes to the pods")})
public class ComponentPodsEventSource
extends Source {
    private static final Logger logger = Logger.getLogger((String)ComponentPodsEventSource.class.getName());
    private KubernetesClient k8sClient;
    private SourceEventListener sourceEventListener;
    private List<Watch> k8sWatches;

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strings, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.k8sWatches = new ArrayList<Watch>(3);
        this.sourceEventListener = sourceEventListener;
    }

    public void connect(Source.ConnectionCallback connectionCallback) {
        this.k8sClient = K8sClientHolder.getK8sClient();
        if (logger.isDebugEnabled()) {
            logger.debug((Object)"Retrieved API server client instance");
        }
        Watch componentsWatch = (Watch)((FilterWatchListDeletable)((FilterWatchListDeletable)((NonNamespaceOperation)this.k8sClient.pods().inNamespace("default")).withLabel("observability.mesh.cellery.io/instance")).withLabel("observability.mesh.cellery.io/component")).watch((Object)new PodWatcher(this.sourceEventListener, "observability.mesh.cellery.io/component"));
        this.k8sWatches.add(componentsWatch);
        if (logger.isDebugEnabled()) {
            logger.debug((Object)"Created pod watcher for components");
        }
        Watch gatewaysWatch = (Watch)((FilterWatchListDeletable)((FilterWatchListDeletable)((NonNamespaceOperation)this.k8sClient.pods().inNamespace("default")).withLabel("observability.mesh.cellery.io/instance")).withLabel("observability.mesh.cellery.io/gateway")).watch((Object)new PodWatcher(this.sourceEventListener, "observability.mesh.cellery.io/gateway"));
        this.k8sWatches.add(gatewaysWatch);
        if (logger.isDebugEnabled()) {
            logger.debug((Object)"Created pod watcher for gateways");
        }
    }

    public void disconnect() {
        while (this.k8sWatches.size() > 0) {
            Watch watch = this.k8sWatches.remove(0);
            watch.close();
        }
        if (this.k8sClient != null) {
            this.k8sClient.close();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)"Closed API server client");
            }
        }
    }

    public void destroy() {
    }

    public void pause() {
    }

    public void resume() {
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> state) {
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{Map.class};
    }

    private static class PodWatcher
    implements Watcher<Pod> {
        private static final Logger logger = Logger.getLogger((String)PodWatcher.class.getName());
        private final SourceEventListener sourceEventListener;
        private final String componentNameLabel;

        PodWatcher(SourceEventListener sourceEventListener, String componentNameLabel) {
            this.sourceEventListener = sourceEventListener;
            this.componentNameLabel = componentNameLabel;
        }

        public void eventReceived(Watcher.Action action, Pod pod) {
            try {
                HashMap<String, Object> attributes = new HashMap<String, Object>();
                attributes.put("instance", pod.getMetadata().getLabels().getOrDefault("observability.mesh.cellery.io/instance", ""));
                attributes.put("component", Utils.getComponentName(pod));
                attributes.put("podName", pod.getMetadata().getName());
                attributes.put("instanceKind", pod.getMetadata().getLabels().getOrDefault("observability.mesh.cellery.io/instance-kind", ""));
                attributes.put("creationTimestamp", pod.getMetadata().getCreationTimestamp() == null ? -1L : new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).parse(pod.getMetadata().getCreationTimestamp()).getTime());
                attributes.put("deletionTimestamp", pod.getMetadata().getDeletionTimestamp() == null ? -1L : new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).parse(pod.getMetadata().getDeletionTimestamp()).getTime());
                attributes.put("nodeName", pod.getSpec().getNodeName() == null ? "" : pod.getSpec().getNodeName());
                attributes.put("status", pod.getStatus().getPhase());
                attributes.put("action", action.toString());
                this.sourceEventListener.onEvent(attributes, new String[0]);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Emitted event - pod " + pod.getMetadata().getName() + " with resource version " + pod.getMetadata().getResourceVersion() + " belonging to " + pod.getMetadata().getLabels().getOrDefault("observability.mesh.cellery.io/instance-kind", "") + " " + pod.getMetadata().getLabels().getOrDefault("observability.mesh.cellery.io/instance", "") + " of type " + ("observability.mesh.cellery.io/component".equals(this.componentNameLabel) ? "component" : "gateway")));
                }
            }
            catch (ParseException e) {
                logger.error((Object)"Ignored pod change due to creation timestamp parse failure", (Throwable)e);
            }
        }

        public void onClose(KubernetesClientException cause) {
            if (cause == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Kubernetes " + ("observability.mesh.cellery.io/component".equals(this.componentNameLabel) ? "component" : "gateway") + " pod watcher closed successfully"));
                }
            } else {
                logger.error((Object)("Kubernetes " + ("observability.mesh.cellery.io/component".equals(this.componentNameLabel) ? "component" : "gateway") + " pod watcher closed with error"), (Throwable)cause);
            }
        }
    }
}

