package io.siddhi.extension.io.s3.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.common.utils.S3Constants;
import io.siddhi.extension.io.s3.sink.internal.beans.SinkConfig;
import io.siddhi.extension.io.s3.sink.internal.publisher.EventPublisher;
import io.siddhi.query.api.definition.StreamDefinition;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.client.config.CookieSpecs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Extension(name = "s3", namespace = "sink", description = "S3 sink publishes events as Amazon AWS S3 buckets.", parameters = {@Parameter(name = S3Constants.CREDENTIAL_PROVIDER_CLASS, type = {DataType.STRING}, description = "AWS credential provider class to be used. If blank along with the username and the password, default credential provider will be used.", optional = true, defaultValue = "EMPTY_STRING"), @Parameter(name = S3Constants.AWS_ACCESS_KEY, type = {DataType.STRING}, description = "AWS access key. This cannot be used along with the credential.provider.class", optional = true, defaultValue = "EMPTY_STRING"), @Parameter(name = S3Constants.AWS_SECRET_KEY, type = {DataType.STRING}, description = "AWS secret key. This cannot be used along with the credential.provider.class", optional = true, defaultValue = "EMPTY_STRING"), @Parameter(name = S3Constants.BUCKET_NAME, type = {DataType.STRING}, description = "Name of the S3 bucket"), @Parameter(name = S3Constants.AWS_REGION, type = {DataType.STRING}, description = "The region to be used to create the bucket", optional = true, defaultValue = "EMPTY_STRING"), @Parameter(name = S3Constants.VERSIONING_ENABLED, type = {DataType.BOOL}, description = "Flag to enable versioning support in the bucket", optional = true, defaultValue = "false"), @Parameter(name = S3Constants.OBJECT_PATH, type = {DataType.STRING}, description = "Path for each S3 object", dynamic = true), @Parameter(name = S3Constants.STORAGE_CLASS, type = {DataType.STRING}, description = "AWS storage class", optional = true, defaultValue = CookieSpecs.STANDARD), @Parameter(name = S3Constants.CONTENT_TYPE, type = {DataType.STRING}, description = "Content type of the event", optional = true, defaultValue = "application/octet-stream", dynamic = true), @Parameter(name = S3Constants.BUCKET_ACL, type = {DataType.STRING}, description = "Access control list for the bucket", optional = true, defaultValue = "EMPTY_STRING"), @Parameter(name = S3Constants.NODE_ID, type = {DataType.STRING}, description = "The node ID of the current publisher. This needs to be unique for each publisher instance as it may cause object overwrites while uploading the objects to same S3 bucket from different publishers.", optional = true, defaultValue = "EMPTY_STRING")}, examples = {@Example(syntax = "@sink(type='s3', bucket.name='user-stream-bucket',object.path='bar/users', credential.provider='software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider', flush.size='3',\n    @map(type='json', enclosing.element='$.user', \n        @payload(\"\"\"{\"name\": \"{{name}}\", \"age\": {{age}}}\"\"\"))) \ndefine stream UserStream(name string, age int);  ", description = "This creates a S3 bucket named 'user-stream-bucket'. Then this will collect 3 events together and create a JSON object and save that in S3.")})
/* loaded from: input_file:io/siddhi/extension/io/s3/sink/S3Sink.class */
public class S3Sink extends Sink<SinkState> {
    private static final Logger logger = LogManager.getLogger(S3Sink.class);
    private EventPublisher publisher;
    private SinkConfig config;

    /* loaded from: input_file:io/siddhi/extension/io/s3/sink/S3Sink$SinkState.class */
    public class SinkState extends State {
        private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue();
        private AtomicInteger eventIncrementer = new AtomicInteger();

        public SinkState() {
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put("taskQueue", this.taskQueue);
            hashMap.put("eventIncrementer", this.eventIncrementer);
            return hashMap;
        }

        public void restore(Map<String, Object> map) {
            this.taskQueue = (BlockingQueue) map.get("taskQueue");
            this.eventIncrementer = (AtomicInteger) map.get("eventIncrementer");
        }

        public BlockingQueue<Runnable> getTaskQueue() {
            return this.taskQueue;
        }

        public AtomicInteger getEventIncrementer() {
            return this.eventIncrementer;
        }
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, Event.class, ByteBuffer.class};
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{S3Constants.OBJECT_PATH};
    }

    protected StateFactory<SinkState> init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        logger.debug("Initializing the S3 sink connector.");
        SinkState sinkState = new SinkState();
        this.config = new SinkConfig(optionHolder, streamDefinition);
        this.publisher = new EventPublisher(this.config, optionHolder, sinkState);
        return () -> {
            return sinkState;
        };
    }

    public void publish(Object obj, DynamicOptions dynamicOptions, SinkState sinkState) throws ConnectionUnavailableException {
        this.publisher.publish(obj, dynamicOptions, sinkState);
    }

    public void connect() throws ConnectionUnavailableException {
        this.config.setMapType(getMapper().getType());
        this.publisher.init();
        this.publisher.start();
        logger.debug("Event publisher started.");
    }

    public void disconnect() {
        if (this.publisher != null) {
            this.publisher.shutdown();
            logger.debug("Event publisher shutdown.");
        }
    }

    public void destroy() {
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }
}
