package io.cellery.observability.k8s.client;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name = "getComponentPods", namespace = "k8sClient", description = "This is a client which calls the Kubernetes API server based on the received parameters and adds the pod details received. Each pod will be a separate event duplicated from the original eventsent to this stream processor. If m number of multiple events are sent while n pods are present,m x n events will be sent out. This read the Service Account Token loaded into the pod and calls the API Server using that.", examples = {@Example(syntax = "k8sClient:getComponentPods()", description = "This will fetch the currently running pods from the K8s API Servers")})
/* loaded from: input_file:io/cellery/observability/k8s/client/GetComponentPodsStreamProcessor.class */
public class GetComponentPodsStreamProcessor extends StreamProcessor {
    private static final Logger logger = Logger.getLogger(GetComponentPodsStreamProcessor.class.getName());
    private KubernetesClient k8sClient;

    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        if (expressionExecutorArr.length != 0) {
            throw new SiddhiAppValidationException("k8sClient:getComponentPods() expects exactly zero input parameters, but " + expressionExecutorArr.length + " attributes found");
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Attribute("cell", Attribute.Type.STRING));
        arrayList.add(new Attribute("component", Attribute.Type.STRING));
        arrayList.add(new Attribute("name", Attribute.Type.STRING));
        arrayList.add(new Attribute("creationTimestamp", Attribute.Type.LONG));
        arrayList.add(new Attribute("nodeName", Attribute.Type.STRING));
        return arrayList;
    }

    public void start() {
        this.k8sClient = K8sClientHolder.getK8sClient();
        if (logger.isDebugEnabled()) {
            logger.debug("Retrieved API server client instance");
        }
    }

    public void stop() {
        if (this.k8sClient != null) {
            this.k8sClient.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed API server client");
            }
        }
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        ComplexEventChunk<StreamEvent> complexEventChunk2 = new ComplexEventChunk<>(true);
        while (complexEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) complexEventChunk.next();
            try {
                addComponentPods(complexEventChunk2, streamEvent, Constants.COMPONENT_NAME_LABEL);
                addComponentPods(complexEventChunk2, streamEvent, Constants.GATEWAY_NAME_LABEL);
            } catch (ParseException e) {
                logger.error("Failed to parse K8s timestamp", e);
            }
        }
        if (complexEventChunk2.getFirst() != null) {
            processor.process(complexEventChunk2);
        }
    }

    private void addComponentPods(ComplexEventChunk<StreamEvent> complexEventChunk, StreamEvent streamEvent, String str) throws ParseException {
        PodList podList = null;
        try {
            podList = (PodList) ((FilterWatchListDeletable) ((FilterWatchListDeletable) ((FilterWatchListDeletable) ((NonNamespaceOperation) this.k8sClient.pods().inNamespace(Constants.NAMESPACE)).withLabel(Constants.CELL_NAME_LABEL)).withLabel(str)).withField(Constants.STATUS_FIELD, Constants.STATUS_FIELD_RUNNING_VALUE)).list();
        } catch (Throwable th) {
            logger.error("Failed to fetch current pods for components", th);
        }
        if (podList != null) {
            for (Pod pod : podList.getItems()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Added event - pod " + pod.getMetadata().getName() + " belonging to cell " + ((String) pod.getMetadata().getLabels().get(Constants.CELL_NAME_LABEL)) + " of type " + (Constants.COMPONENT_NAME_LABEL.equals(str) ? "component" : "gateway") + " to the event");
                }
                Object[] objArr = {pod.getMetadata().getLabels().getOrDefault(Constants.CELL_NAME_LABEL, ""), Utils.getComponentName(pod), pod.getMetadata().getName(), Long.valueOf(new SimpleDateFormat(Constants.K8S_DATE_FORMAT, Locale.US).parse(pod.getMetadata().getCreationTimestamp()).getTime()), pod.getSpec().getNodeName()};
                StreamEvent copyStreamEvent = this.streamEventCloner.copyStreamEvent(streamEvent);
                this.complexEventPopulater.populateComplexEvent(copyStreamEvent, objArr);
                complexEventChunk.add(copyStreamEvent);
            }
        }
    }
}
