/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.flume;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurables;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.flume.AbstractFlumeProcessor;
import org.apache.nifi.processors.flume.NifiSinkSessionChannel;

@TriggerSerially
@Tags(value={"flume", "hadoop", "put", "sink"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.")
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.EXECUTE_CODE, explanation="Provides operator the ability to execute arbitrary Flume configurations assuming all permissions that NiFi has.")})
public class ExecuteFlumeSink
extends AbstractFlumeProcessor {
    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder().name("Sink Type").description("The component type name for the sink. For some sinks, this is a short, symbolic name (e.g. hdfs). For others, it's the fully-qualified name of the Sink class. See the Flume User Guide for details.").required(true).addValidator(ExecuteFlumeSink.createSinkValidator()).build();
    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder().name("Agent Name").description("The name of the agent used in the Flume sink configuration").required(true).defaultValue("tier1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder().name("Sink Name").description("The name of the sink used in the Flume sink configuration").required(true).defaultValue("sink-1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder().name("Flume Configuration").description("The Flume configuration for the sink copied from the flume.properties file").required(true).defaultValue("").addValidator(Validator.VALID).build();
    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    private volatile Sink sink;
    private volatile NifiSinkSessionChannel channel;

    protected void init(ProcessorInitializationContext context) {
        this.descriptors = ImmutableList.of((Object)SINK_TYPE, (Object)AGENT_NAME, (Object)SOURCE_NAME, (Object)FLUME_CONFIG);
        this.relationships = ImmutableSet.of((Object)SUCCESS, (Object)FAILURE);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        try {
            this.channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
            this.channel.start();
            this.sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), context.getProperty(SINK_TYPE).getValue());
            this.sink.setChannel((Channel)this.channel);
            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
            String agentName = context.getProperty(AGENT_NAME).getValue();
            String sinkName = context.getProperty(SOURCE_NAME).getValue();
            Configurables.configure((Object)this.sink, (Context)ExecuteFlumeSink.getFlumeSinkContext(flumeConfig, agentName, sinkName));
            this.sink.start();
        }
        catch (Throwable th) {
            this.getLogger().error("Error creating sink", th);
            throw Throwables.propagate((Throwable)th);
        }
    }

    @OnStopped
    public void stopped() {
        this.sink.stop();
        this.channel.stop();
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        this.channel.setSession(session);
        try {
            this.sink.process();
        }
        catch (EventDeliveryException ex) {
            throw new ProcessException("Flume event delivery failed", (Throwable)ex);
        }
    }
}

