package org.wso2.extension.siddhi.map.avro.sinkmapper;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import feign.FeignException;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.output.sink.SinkListener;
import io.siddhi.core.stream.output.sink.SinkMapper;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.core.util.transport.TemplateBuilder;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.map.avro.util.AvroMessageProcessor;
import org.wso2.extension.siddhi.map.avro.util.schema.RecordSchema;
import org.wso2.extension.siddhi.map.avro.util.schema.SchemaRegistryReader;

@Extension(name = "avro", namespace = "sinkMapper", description = "This extension is a Siddhi Event to Avro Message output mapper.Transports that publish  messages to Avro sink can utilize this extension to convert Siddhi events to Avro messages.\n You can either specify the Avro schema or provide the schema registry URL and the schema reference ID as parameters in the stream definition.\nIf no Avro schema is specified, a flat Avro schema of the 'record' type is generated with the stream attributes as schema fields.", parameters = {@Parameter(name = "schema.def", description = "This specifies the required Avro schema to be used to convert Siddhi events to Avro messages.\nThe schema needs to be specified as a quoted JSON string.", type = {DataType.STRING}), @Parameter(name = "schema.registry", description = "This specifies the URL of the schema registry.", type = {DataType.STRING}), @Parameter(name = "schema.id", description = "This specifies the ID of the avro schema. This ID is the global ID that is returned from the schema registry when posting the schema to the registry. 
The specified ID is used to retrieve the schema from the schema registry.", type = {DataType.STRING})}, examples = {@Example(syntax = "@sink(type='inMemory', topic='stock', @map(type='avro',schema.def = \"\"\"{\"type\":\"record\",\"name\":\"stock\",\"namespace\":\"stock.example\",\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"price\":\"type\":\"float\"},{\"name\":\"volume\",\"type\":\"long\"}]}\"\"\"))\ndefine stream StockStream (symbol string, price float, volume long);", description = "The above configuration performs a default Avro mapping that generates an Avro message as an output ByteBuffer."), @Example(syntax = "@sink(type='inMemory', topic='stock', @map(type='avro',schema.registry = 'http://localhost:8081', schema.id ='22',@payload(\"\"\"{\"Symbol\":{{symbol}},\"Price\":{{price}},\"Volume\":{{volume}}}\"\"\"\n)))\ndefine stream StockStream (symbol string, price float, volume long);", description = "The above configuration performs a custom Avro mapping that generates an Avro message as an output ByteBuffer. The Avro schema is retrieved from the given schema registry (localhost:8081) using the schema ID provided.")})
/* loaded from: input_file:org/wso2/extension/siddhi/map/avro/sinkmapper/AvroSinkMapper.class */
public class AvroSinkMapper extends SinkMapper {
    // Sink mapper that serializes Siddhi events into Avro messages and publishes each message
    // as a ByteBuffer. The Avro schema is resolved once in init() from, in priority order:
    // the 'schema.def' option, the schema registry ('schema.registry' + 'schema.id'), or a
    // schema generated from the stream attributes.

    private static final Logger log = Logger.getLogger(AvroSinkMapper.class);
    private static final String DEFAULT_AVRO_MAPPING_PREFIX = "schema";
    private static final String SCHEMA_IDENTIFIER = "def";
    /** Placeholder written in place of null attribute values before serialization. */
    private static final String UNDEFINED = "undefined";
    private static final String SCHEMA_REGISTRY = "registry";
    private static final String SCHEMA_ID = "id";
    /** Shared JSON converter for map-valued attributes; reused instead of allocating one per event. */
    private static final Gson GSON = new Gson();

    /** Attribute names of the stream, indexed like the event data array. */
    private String[] attributeNameArray;
    /** Avro schema every outgoing message is serialized against; set in {@link #init}. */
    private Schema schema;
    /** Stream attributes; used to generate a schema when none is configured. */
    private List<Attribute> attributeList;

    /**
     * Captures the stream attributes, validates the optional {@code @payload} mapping and
     * resolves the Avro schema from the mapper options.
     *
     * @param streamDefinition stream whose events are mapped
     * @param optionHolder     holder of the static {@code schema.def}, {@code schema.registry}
     *                         and {@code schema.id} options
     * @param map              payload template builders, or {@code null} for default mapping;
     *                         when present it must contain exactly one non-object template
     * @param configReader     not used by this mapper
     * @param siddhiAppContext not used by this mapper
     * @throws SiddhiAppCreationException if the payload mapping is unsupported or the schema
     *                                    cannot be resolved
     */
    public void init(StreamDefinition streamDefinition, OptionHolder optionHolder, Map<String, TemplateBuilder> map, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.attributeNameArray = streamDefinition.getAttributeNameArray();
        this.attributeList = streamDefinition.getAttributeList();
        if (map != null) {
            if (map.size() != 1) {
                throw new SiddhiAppCreationException("Avro sink-mapper does not support multiple @payload mappings, error at the mapper of '" + streamDefinition.getId() + "'");
            }
            if (map.values().iterator().next().isObjectMessage()) {
                throw new SiddhiAppCreationException("Avro sink-mapper does not support object @payload mappings, error at the mapper of '" + streamDefinition.getId() + "'");
            }
        }
        String optionPrefix = DEFAULT_AVRO_MAPPING_PREFIX + ".";
        String schemaDefinition = optionHolder.validateAndGetStaticValue(optionPrefix + SCHEMA_IDENTIFIER, (String) null);
        String schemaRegistryUrl = optionHolder.validateAndGetStaticValue(optionPrefix + SCHEMA_REGISTRY, (String) null);
        String schemaId = optionHolder.validateAndGetStaticValue(optionPrefix + SCHEMA_ID, (String) null);
        this.schema = getAvroSchema(schemaDefinition, schemaRegistryUrl, schemaId, streamDefinition.getId());
    }

    /**
     * Resolves the Avro schema: an inline definition wins over the schema registry, which wins
     * over a schema generated from the stream attributes.
     *
     * @param schemaDefinition  inline schema JSON, or {@code null}
     * @param schemaRegistryUrl schema registry URL, or {@code null}
     * @param schemaId          schema ID looked up in the registry; required when a registry
     *                          URL is given and no inline definition is present
     * @param streamId          stream ID used in error messages and for schema generation
     * @return the resolved, non-null schema
     * @throws SiddhiAppCreationException if no schema can be obtained
     */
    private Schema getAvroSchema(String schemaDefinition, String schemaRegistryUrl, String schemaId, String streamId) {
        Schema resolvedSchema;
        try {
            if (schemaDefinition != null) {
                resolvedSchema = new Schema.Parser().parse(schemaDefinition);
            } else if (schemaRegistryUrl != null) {
                // Fail early with a clear message instead of sending a null ID to the registry.
                if (schemaId == null) {
                    throw new SiddhiAppCreationException("Avro schema registry URL is specified but 'schema.id' is not, error at the mapper of '" + streamId + "'");
                }
                resolvedSchema = new SchemaRegistryReader().getSchemaFromID(schemaRegistryUrl, schemaId);
            } else {
                if (this.attributeList.isEmpty()) {
                    throw new SiddhiAppCreationException("Avro Schema is not specified in the stream definition. " + streamId);
                }
                log.warn("Schema Definition and Schema Registry is not specified in Stream. Hence generating schema from stream attributes.");
                resolvedSchema = new RecordSchema().generateAvroSchema(this.attributeList, streamId);
            }
        } catch (FeignException e) {
            // Keep the cause so the underlying registry failure is not lost.
            throw new SiddhiAppCreationException("Error when retriving schema from schema registry. " + e.getMessage(), e);
        } catch (SchemaParseException e) {
            throw new SiddhiAppCreationException("Unable to parse Schema for stream:" + streamId + ". " + e.getMessage(), e);
        }
        if (resolvedSchema == null) {
            throw new SiddhiAppCreationException("Error when generating Avro Schema for stream: " + streamId);
        }
        return resolvedSchema;
    }

    /**
     * Serializes every event first and then publishes the successfully serialized messages,
     * each wrapped in its own {@link ByteBuffer}. Events that fail to serialize are dropped.
     *
     * @param eventArr      events to map
     * @param optionHolder  not used by this mapper
     * @param map           payload template builders, or {@code null} for default mapping
     * @param sinkListener  listener the Avro messages are published to
     */
    public void mapAndSend(Event[] eventArr, OptionHolder optionHolder, Map<String, TemplateBuilder> map, SinkListener sinkListener) {
        List<byte[]> avroMessages = new ArrayList<>(eventArr.length);
        for (Event event : eventArr) {
            byte[] avroMessage = mapSingleEvent(event, map);
            if (avroMessage != null) {
                avroMessages.add(avroMessage);
            }
        }
        for (byte[] avroMessage : avroMessages) {
            sinkListener.publish(ByteBuffer.wrap(avroMessage));
        }
    }

    /**
     * Serializes a single event and publishes it as a {@link ByteBuffer}; nothing is published
     * when serialization fails.
     *
     * @param event         event to map
     * @param optionHolder  not used by this mapper
     * @param map           payload template builders, or {@code null} for default mapping
     * @param sinkListener  listener the Avro message is published to
     */
    public void mapAndSend(Event event, OptionHolder optionHolder, Map<String, TemplateBuilder> map, SinkListener sinkListener) {
        byte[] avroMessage = mapSingleEvent(event, map);
        if (avroMessage != null) {
            sinkListener.publish(ByteBuffer.wrap(avroMessage));
        }
    }

    /**
     * Picks default or custom mapping depending on whether payload templates were supplied.
     *
     * @return the Avro bytes, or {@code null} when the event could not be converted
     */
    private byte[] mapSingleEvent(Event event, Map<String, TemplateBuilder> map) {
        if (map == null) {
            return constructAvroForDefaultMapping(event);
        }
        // init() rejects template maps that do not hold exactly one entry.
        return constructAvroForCustomMapping(event, map.values().iterator().next());
    }

    /**
     * Builds a JSON object from the event attributes and serializes it against the schema.
     *
     * @param obj expected to be an {@link Event}; anything else (including {@code null}) is
     *            logged and rejected
     * @return the Avro bytes, or {@code null} when the input is invalid or serialization fails
     */
    private byte[] constructAvroForDefaultMapping(Object obj) {
        if (!(obj instanceof Event)) {
            // Null-safe: a null input must be reported, not trigger an NPE while logging.
            String typeName = obj == null ? "null" : obj.getClass().getName();
            log.error("Invalid object type. " + obj + " of type " + typeName + " cannot be converted to an Avro Message");
            return null;
        }
        Event event = (Event) obj;
        try {
            return AvroMessageProcessor.serializeAvroMessage(constructSingleEventForDefaultMapping(event).toString(), this.schema);
        } catch (Throwable th) {
            logConversionFailure(event, th);
            return null;
        }
    }

    /**
     * Renders the event through the payload template and serializes the result against the
     * schema.
     *
     * @return the Avro bytes, or {@code null} when the event is {@code null} or conversion fails
     */
    private byte[] constructAvroForCustomMapping(Event event, TemplateBuilder templateBuilder) {
        if (event == null) {
            // Guard here: the failure logging below dereferences the event.
            log.error("Invalid object type. null of type null cannot be converted to an Avro Message");
            return null;
        }
        try {
            return AvroMessageProcessor.serializeAvroMessage((String) templateBuilder.build(doPartialProcessing(event)), this.schema);
        } catch (Throwable th) {
            logConversionFailure(event, th);
            return null;
        }
    }

    /** Logs a dropped event together with the failure's stack trace. */
    private void logConversionFailure(Event event, Throwable cause) {
        log.error("Error when converting siddhi event: " + Arrays.toString(event.getData()) + " to Avro message of schema: " + this.schema + "." + cause.getMessage() + ". Hence dropping the event.", cause);
    }

    /**
     * Converts the event data into a JSON object keyed by attribute name. Nulls become the
     * {@code "undefined"} placeholder; strings, numbers and booleans are added as JSON
     * primitives; non-empty maps are converted to JSON trees. Empty maps and values of any
     * other type are omitted.
     */
    private JsonObject constructSingleEventForDefaultMapping(Event event) {
        Object[] data = event.getData();
        JsonObject jsonEvent = new JsonObject();
        for (int i = 0; i < data.length; i++) {
            String attributeName = this.attributeNameArray[i];
            Object value = data[i];
            if (value == null) {
                jsonEvent.addProperty(attributeName, UNDEFINED);
            } else if (value instanceof String) {
                jsonEvent.addProperty(attributeName, (String) value);
            } else if (value instanceof Number) {
                jsonEvent.addProperty(attributeName, (Number) value);
            } else if (value instanceof Boolean) {
                jsonEvent.addProperty(attributeName, (Boolean) value);
            } else if (value instanceof Map && !((Map<?, ?>) value).isEmpty()) {
                jsonEvent.add(attributeName, GSON.toJsonTree(value));
            }
        }
        return jsonEvent;
    }

    /**
     * Replaces null attribute values with the {@code "undefined"} placeholder.
     *
     * <p>The event's own data array is modified in place and the same event is returned.
     * NOTE(review): if the event instance is shared with other consumers they will observe the
     * placeholder too - confirm against the caller before relying on the original nulls.
     */
    private Event doPartialProcessing(Event event) {
        Object[] data = event.getData();
        for (int i = 0; i < data.length; i++) {
            if (data[i] == null) {
                data[i] = UNDEFINED;
            }
        }
        return event;
    }

    /** @return the single output type produced by this mapper, {@link ByteBuffer} */
    public Class[] getOutputEventClasses() {
        return new Class[]{ByteBuffer.class};
    }

    /** @return an empty array; this mapper reads no dynamic options */
    public String[] getSupportedDynamicOptions() {
        return new String[0];
    }
}
