package io.cellery.observability.tracing.receiver;

import com.google.gson.Gson;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.cellery.observability.tracing.receiver.internal.Codec;
import io.cellery.observability.tracing.receiver.internal.ZipkinSpan;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(name = "tracing-receiver", namespace = "source", description = "This is the tracing Receiver for Cellery. This accepts Zipkin encoded tracing data. The event source outputs a map of attributes. Therefore a key-value mapper needs to be used.", parameters = {@Parameter(name = "ip", type = {DataType.STRING}, description = "IP to which the server connector should be bound to", optional = true, defaultValue = Constants.DEFAULT_TRACING_RECEIVER_IP), @Parameter(name = Constants.TRACING_RECEIVER_PORT_KEY, type = {DataType.INT}, description = "Port on which the server connector should listen on", optional = true, defaultValue = Constants.DEFAULT_TRACING_RECEIVER_PORT)}, examples = {@Example(syntax = "@source(type='tracing-receiver', @map(type='keyvalue', fail.on.missing.attribute='false'))\ndefine stream ZipkinStreamIn (traceId string, id string, parentId string, name string, serviceName string, kind string, timestamp long, duration long, tags string)", description = "This produced events when Zipkin tracing data is received on amy interface on port 9411. The stream definition of the event source is fixed since it depends on the Zipkin format")})
/* loaded from: input_file:io/cellery/observability/tracing/receiver/TracingEventSource.class */
public class TracingEventSource extends Source {
    private static final Logger logger = Logger.getLogger(TracingEventSource.class.getName());
    private static final Gson gson = new Gson();
    private HttpServer httpServer;
    private HttpServerListener httpServerListener;
    private String host;
    private String apiContext;
    private int port;

    /* loaded from: input_file:io/cellery/observability/tracing/receiver/TracingEventSource$HttpServerListener.class */
    public static class HttpServerListener implements HttpHandler {
        private static final Logger logger = Logger.getLogger(HttpServerListener.class.getName());
        private SourceEventListener sourceEventListener;

        HttpServerListener(SourceEventListener sourceEventListener) {
            this.sourceEventListener = sourceEventListener;
        }

        public void handle(HttpExchange httpExchange) throws IOException {
            handleEventReceive(IOUtils.toByteArray(httpExchange.getRequestBody()), httpExchange.getRequestHeaders().getFirst(Constants.HTTP_CONTENT_TYPE_HEADER));
            httpExchange.sendResponseHeaders(200, 0L);
            httpExchange.close();
        }

        private void handleEventReceive(byte[] bArr, String str) {
            if (logger.isDebugEnabled()) {
                logger.debug("Received message of type " + str);
            }
            List<ZipkinSpan> list = null;
            if (Objects.equals(str, Constants.HTTP_APPLICATION_THRIFT_CONTENT_TYPE)) {
                try {
                    list = Codec.decodeThriftData(bArr);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Decoded " + list.size() + " Thrift encoded Zipkin Spans");
                    }
                } catch (TException e) {
                    logger.error("Failed to decode Thrift tracing data", e);
                }
            } else {
                list = Codec.decodeData(bArr);
                if (logger.isDebugEnabled()) {
                    logger.debug("Decoded " + list.size() + " Thrift encoded Zipkin Spans");
                }
            }
            if (list != null) {
                for (ZipkinSpan zipkinSpan : list) {
                    HashMap hashMap = new HashMap();
                    hashMap.put(Constants.ATTRIBUTE_TRACE_ID, zipkinSpan.getTraceId());
                    hashMap.put(Constants.ATTRIBUTE_SPAN_ID, zipkinSpan.getId());
                    hashMap.put(Constants.ATTRIBUTE_PARENT_ID, zipkinSpan.getParentId());
                    hashMap.put(Constants.ATTRIBUTE_NAME, zipkinSpan.getName());
                    hashMap.put(Constants.ATTRIBUTE_SERVICE_NAME, zipkinSpan.getServiceName());
                    hashMap.put(Constants.ATTRIBUTE_KIND, zipkinSpan.getKind());
                    hashMap.put(Constants.ATTRIBUTE_TIMESTAMP, Long.valueOf(zipkinSpan.getTimestamp() / 1000));
                    hashMap.put(Constants.ATTRIBUTE_DURATION, Long.valueOf(zipkinSpan.getDuration() / 1000));
                    hashMap.put(Constants.ATTRIBUTE_TAGS, TracingEventSource.gson.toJson(zipkinSpan.getTags()));
                    this.sourceEventListener.onEvent(hashMap, new String[0]);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Emitted event - span " + zipkinSpan.getTraceId() + "-" + zipkinSpan.getId() + " to event source listener");
                    }
                }
            }
        }
    }

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.host = optionHolder.validateAndGetStaticValue(Constants.TRACING_RECEIVER_HOST_KEY, Constants.DEFAULT_TRACING_RECEIVER_IP);
        this.port = Integer.parseInt(optionHolder.validateAndGetStaticValue(Constants.TRACING_RECEIVER_PORT_KEY, Constants.DEFAULT_TRACING_RECEIVER_PORT));
        this.apiContext = optionHolder.validateAndGetStaticValue(Constants.TRACING_RECEIVER_API_CONTEXT_KEY, Constants.DEFAULT_TRACING_RECEIVER_API_CONTEXT);
        this.httpServerListener = new HttpServerListener(sourceEventListener);
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        try {
            this.httpServer = HttpServer.create(new InetSocketAddress(this.host, this.port), 0);
            this.httpServer.createContext(this.apiContext).setHandler(this.httpServerListener);
            this.httpServer.start();
            if (logger.isDebugEnabled()) {
                logger.debug("Started HTTP Server started and receiving requests on http://" + this.host + ":" + this.port + this.apiContext);
            }
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Failed to instantiate HTTP Server");
        }
    }

    public void disconnect() {
        if (this.httpServer != null) {
            this.httpServer.stop(0);
            if (logger.isDebugEnabled()) {
                logger.debug("HTTP Server Shutdown");
            }
            this.httpServer = null;
        }
    }

    public void destroy() {
    }

    public void pause() {
    }

    public void resume() {
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{Map.class};
    }
}
