package io.siddhi.extension.store.elasticsearch.sink;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.store.elasticsearch.ElasticsearchConfigs;
import io.siddhi.extension.store.elasticsearch.exceptions.ElasticsearchEventSinkException;
import io.siddhi.extension.store.elasticsearch.utils.ElasticsearchTableConstants;
import io.siddhi.extension.store.elasticsearch.utils.SiddhiIndexRequest;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.IOException;
import java.util.Iterator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentType;

@Extension(name = "elasticsearch", namespace = "sink", description = "Elasticsearch sink implementation uses Elasticsearch indexing document for underlying data storage. The events that are published from the sink will be converted into elasticsearch index documents. The elasticsearch sink is connected to the Elastisearch server via the Elasticsearch Java High Level REST Client library. By using this sink, we can customize the json document before it's stored in the elasticsearch.", parameters = {@Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_HOSTNAME, description = "The hostname of the Elasticsearch server.", type = {DataType.STRING}, optional = true, defaultValue = ElasticsearchTableConstants.DEFAULT_HOSTNAME), @Parameter(name = "port", description = "The port of the Elasticsearch server.", type = {DataType.INT}, optional = true, defaultValue = "9200"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_SCHEME, description = "The scheme type of the Elasticsearch server connection.", type = {DataType.STRING}, optional = true, defaultValue = "http"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_MEMBER_LIST, description = "The list of elasticsearch host names. 
in comma separated manner`https://hostname1:9200,https://hostname2:9200`", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_USER, description = "The username for the Elasticsearch server connection.", type = {DataType.STRING}, optional = true, defaultValue = ElasticsearchTableConstants.DEFAULT_USER_NAME), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_PASSWORD, description = "The password for the Elasticsearch server connection.", type = {DataType.STRING}, optional = true, defaultValue = ElasticsearchTableConstants.DEFAULT_PASSWORD), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_INDEX_NAME, description = "The name of the Elasticsearch index.This must be in lower case", type = {DataType.STRING}, optional = true, defaultValue = "The table name defined in the Siddhi App query."), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_PAYLOAD_INDEX_OF_INDEX_NAME, description = "The payload which is used to create the index. This can be used if the user needs to create index names dynamically. This must be in lower case. 
If this parameter is configured then respective elasticsearch table can be only used for insert operations because indices are created in the runtime dynamically.", type = {DataType.INT}, optional = true, defaultValue = "-1"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_INDEX_ALIAS, description = "The alias of the Elasticsearch index.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_INDEX_NUMBER_OF_SHARDS, description = "The number of shards allocated for the index in the Elasticsearch server.", type = {DataType.INT}, optional = true, defaultValue = "3"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_INDEX_NUMBER_OF_REPLICAS, description = "The number of replicas for the index in the Elasticsearch server.", type = {DataType.INT}, optional = true, defaultValue = "2"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BULK_ACTIONS, description = "The number of actions to be added to flush a new bulk request. Use -1 to disable it", type = {DataType.INT}, optional = true, defaultValue = "1"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BULK_SIZE, description = "The size of size of actions currently added to the bulk request to flush a new bulk request in MB. Use -1 to disable it", type = {DataType.LONG}, optional = true, defaultValue = "1"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_CONCURRENT_REQUESTS, description = "The number of concurrent requests allowed to be executed. 
Use 0 to only allow the execution of a single request", type = {DataType.INT}, optional = true, defaultValue = "0"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_FLUSH_INTERVAL, description = "The flush interval flushing any BulkRequest pending if the interval passes.", type = {DataType.LONG}, optional = true, defaultValue = "10"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BACKOFF_POLICY_RETRY_NO, description = "The number of retries until backoff (The backoff policy defines how the bulk processor should handle retries of bulk requests internally in case they have failed due to resource constraints (i.e. a thread pool was full)).", type = {DataType.INT}, optional = true, defaultValue = "3"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BACKOFF_POLICY_WAIT_TIME, description = "The constant back off policy that initially waits until the next retry in seconds.", type = {DataType.LONG}, optional = true, defaultValue = "1"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_SSL_ENABLED, description = "SSL is enabled or not.", type = {DataType.BOOL}, optional = true, defaultValue = "null"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_TRUSRTSTORE_TYPE, description = "Trust store type.", type = {DataType.STRING}, optional = true, defaultValue = ElasticsearchTableConstants.DEFAULT_TRUSTSTORE_TYPE), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_TRUSRTSTORE_PATH, description = "Trust store path.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_TRUSRTSTORE_PASS, description = "Trust store password.", type = {DataType.STRING}, optional = true, defaultValue = ElasticsearchTableConstants.DEFAULT_TRUSTSTORE_PASS), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BACKOFF_POLICY, description = "Provides a backoff policy(eg: constantBackoff, exponentialBackoff, 
disable) for bulk requests, whenever a bulk request is rejected due to resource constraints. Bulk processor will wait before the operation is retried internally.", type = {DataType.STRING}, optional = true, defaultValue = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BACKOFF_POLICY_CONSTANT_BACKOFF), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BACKOFF_POLICY_RETRY_NO, description = "The maximum number of retries. Must be a non-negative number.", type = {DataType.INT}, optional = true, defaultValue = "3"), @Parameter(name = ElasticsearchTableConstants.ANNOTATION_ELEMENT_BACKOFF_POLICY_WAIT_TIME, description = "The delay defines how long to wait between retry attempts. Must not be null.", type = {DataType.INT}, optional = true, defaultValue = "1")}, examples = {@Example(syntax = "@sink(type='elasticsearch', hostname='172.0.0.1', port='9200',index.name='stock_index', @map(type='json', @payload(\"\"\"{\n   \"Stock Data\":{\n      \"Symbol\":\"{{symbol}}\",\n      \"Price\":{{price}},\n      \"Volume\":{{volume}}\n   }\n}\"\"\")))define stream stock_stream(symbol string, price float, volume long);", description = "This will create an index called 'stock_index' if it does not already exist in the elasticsearch server and saves the custom json document.")})
/* loaded from: input_file:io/siddhi/extension/store/elasticsearch/sink/ElasticsearchSink.class */
/*
 * Siddhi sink that forwards String payloads containing JSON to Elasticsearch.
 *
 * Each JSON object (or each element of a JSON array) is wrapped in its own
 * SiddhiIndexRequest and handed to the bulk processor exposed by
 * ElasticsearchConfigs. On connect, the configured index is created if the
 * server reports that it does not exist yet.
 */
public class ElasticsearchSink extends Sink {
    private static final Logger logger = LogManager.getLogger(ElasticsearchSink.class);

    /** Connection, index and bulk-processor settings; assigned in {@code init}. */
    private ElasticsearchConfigs elasticsearchConfigs;

    /**
     * @return the payload types this sink accepts; only {@link String} is supported
     */
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class};
    }

    /**
     * @return always {@code null}; this sink exposes no service deployment information
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * @return an empty array; this sink declares no dynamic options
     */
    public String[] getSupportedDynamicOptions() {
        return new String[0];
    }

    /**
     * Creates and initialises the {@link ElasticsearchConfigs} used by every other method.
     *
     * @param streamDefinition definition of the stream this sink is attached to
     * @param optionHolder     sink options (not read directly here)
     * @param configReader     reader passed through to the configuration object
     * @param siddhiAppContext context passed through to the configuration object
     * @return always {@code null}; the sink keeps no snapshot state
     */
    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.elasticsearchConfigs = new ElasticsearchConfigs(this);
        this.elasticsearchConfigs.init(streamDefinition, configReader, siddhiAppContext);
        return null;
    }

    /**
     * Parses the payload as JSON and queues it on the bulk processor.
     *
     * <p>A JSON array yields one index request per element; a JSON object yields a single
     * request. Any other JSON value (primitive or null) cannot be indexed as a document and
     * is dropped with a warning.
     *
     * @param obj            the event payload; must be a {@link String} holding JSON
     * @param dynamicOptions dynamic options forwarded to each index request
     * @param state          sink state (unused)
     * @throws ElasticsearchEventSinkException if the payload is not syntactically valid JSON
     * @throws ConnectionUnavailableException  declared by the sink contract
     */
    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        JsonElement root;
        try {
            root = new JsonParser().parse((String) obj);
        } catch (JsonSyntaxException e) {
            throw new ElasticsearchEventSinkException("Invalid json document, Please recheck the json mapping at the '@payload'.", e);
        }

        if (root.isJsonArray()) {
            for (JsonElement element : root.getAsJsonArray()) {
                enqueueDocument(obj, dynamicOptions, element.toString());
            }
        } else if (root.isJsonObject()) {
            enqueueDocument(obj, dynamicOptions, root.getAsJsonObject().toString());
        } else {
            logger.warn("Ignoring payload that is neither a JSON object nor a JSON array: " + obj);
            return;
        }

        if (logger.isDebugEnabled()) {
            logger.debug(obj + " has been successfully added.");
        }
    }

    /**
     * Builds a dedicated index request for one JSON document and adds it to the bulk processor.
     *
     * <p>A new request is created per document on purpose: the request object is mutable and
     * is queued by reference, so sharing one instance across several documents would make
     * every queued entry carry the source that was set last.
     */
    private void enqueueDocument(Object payload, DynamicOptions dynamicOptions, String document) throws ConnectionUnavailableException {
        SiddhiIndexRequest request = new SiddhiIndexRequest(payload, dynamicOptions, this.elasticsearchConfigs.getIndexName());
        request.source(document, XContentType.JSON);
        this.elasticsearchConfigs.getBulkProcessor().add((IndexRequest) request);
    }

    /**
     * Ensures the configured index exists. Does nothing when no index name is configured.
     *
     * @throws ConnectionUnavailableException if the index existence check fails with an I/O error
     */
    public void connect() throws ConnectionUnavailableException {
        String indexName = this.elasticsearchConfigs.getIndexName();
        if (indexName == null || indexName.isEmpty()) {
            return;
        }
        createIndex(this.elasticsearchConfigs.getDefinitionId());
    }

    /**
     * Closes the REST client, if one is present, and clears the reference held by the
     * configuration object regardless of whether closing succeeded.
     *
     * @throws ElasticsearchEventSinkException if closing the client fails with an I/O error
     */
    public void disconnect() {
        if (this.elasticsearchConfigs.getRestHighLevelClient() == null) {
            // Already disconnected (or never connected); nothing to close.
            return;
        }
        try {
            this.elasticsearchConfigs.getRestHighLevelClient().close();
        } catch (IOException e) {
            throw new ElasticsearchEventSinkException("An error occurred while closing the elasticsearch client.", e);
        } finally {
            this.elasticsearchConfigs.setRestHighLevelClient(null);
        }
    }

    /** No resources to release beyond what {@link #disconnect()} handles. */
    public void destroy() {
    }

    /**
     * Creates the configured index, with the configured shard/replica counts and optional
     * alias, unless the server reports that it already exists.
     *
     * @param streamId id of the stream definition, used only in log and error messages
     * @throws ConnectionUnavailableException  if the existence check fails with an I/O error
     * @throws ElasticsearchEventSinkException if the create call fails with an I/O error
     */
    private void createIndex(String streamId) throws ConnectionUnavailableException {
        String indexName = this.elasticsearchConfigs.getIndexName();

        boolean indexExists;
        try {
            indexExists = this.elasticsearchConfigs.getRestHighLevelClient().indices()
                    .exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Error while checking indices for stream id : '" + streamId + "'", e);
        }

        if (indexExists) {
            if (logger.isDebugEnabled()) {
                logger.debug("Index: " + indexName + " has already being created for stream id: " + streamId + ".");
            }
            return;
        }

        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        createIndexRequest.settings(Settings.builder()
                .put("index.number_of_shards", this.elasticsearchConfigs.getNumberOfShards())
                .put("index.number_of_replicas", this.elasticsearchConfigs.getNumberOfReplicas()));
        if (this.elasticsearchConfigs.getIndexAlias() != null) {
            createIndexRequest.alias(new Alias(this.elasticsearchConfigs.getIndexAlias()));
        }

        try {
            this.elasticsearchConfigs.getRestHighLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
            if (logger.isDebugEnabled()) {
                logger.debug("A stream id: " + streamId + " is created with the provided information.");
            }
        } catch (IOException e) {
            throw new ElasticsearchEventSinkException("Error while creating indices for stream id : '" + streamId + "'", e);
        } catch (ElasticsearchStatusException e) {
            // Best effort: a status error from the server is logged rather than propagated.
            logger.error("Elasticsearch status exception occurred while creating index for stream id: " + streamId, e);
        }
    }
}
