/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.loading.LoadDistributionListener;
import org.apache.nifi.loading.LoadDistributionService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

@EventDriven
@SideEffectFree
@SupportsBatching
@TriggerWhenAnyDestinationAvailable
@Tags(value={"distribute", "load balance", "route", "round robin", "weighted"})
@CapabilityDescription(value="Distributes FlowFiles to downstream processors based on a Distribution Strategy. If using the Round Robin strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional propertiescan be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name '5' will be receive 10 FlowFiles in each iteration instead of 1.")
public class DistributeLoad
extends AbstractProcessor {
    public static final String STRATEGY_ROUND_ROBIN = "round robin";
    public static final String STRATEGY_NEXT_AVAILABLE = "next available";
    public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service";
    public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder().name("Number of Relationships").description("Determines the number of Relationships to which the load should be distributed").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("1").build();
    public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder().name("Distribution Strategy").description("Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 destination can accept FlowFiles.").required(true).allowableValues(new String[]{"round robin", "next available", "load distribution service"}).defaultValue("round robin").build();
    public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder().name("Hostnames").description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter").required(true).addValidator(new Validator(){

        public ValidationResult validate(String subject, String input, ValidationContext context) {
            String[] hostNames;
            ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
            if (null == input) {
                result = new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Need to specify delimited list of FQDNs").build();
                return result;
            }
            for (String hostName : hostNames = input.split("(?:,+|;+|\\s+)")) {
                if (!StringUtils.isNotBlank((CharSequence)hostName) || hostName.contains(".")) continue;
                result = new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Need a FQDN rather than a simple host name.").build();
                return result;
            }
            return result;
        }
    }).build();
    public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder().name("Load Distribution Service ID").description("The identifier of the Load Distribution Service").required(true).identifiesControllerService(LoadDistributionService.class).build();
    private List<PropertyDescriptor> properties;
    private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference();
    private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<RoundRobinStrategy>(new RoundRobinStrategy());
    private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference();
    private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
    private volatile LoadDistributionListener myListener;
    private final AtomicBoolean doSetProps = new AtomicBoolean(true);

    protected void init(ProcessorInitializationContext context) {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(DistributeLoad.createRelationship(1));
        this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(NUM_RELATIONSHIPS);
        properties.add(DISTRIBUTION_STRATEGY);
        this.properties = Collections.unmodifiableList(properties);
    }

    private static Relationship createRelationship(int num) {
        return new Relationship.Builder().name(String.valueOf(num)).build();
    }

    public Set<Relationship> getRelationships() {
        return this.relationshipsRef.get();
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor.equals((Object)NUM_RELATIONSHIPS)) {
            HashSet<Relationship> relationships = new HashSet<Relationship>();
            for (int i = 1; i <= Integer.parseInt(newValue); ++i) {
                relationships.add(DistributeLoad.createRelationship(i));
            }
            this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
        } else if (descriptor.equals((Object)DISTRIBUTION_STRATEGY)) {
            switch (newValue.toLowerCase()) {
                case "round robin": {
                    this.strategyRef.set(new RoundRobinStrategy());
                    break;
                }
                case "next available": {
                    this.strategyRef.set(new NextAvailableStrategy());
                    break;
                }
                case "load distribution service": {
                    this.strategyRef.set(new LoadDistributionStrategy());
                }
            }
            this.doSetProps.set(true);
            this.doCustomValidate.set(true);
        }
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        if (this.strategyRef.get() instanceof LoadDistributionStrategy && this.doSetProps.getAndSet(false)) {
            ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>(this.properties);
            props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
            props.add(HOSTNAMES);
            this.properties = Collections.unmodifiableList(props);
        } else if (this.doSetProps.getAndSet(false)) {
            ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
            props.add(NUM_RELATIONSHIPS);
            props.add(DISTRIBUTION_STRATEGY);
            this.properties = Collections.unmodifiableList(props);
        }
        return this.properties;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        int numRelationships = this.relationshipsRef.get().size();
        try {
            int value = Integer.parseInt(propertyDescriptorName);
            if (value <= 0 || value > numRelationships) {
                return new PropertyDescriptor.Builder().addValidator((Validator)new InvalidPropertyNameValidator(propertyDescriptorName)).name(propertyDescriptorName).build();
            }
        }
        catch (NumberFormatException e) {
            return new PropertyDescriptor.Builder().addValidator((Validator)new InvalidPropertyNameValidator(propertyDescriptorName)).name(propertyDescriptorName).build();
        }
        return new PropertyDescriptor.Builder().addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).name(propertyDescriptorName).dynamic(true).build();
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        String distStrat;
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>();
        if (this.doCustomValidate.getAndSet(false) && (distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue()).equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
            PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
            if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
                results.add(new ValidationResult.Builder().subject(HOSTNAMES.getName()).explanation("Must specify Hostnames when using 'Load Distribution Strategy'").valid(false).build());
            }
            if (null == (propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE)) || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
                results.add(new ValidationResult.Builder().subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName()).explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'").valid(false).build());
            }
            if (results.isEmpty()) {
                int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
                String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue();
                String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
                int numHosts = 0;
                for (String hostName : hostNames) {
                    if (!StringUtils.isNotBlank((CharSequence)hostName)) continue;
                    hostNames[numHosts++] = hostName;
                }
                if (numHosts > numRels) {
                    results.add(new ValidationResult.Builder().subject("Number of Relationships and Hostnames").explanation("Number of Relationships must be equal to, or greater than, the number of host names").valid(false).build());
                } else {
                    int i;
                    TreeSet<Relationship> relsWithDesc = new TreeSet<Relationship>();
                    for (i = 0; i < numHosts; ++i) {
                        relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)).description(hostNames[i]).build());
                    }
                    for (i = numHosts + 1; i <= numRels; ++i) {
                        relsWithDesc.add(DistributeLoad.createRelationship(i));
                    }
                    this.relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
                }
            }
        }
        return results;
    }

    @OnScheduled
    public void createWeightedList(ProcessContext context) {
        final LinkedHashMap<Integer, Integer> weightings = new LinkedHashMap<Integer, Integer>();
        String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
        if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
            String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
            String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
            HashSet<String> hostNameSet = new HashSet<String>();
            for (String hostName : hostNames) {
                if (!StringUtils.isNotBlank((CharSequence)hostName)) continue;
                hostNameSet.add(hostName);
            }
            LoadDistributionService svc = (LoadDistributionService)context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
            this.myListener = new LoadDistributionListener(){

                public void update(Map<String, Integer> loadInfo) {
                    for (Relationship rel : (Set)DistributeLoad.this.relationshipsRef.get()) {
                        String hostname = rel.getDescription();
                        Integer weight = 1;
                        if (loadInfo.containsKey(hostname)) {
                            weight = loadInfo.get(hostname);
                        }
                        weightings.put(Integer.decode(rel.getName()), weight);
                    }
                    DistributeLoad.this.updateWeightedRelationships(weightings);
                }
            };
            Map loadInfo = svc.getLoadDistribution(hostNameSet, this.myListener);
            for (Relationship rel : this.relationshipsRef.get()) {
                String hostname = rel.getDescription();
                Integer weight = 1;
                if (loadInfo.containsKey(hostname)) {
                    weight = (Integer)loadInfo.get(hostname);
                }
                weightings.put(Integer.decode(rel.getName()), weight);
            }
        } else {
            int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
            for (int i = 1; i <= numRelationships; ++i) {
                weightings.put(i, 1);
            }
            for (PropertyDescriptor propDesc : context.getProperties().keySet()) {
                if (this.properties.contains(propDesc)) continue;
                int relationship = Integer.parseInt(propDesc.getName());
                int weighting = context.getProperty(propDesc).asInteger();
                weightings.put(relationship, weighting);
            }
        }
        this.updateWeightedRelationships(weightings);
    }

    private void updateWeightedRelationships(Map<Integer, Integer> weightings) {
        ArrayList<Relationship> relationshipList = new ArrayList<Relationship>();
        for (Map.Entry<Integer, Integer> entry : weightings.entrySet()) {
            String relationshipName = String.valueOf(entry.getKey());
            Relationship relationship = new Relationship.Builder().name(relationshipName).build();
            for (int i = 0; i < entry.getValue(); ++i) {
                relationshipList.add(relationship);
            }
        }
        this.weightedRelationshipListRef.set(Collections.unmodifiableList(relationshipList));
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        boolean allDestinationsAvailable;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        DistributionStrategy strategy = this.strategyRef.get();
        Set available = context.getAvailableRelationships();
        int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
        boolean bl = allDestinationsAvailable = available.size() == numRelationships;
        if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) {
            return;
        }
        Relationship relationship = strategy.mapToRelationship(context, flowFile);
        if (relationship == null) {
            session.rollback();
            context.yield();
            return;
        }
        session.transfer(flowFile, relationship);
        session.getProvenanceReporter().route(flowFile, relationship);
    }

    private class NextAvailableStrategy
    implements DistributionStrategy {
        private final AtomicLong counter = new AtomicLong(0L);

        private NextAvailableStrategy() {
        }

        @Override
        public Relationship mapToRelationship(ProcessContext context, FlowFile flowFile) {
            List relationshipList = (List)DistributeLoad.this.weightedRelationshipListRef.get();
            int numRelationships = relationshipList.size();
            boolean foundFreeRelationship = false;
            Relationship relationship = null;
            int attempts = 0;
            while (!foundFreeRelationship) {
                long counterValue = this.counter.getAndIncrement();
                int idx = (int)(counterValue % (long)numRelationships);
                relationship = (Relationship)relationshipList.get(idx);
                foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
                if (++attempts % numRelationships != 0 || foundFreeRelationship) continue;
                return null;
            }
            return relationship;
        }

        @Override
        public boolean requiresAllDestinationsAvailable() {
            return false;
        }
    }

    private class RoundRobinStrategy
    implements DistributionStrategy {
        private final AtomicLong counter = new AtomicLong(0L);

        private RoundRobinStrategy() {
        }

        @Override
        public Relationship mapToRelationship(ProcessContext context, FlowFile flowFile) {
            List relationshipList = (List)DistributeLoad.this.weightedRelationshipListRef.get();
            long counterValue = this.counter.getAndIncrement();
            int idx = (int)(counterValue % (long)relationshipList.size());
            Relationship relationship = (Relationship)relationshipList.get(idx);
            return relationship;
        }

        @Override
        public boolean requiresAllDestinationsAvailable() {
            return true;
        }
    }

    private class LoadDistributionStrategy
    implements DistributionStrategy {
        private final AtomicLong counter = new AtomicLong(0L);

        private LoadDistributionStrategy() {
        }

        @Override
        public Relationship mapToRelationship(ProcessContext context, FlowFile flowFile) {
            List relationshipList = (List)DistributeLoad.this.weightedRelationshipListRef.get();
            int numRelationships = relationshipList.size();
            boolean foundFreeRelationship = false;
            Relationship relationship = null;
            int attempts = 0;
            while (!foundFreeRelationship) {
                long counterValue = this.counter.getAndIncrement();
                int idx = (int)(counterValue % (long)numRelationships);
                relationship = (Relationship)relationshipList.get(idx);
                foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
                if (++attempts % numRelationships != 0 || foundFreeRelationship) continue;
                return null;
            }
            return relationship;
        }

        @Override
        public boolean requiresAllDestinationsAvailable() {
            return false;
        }
    }

    private static interface DistributionStrategy {
        public Relationship mapToRelationship(ProcessContext var1, FlowFile var2);

        public boolean requiresAllDestinationsAvailable();
    }

    private static class InvalidPropertyNameValidator
    implements Validator {
        private final String propertyName;

        public InvalidPropertyNameValidator(String propertyName) {
            this.propertyName = propertyName;
        }

        public ValidationResult validate(String subject, String input, ValidationContext validationContext) {
            return new ValidationResult.Builder().subject("Property Name").input(this.propertyName).explanation("Property Name must be a positive integer between 1 and the number of relationships (inclusive)").valid(false).build();
        }
    }
}

