/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.record.sink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

@Tags(value={"record", "sink", "log"})
@CapabilityDescription(value="Provides a RecordSinkService that can be used to log records to the application log (nifi-app.log, e.g.) using the specified writer for formatting.")
public class LoggingRecordSink
extends AbstractControllerService
implements RecordSinkService {
    private List<PropertyDescriptor> properties;
    private volatile RecordSetWriterFactory writerFactory;
    private volatile LogLevel logLevel;
    public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder().name("logsink-log-level").displayName("Log Level").required(true).description("The Log Level at which to log records (INFO, DEBUG, e.g.)").allowableValues((Enum[])LogLevel.values()).defaultValue(LogLevel.INFO.name()).build();

    protected void init(ControllerServiceInitializationContext context) throws InitializationException {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RecordSinkService.RECORD_WRITER_FACTORY);
        properties.add(LOG_LEVEL);
        this.properties = Collections.unmodifiableList(properties);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    @OnEnabled
    public void onEnabled(ConfigurationContext context) throws InitializationException {
        this.writerFactory = (RecordSetWriterFactory)context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        this.logLevel = LogLevel.valueOf((String)context.getProperty(LOG_LEVEL).getValue());
    }

    public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
        WriteResult writeResult;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            ComponentLog log = this.getLogger();
            try (RecordSetWriter writer = this.writerFactory.createWriter(this.getLogger(), recordSet.getSchema(), (OutputStream)baos, attributes);){
                Record r;
                writer.beginRecordSet();
                while ((r = recordSet.next()) != null) {
                    baos.reset();
                    writer.write(r);
                    writer.flush();
                    log.log(this.logLevel, "{}", new Object[]{baos});
                }
                writeResult = writer.finishRecordSet();
                writer.flush();
            }
        }
        catch (SchemaNotFoundException e) {
            throw new IOException(e);
        }
        return writeResult;
    }
}

