/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.file;

import com.google.common.base.Stopwatch;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.file.metrics.SinkMetrics;
import io.siddhi.extension.io.file.metrics.StreamStatus;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.file.connector.sender.VFSClientConnector;

@Extension(name="file", namespace="sink", description="The File Sink component of the 'siddhi-io-fie' extension publishes (writes) event data that is processed within Siddhi to files. \nSiddhi-io-file sink provides support to write both textual and binary data into files\n", parameters={@Parameter(name="file.uri", description="The path to thee file in which the data needs to be published. ", type={DataType.STRING}, dynamic=true), @Parameter(name="append", description="This specifies whether the data should be appended to the file or not.\nIf this parameter is set to 'true', data is written at the end of the file without changing the existing content.\n If the parameter is set to 'false', the existing content of the file is deleted and the content you are publishing is added to replace it.\nIf the file does not exist, a new file is created and then the data is written in it. In such a scenario, the value specified for this parameter is not applicable\n", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="add.line.separator", description="If this parameter is set to 'true', events added to the file are separated by adding each event in a new line.\n", type={DataType.BOOL}, optional=true, defaultValue="true. (However, if the 'csv' mapper is used, it is false)"), @Parameter(name="file.system.options", description="The file options in key:value pairs separated by commas. 
\neg:'USER_DIR_IS_ROOT:false,PASSIVE_MODE:true,AVOID_PERMISSION_CHECK:true,IDENTITY:file://demo/.ssh/id_rsa,IDENTITY_PASS_PHRASE:wso2carbon'\nNote: when IDENTITY is used, use a RSA PRIVATE KEY", type={DataType.STRING}, optional=true, defaultValue="<Empty_String>")}, examples={@Example(syntax="@sink(type='file', @map(type='json'), append='false', file.uri='/abc/{{symbol}}.txt') define stream BarStream (symbol string, price float, volume long); ", description="In the above configuration, each output event is published in the '/abc/{{symbol}}.txt' file in JSON format.The output looks as follows:\n{\n    \"event\":{\n        \"symbol\":\"WSO2\",\n        \"price\":55.6,\n        \"volume\":100\n    }\n}\nIf the file does not exist at the time an output event is generated, the system creates the file and proceeds to publish the output event in it.")})
public class FileSink extends Sink {
    /*
     * Sink that writes each published payload to the file addressed by the "file.uri" option,
     * using a VFSClientConnector. byte[] payloads are written unchanged; any other payload is
     * written as the UTF-8 encoding of its toString() value, optionally followed by "\n".
     * When a SinkMetrics instance was created in init(), per-file write metrics are updated
     * after every successful send.
     */
    private static final Logger log = Logger.getLogger(FileSink.class);

    /** Connector used to send data to the target file; created in {@link #connect()}. */
    private VFSClientConnector vfsClientConnector = null;
    /** Static connector properties built in {@code init}: "action" and, when enabled, "append". */
    private Map<String, String> properties = null;
    /** The "file.uri" option; resolved against the dynamic options of each event in {@code publish}. */
    private Option uriOption;
    private SiddhiAppContext siddhiAppContext;
    /** Whether "\n" is appended to non-binary payloads before they are written. */
    private boolean addEventSeparator;
    private String siddhiAppName;
    /** Metrics holder; stays {@code null} unless the checks in {@code init} create it. */
    private SinkMetrics metrics;
    /** Raw value of the "file.system.options" option, or {@code null} when it is not set. */
    private String fileSystemOptions;

    /**
     * Returns the payload classes this sink accepts.
     *
     * @return {@code String}, {@code byte[]} and {@code Object}
     */
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, byte[].class, Object.class};
    }

    /**
     * Returns the names of the options that may be resolved per event.
     *
     * @return an array containing only "file.uri"
     */
    public String[] getSupportedDynamicOptions() {
        return new String[]{"file.uri"};
    }

    /**
     * Reads the sink options and prepares the static connector properties.
     *
     * <p>"append" defaults to "true" and is only forwarded to the connector when it is true.
     * "add.line.separator" defaults to true unless the detected map type is "csv".
     *
     * @param streamDefinition definition of the stream this sink is attached to
     * @param optionHolder     holder of the options configured on the sink
     * @param configReader     not used by this sink
     * @param siddhiAppContext context of the owning Siddhi app
     * @return always {@code null}
     */
    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.siddhiAppName = siddhiAppContext.getName();
        this.uriOption = optionHolder.validateAndGetOption("file.uri");

        String append = optionHolder.validateAndGetStaticValue("append", "true");
        this.properties = new HashMap<String, String>();
        this.properties.put("action", "write");
        if ("true".equalsIgnoreCase(append)) {
            this.properties.put("append", append);
        }

        // The map type is read positionally: first annotation of the stream, its first nested
        // annotation, and that annotation's first element.
        // NOTE(review): presumably these are the @sink annotation and its @map(type=...) element;
        // this relies on annotation ordering and fails if any of the lists is empty - confirm.
        Annotation firstAnnotation = (Annotation) streamDefinition.getAnnotations().get(0);
        Annotation nestedAnnotation = (Annotation) firstAnnotation.getAnnotations().get(0);
        Element firstElement = (Element) nestedAnnotation.getElements().get(0);
        String mapType = firstElement.getValue();

        if (optionHolder.isOptionExists("add.line.separator")) {
            this.addEventSeparator = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue("add.line.separator"));
        } else {
            this.addEventSeparator = !mapType.equalsIgnoreCase("csv");
        }
        this.fileSystemOptions = optionHolder.validateAndGetStaticValue("file.system.options", null);

        mapType = Utils.capitalizeFirstLetter(mapType);
        if (MetricsDataHolder.getInstance().getMetricService() != null && MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
            try {
                if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning("prometheus")) {
                    this.metrics = new SinkMetrics(siddhiAppContext.getName(), mapType, streamDefinition.getId());
                }
            } catch (IllegalArgumentException e) {
                // Deliberately non-fatal: the sink keeps working without metrics.
                log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
            }
        }
        return null;
    }

    /**
     * This sink exposes no service deployment information.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Creates and initialises the {@link VFSClientConnector} with the file system options derived
     * from the static "file.uri" value and "file.system.options", then, when metrics are
     * available, hands the app's executor service to {@code SinkMetrics.updateMetrics}.
     *
     * @throws ConnectionUnavailableException if the connector fails to initialise
     */
    public void connect() throws ConnectionUnavailableException {
        this.vfsClientConnector = new VFSClientConnector();
        Map<String, Object> schemeFileOptions = Utils.getFileSystemOptionObjectMap(this.uriOption.getValue(), this.fileSystemOptions);
        try {
            this.vfsClientConnector.init(null, null, schemeFileOptions);
        } catch (ClientConnectorException e) {
            throw new ConnectionUnavailableException("Exception occured when initializing VFSClientConnector", e);
        }
        if (this.metrics != null) {
            this.metrics.updateMetrics(this.siddhiAppContext.getExecutorService());
        }
    }

    /** No-op: this sink performs no work on disconnect. */
    public void disconnect() {
    }

    /** No-op: this sink performs no work on destroy. */
    public void destroy() {
    }

    /**
     * Writes one event payload to the file addressed by the resolved "file.uri" option.
     *
     * <p>If the connector reports that nothing was sent (returns {@code false}), no metrics are
     * updated and no error is raised.
     *
     * @param payload        event payload; {@code byte[]} is written as is, anything else via
     *                       {@code toString()} (must not be {@code null})
     * @param dynamicOptions per-event options used to resolve "file.uri"
     * @param state          not used by this sink
     * @throws ConnectionUnavailableException if the connector fails to write the data
     */
    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        String uri = this.uriOption.getValue(dynamicOptions);
        if (this.metrics != null) {
            this.metrics.setFilePath(uri);
        }

        byte[] byteArray = toBytes(payload);
        BinaryCarbonMessage binaryCarbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(byteArray), true);

        // Use a per-call copy so the event-specific URI never lands in the shared map, where an
        // overlapping publish call could pick it up and write to the wrong file.
        Map<String, String> sendProperties = new HashMap<String, String>(this.properties);
        sendProperties.put("uri", uri);

        try {
            boolean sent = this.vfsClientConnector.send(binaryCarbonMessage, null, sendProperties);
            if (sent && this.metrics != null) {
                recordSuccessfulWrite(uri, byteArray.length);
            }
        } catch (ClientConnectorException e) {
            if (this.metrics != null) {
                this.metrics.getSinkFileStatusMap().replace(Utils.getShortFilePath(uri), StreamStatus.ERROR);
                this.metrics.getErrorCount().inc();
            }
            throw new ConnectionUnavailableException("Writing data into the file " + uri + " failed during the execution of '" + this.siddhiAppName + "' SiddhiApp, due to " + e.getMessage(), e);
        }
    }

    /**
     * Converts a payload into the bytes to be written: {@code byte[]} payloads are returned
     * unchanged (no separator is added); any other payload is rendered with {@code toString()},
     * followed by "\n" when the event separator is enabled, and encoded as UTF-8.
     */
    private byte[] toBytes(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        String text = payload.toString();
        if (this.addEventSeparator) {
            text = text + "\n";
        }
        // StandardCharsets.UTF_8 cannot fail, unlike the charset-name overload of getBytes.
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Updates the sink metrics after a successful write of {@code byteSize} bytes to {@code uri}.
     * Must only be called when {@code metrics} is non-null.
     */
    private void recordSuccessfulWrite(String uri, int byteSize) {
        String shortenFilePath = Utils.getShortFilePath(uri);
        // 'firstWrite' is true when this short path was not yet present in the metrics' URI collection.
        boolean firstWrite = this.metrics.getFilesURI().add(shortenFilePath);
        long now = System.currentTimeMillis();

        if (this.metrics.getSinkFileLastPublishedTimeMap().containsKey(shortenFilePath)) {
            this.metrics.getSinkFileLastPublishedTimeMap().replace(shortenFilePath, now);
            this.metrics.getSinkFileStatusMap().replace(shortenFilePath, StreamStatus.PROCESSING);
        } else {
            this.metrics.getSinkFileLastPublishedTimeMap().put(shortenFilePath, now);
            this.metrics.getSinkFileStatusMap().put(shortenFilePath, StreamStatus.PROCESSING);
        }
        this.metrics.getSinkLinesCount().inc();

        if (firstWrite) {
            // First write seen for this file: seed the size with the file's current size and
            // start the per-file elapsed-time stopwatch and gauges.
            this.metrics.getSinkFileSize().inc(Utils.getFileSize(uri));
            this.metrics.getSinkElapsedTimeMap().put(shortenFilePath, Stopwatch.createStarted());
            this.metrics.setSinkLastPublishedTime();
            this.metrics.setSinkElapsedTime(shortenFilePath);
            this.metrics.setSinkFileStatusMetrics();
        } else {
            this.metrics.getSinkFileSize().inc((long) byteSize);
        }

        this.metrics.getTotalWriteMetrics().inc();
        this.metrics.getSinkFilesEventCount().inc();
        // Return values intentionally unused.
        // NOTE(review): presumably these getters register the counters as a side effect so they
        // are reported even when zero - confirm against SinkMetrics before removing.
        this.metrics.getSinkDroppedEvents();
        this.metrics.getErrorCount();
        this.metrics.getWriteBytes().inc((long) byteSize);
    }
}

