/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.record.sink.event;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

@Tags(value={"UDP", "event", "record", "sink"})
@CapabilityDescription(value="Format and send Records as UDP Datagram Packets to a configurable destination")
public class UDPEventRecordSink
extends AbstractControllerService
implements RecordSinkService {
    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder().name("hostname").displayName("Hostname").description("Destination hostname or IP address").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("port").displayName("Port").description("Destination port number").required(true).addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    public static final PropertyDescriptor SENDER_THREADS = new PropertyDescriptor.Builder().name("sender-threads").displayName("Sender Threads").description("Number of worker threads allocated for handling socket communication").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).defaultValue("2").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(HOSTNAME, PORT, RECORD_WRITER_FACTORY, SENDER_THREADS));
    private static final String TRANSIT_URI_ATTRIBUTE_KEY = "record.sink.url";
    private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
    private RecordSetWriterFactory writerFactory;
    private EventSender<byte[]> eventSender;
    private String transitUri;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        this.eventSender = this.getEventSender(context);
    }

    @OnDisabled
    public void onDisabled() throws Exception {
        if (this.eventSender == null) {
            this.getLogger().debug("Event Sender not configured");
        } else {
            this.eventSender.close();
        }
    }

    public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
        LinkedHashMap<String, String> writeAttributes = new LinkedHashMap<String, String>(attributes);
        writeAttributes.put(TRANSIT_URI_ATTRIBUTE_KEY, this.transitUri);
        int recordCount = 0;
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
             RecordSetWriter writer = this.writerFactory.createWriter(this.getLogger(), recordSet.getSchema(), (OutputStream)outputStream, writeAttributes);){
            Record record;
            while ((record = recordSet.next()) != null) {
                WriteResult writeResult = writer.write(record);
                writer.flush();
                this.sendRecord(outputStream);
                recordCount += writeResult.getRecordCount();
            }
        }
        catch (SchemaNotFoundException e) {
            throw new IOException("Record Schema not found", e);
        }
        catch (IOException | RuntimeException e) {
            throw new IOException(String.format("Record [%d] Destination [%s] Transmission failed", recordCount, this.transitUri), e);
        }
        return WriteResult.of((int)recordCount, writeAttributes);
    }

    private void sendRecord(ByteArrayOutputStream outputStream) {
        byte[] bytes = outputStream.toByteArray();
        this.eventSender.sendEvent((Object)bytes);
        outputStream.reset();
    }

    private EventSender<byte[]> getEventSender(ConfigurationContext context) {
        String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
        int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
        this.transitUri = String.format(TRANSIT_URI_FORMAT, hostname, port);
        ByteArrayNettyEventSenderFactory factory = new ByteArrayNettyEventSenderFactory(this.getLogger(), hostname, port, TransportProtocol.UDP);
        factory.setShutdownQuietPeriod(Duration.ZERO);
        factory.setShutdownTimeout(Duration.ZERO);
        factory.setThreadNamePrefix(String.format("%s[%s]", ((Object)((Object)this)).getClass().getSimpleName(), this.getIdentifier()));
        int senderThreads = context.getProperty(SENDER_THREADS).evaluateAttributeExpressions().asInteger();
        factory.setWorkerThreads(senderThreads);
        return factory.getEventSender();
    }
}

