/*
 * Decompiled with CFR 0.152.
 */
package io.cellery.observability.k8s.client;

import io.cellery.observability.k8s.client.K8sClientHolder;
import io.cellery.observability.k8s.client.Utils;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name="getComponentPods", namespace="k8sClient", description="This is a client which calls the Kubernetes API server based on the received parameters and adds the pod details received. Each pod will be a separate event duplicated from the original eventsent to this stream processor. If m number of multiple events are sent while n pods are present,m x n events will be sent out. This read the Service Account Token loaded into the pod and calls the API Server using that.", examples={@Example(syntax="k8sClient:getComponentPods()", description="This will fetch the currently running pods from the K8s API Servers")})
public class GetComponentPodsStreamProcessor
extends StreamProcessor {
    private static final Logger logger = Logger.getLogger((String)GetComponentPodsStreamProcessor.class.getName());
    private KubernetesClient k8sClient;

    protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        int attributeLength = attributeExpressionExecutors.length;
        if (attributeLength != 0) {
            throw new SiddhiAppValidationException("k8sClient:getComponentPods() expects exactly zero input parameters, but " + attributeExpressionExecutors.length + " attributes found");
        }
        ArrayList<Attribute> appendedAttributes = new ArrayList<Attribute>();
        appendedAttributes.add(new Attribute("instance", Attribute.Type.STRING));
        appendedAttributes.add(new Attribute("component", Attribute.Type.STRING));
        appendedAttributes.add(new Attribute("podName", Attribute.Type.STRING));
        appendedAttributes.add(new Attribute("instanceKind", Attribute.Type.STRING));
        appendedAttributes.add(new Attribute("creationTimestamp", Attribute.Type.LONG));
        appendedAttributes.add(new Attribute("nodeName", Attribute.Type.STRING));
        return appendedAttributes;
    }

    public void start() {
        this.k8sClient = K8sClientHolder.getK8sClient();
        if (logger.isDebugEnabled()) {
            logger.debug((Object)"Retrieved API server client instance");
        }
    }

    public void stop() {
        if (this.k8sClient != null) {
            this.k8sClient.close();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)"Closed API server client");
            }
        }
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> state) {
    }

    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        ComplexEventChunk outputStreamEventChunk = new ComplexEventChunk(true);
        while (streamEventChunk.hasNext()) {
            StreamEvent incomingStreamEvent = (StreamEvent)streamEventChunk.next();
            try {
                this.addComponentPods((ComplexEventChunk<StreamEvent>)outputStreamEventChunk, incomingStreamEvent, "observability.mesh.cellery.io/component");
                this.addComponentPods((ComplexEventChunk<StreamEvent>)outputStreamEventChunk, incomingStreamEvent, "observability.mesh.cellery.io/gateway");
            }
            catch (ParseException e) {
                logger.error((Object)"Failed to parse K8s timestamp", (Throwable)e);
            }
        }
        if (outputStreamEventChunk.getFirst() != null) {
            nextProcessor.process(outputStreamEventChunk);
        }
    }

    private void addComponentPods(ComplexEventChunk<StreamEvent> outputStreamEventChunk, StreamEvent incomingStreamEvent, String componentNameLabel) throws ParseException {
        PodList componentPodList = null;
        try {
            componentPodList = (PodList)((FilterWatchListDeletable)((FilterWatchListDeletable)((FilterWatchListDeletable)((NonNamespaceOperation)this.k8sClient.pods().inNamespace("default")).withLabel("observability.mesh.cellery.io/instance")).withLabel(componentNameLabel)).withField("status.phase", "Running")).list();
        }
        catch (Throwable e) {
            logger.error((Object)"Failed to fetch current pods for components", e);
        }
        if (componentPodList != null) {
            for (Pod pod : componentPodList.getItems()) {
                String kind = pod.getMetadata().getLabels().getOrDefault("observability.mesh.cellery.io/instance-kind", "");
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Added event - pod " + pod.getMetadata().getName() + " belonging to " + kind + " " + pod.getMetadata().getLabels().getOrDefault("observability.mesh.cellery.io/instance", "") + " of type " + ("observability.mesh.cellery.io/component".equals(componentNameLabel) ? "component" : "gateway") + " to the event"));
                }
                Object[] newData = new Object[]{pod.getMetadata().getLabels().getOrDefault("observability.mesh.cellery.io/instance", ""), Utils.getComponentName(pod), pod.getMetadata().getName(), kind, new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).parse(pod.getMetadata().getCreationTimestamp()).getTime(), pod.getSpec().getNodeName()};
                StreamEvent streamEventCopy = this.streamEventCloner.copyStreamEvent(incomingStreamEvent);
                this.complexEventPopulater.populateComplexEvent((ComplexEvent)streamEventCopy, newData);
                outputStreamEventChunk.add((ComplexEvent)streamEventCopy);
            }
        }
    }
}

