/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.flume;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.PollableSource;
import org.apache.flume.Source;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.EventDrivenSourceRunner;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.flume.AbstractFlumeProcessor;
import org.apache.nifi.processors.flume.NifiChannelSelector;
import org.apache.nifi.processors.flume.NifiSessionChannel;
import org.apache.nifi.processors.flume.NifiSessionFactoryChannel;

@TriggerSerially
@Tags(value={"flume", "hadoop", "get", "source"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription(value="Execute a Flume source. Each Flume Event is sent to the success relationship as a FlowFile")
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.EXECUTE_CODE, explanation="Provides operator the ability to execute arbitrary Flume configurations assuming all permissions that NiFi has.")})
public class ExecuteFlumeSource
extends AbstractFlumeProcessor {
    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name("Source Type").description("The component type name for the source. For some sources, this is a short, symbolic name (e.g. spooldir). For others, it's the fully-qualified name of the Source class. See the Flume User Guide for details.").required(true).addValidator(ExecuteFlumeSource.createSourceValidator()).build();
    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder().name("Agent Name").description("The name of the agent used in the Flume source configuration").required(true).defaultValue("tier1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder().name("Source Name").description("The name of the source used in the Flume source configuration").required(true).defaultValue("src-1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder().name("Flume Configuration").description("The Flume configuration for the source copied from the flume.properties file").required(true).defaultValue("").addValidator(Validator.VALID).build();
    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    private volatile Source source;
    private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);
    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<Object>(null);
    private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<Object>(null);
    private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<Object>(null);

    protected void init(ProcessorInitializationContext context) {
        this.descriptors = ImmutableList.of((Object)SOURCE_TYPE, (Object)AGENT_NAME, (Object)SOURCE_NAME, (Object)FLUME_CONFIG);
        this.relationships = ImmutableSet.of((Object)SUCCESS);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        try {
            this.source = SOURCE_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), context.getProperty(SOURCE_TYPE).getValue());
            String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
            String agentName = context.getProperty(AGENT_NAME).getValue();
            String sourceName = context.getProperty(SOURCE_NAME).getValue();
            Configurables.configure((Object)this.source, (Context)ExecuteFlumeSource.getFlumeSourceContext(flumeConfig, agentName, sourceName));
            if (this.source instanceof PollableSource) {
                this.source.setChannelProcessor(new ChannelProcessor((ChannelSelector)new NifiChannelSelector((Channel)this.pollableSourceChannel)));
                this.source.start();
            }
        }
        catch (Throwable th) {
            this.getLogger().error("Error creating source", th);
            throw Throwables.propagate((Throwable)th);
        }
    }

    @OnStopped
    public void stopped() {
        if (this.source instanceof PollableSource) {
            this.source.stop();
        } else {
            NifiSessionFactoryChannel eventDrivenSourceChannel;
            EventDrivenSourceRunner runner = this.runnerRef.get();
            if (runner != null) {
                runner.stop();
                this.runnerRef.compareAndSet(runner, null);
            }
            if ((eventDrivenSourceChannel = this.eventDrivenSourceChannelRef.get()) != null) {
                eventDrivenSourceChannel.stop();
                this.eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null);
            }
        }
        this.sessionFactoryRef.set(null);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSessionFactory old;
        if (this.source instanceof PollableSource) {
            super.onTrigger(context, sessionFactory);
        } else if (this.source instanceof EventDrivenSource && (old = this.sessionFactoryRef.getAndSet(sessionFactory)) != sessionFactory) {
            if (this.runnerRef.get() != null) {
                this.stopped();
                this.sessionFactoryRef.set(sessionFactory);
            }
            this.runnerRef.set(new EventDrivenSourceRunner());
            this.eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(this.sessionFactoryRef.get(), SUCCESS));
            this.eventDrivenSourceChannelRef.get().start();
            this.source.setChannelProcessor(new ChannelProcessor((ChannelSelector)new NifiChannelSelector((Channel)this.eventDrivenSourceChannelRef.get())));
            this.runnerRef.get().setSource(this.source);
            this.runnerRef.get().start();
        }
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.source instanceof PollableSource) {
            PollableSource pollableSource = (PollableSource)this.source;
            try {
                this.pollableSourceChannel.setSession(session);
                pollableSource.process();
            }
            catch (EventDeliveryException ex) {
                throw new ProcessException("Error processing pollable source", (Throwable)ex);
            }
        } else {
            throw new ProcessException("Invalid source type: " + this.source.getClass().getSimpleName());
        }
    }
}

