package org.apache.flume.node;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.flume.Channel;
import org.apache.flume.ChannelFactory;
import org.apache.flume.Context;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.SinkFactory;
import org.apache.flume.SinkRunner;
import org.apache.flume.Source;
import org.apache.flume.SourceFactory;
import org.apache.flume.SourceRunner;
import org.apache.flume.annotations.Disposable;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.ChannelSelectorFactory;
import org.apache.flume.channel.DefaultChannelFactory;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.conf.Configurables;
import org.apache.flume.conf.FlumeConfiguration;
import org.apache.flume.conf.TransactionCapacitySupported;
import org.apache.flume.conf.sink.SinkConfiguration;
import org.apache.flume.conf.sink.SinkGroupConfiguration;
import org.apache.flume.conf.source.SourceConfiguration;
import org.apache.flume.sink.DefaultSinkFactory;
import org.apache.flume.sink.DefaultSinkProcessor;
import org.apache.flume.sink.SinkGroup;
import org.apache.flume.source.DefaultSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/node/AbstractConfigurationProvider.class */
public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
    /** Logger shared by all instances of this provider. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfigurationProvider.class);

    /** Name of the agent whose section of the Flume configuration is materialized. */
    private final String agentName;

    /** Factory used to instantiate sources by name and type. */
    private final SourceFactory sourceFactory = new DefaultSourceFactory();

    /** Factory used to instantiate sinks by name and type. */
    private final SinkFactory sinkFactory = new DefaultSinkFactory();

    /** Factory used to resolve channel classes and instantiate channels. */
    private final ChannelFactory channelFactory = new DefaultChannelFactory();

    /**
     * Channel instances kept across calls, keyed first by channel class and then by channel name.
     * Explicit type arguments are used instead of a raw {@code HashMap} so the assignment is
     * checked by the compiler.
     */
    private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache =
            new HashMap<Class<? extends Channel>, Map<String, Channel>>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/node/AbstractConfigurationProvider$ChannelComponent.class */
    /**
     * Pairs a channel with the names of the components (sources and sinks) that were
     * attached to it while the configuration was being loaded.
     */
    public static class ChannelComponent {
        /** The channel instance itself. */
        final Channel channel;
        /** Names of the components connected to {@link #channel}; starts out empty. */
        final List<String> components;

        /**
         * @param channel the channel this entry tracks
         */
        ChannelComponent(Channel channel) {
            this.channel = channel;
            this.components = new ArrayList<String>();
        }
    }

    /**
     * Creates a provider bound to a single agent.
     *
     * @param str name of the agent; later returned by {@link #getAgentName()} and used to
     *            select the agent's section of the Flume configuration
     */
    public AbstractConfigurationProvider(String str) {
        agentName = str;
    }

    /**
     * Supplies the Flume configuration from which {@link #getConfiguration()} selects the
     * section belonging to this provider's agent. Implemented by subclasses.
     *
     * @return the Flume configuration; {@link #getConfiguration()} dereferences the result
     *         without a null check
     */
    protected abstract FlumeConfiguration getFlumeConfiguration();

    /**
     * Builds the materialized configuration for the agent named by {@link #getAgentName()}.
     *
     * <p>Channels are loaded first, then sources and sinks are loaded and attached to them.
     * A channel that ends up with no attached component is logged, dropped from the result and
     * removed from the channel cache. All remaining channels, source runners and sink runners
     * are added to the returned configuration.
     *
     * <p>If loading throws {@link InstantiationException} the error is logged; because loading
     * happens before anything is added, the returned configuration is empty in that case.
     *
     * @return the materialized configuration; empty when the agent has no configuration or a
     *         component could not be instantiated
     */
    @Override // org.apache.flume.node.ConfigurationProvider
    public MaterializedConfiguration getConfiguration() {
        SimpleMaterializedConfiguration conf = new SimpleMaterializedConfiguration();
        FlumeConfiguration.AgentConfiguration agentConf = getFlumeConfiguration().getConfigurationFor(getAgentName());
        if (agentConf == null) {
            LOGGER.warn("No configuration found for this host:{}", getAgentName());
            return conf;
        }

        Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
        Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
        Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
        try {
            loadChannels(agentConf, channelComponentMap);
            loadSources(agentConf, channelComponentMap, sourceRunnerMap);
            loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

            // Iterate over a snapshot of the names because unused channels are removed in the loop.
            Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
            for (String channelName : channelNames) {
                ChannelComponent channelComponent = channelComponentMap.get(channelName);
                if (channelComponent.components.isEmpty()) {
                    LOGGER.warn(String.format("Channel %s has no components connected and has been removed.", channelName));
                    channelComponentMap.remove(channelName);
                    // Also evict the unused channel from the cache.
                    Map<String, Channel> cachedByName = this.channelCache.get(channelComponent.channel.getClass());
                    if (cachedByName != null) {
                        cachedByName.remove(channelName);
                    }
                } else {
                    LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString()));
                    conf.addChannel(channelName, channelComponent.channel);
                }
            }
            for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
                conf.addSourceRunner(entry.getKey(), entry.getValue());
            }
            for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
                conf.addSinkRunner(entry.getKey(), entry.getValue());
            }
        } catch (InstantiationException e) {
            LOGGER.error("Failed to instantiate component", e);
        } finally {
            // The working maps are cleared on every path, success or failure.
            channelComponentMap.clear();
            sourceRunnerMap.clear();
            sinkRunnerMap.clear();
        }
        return conf;
    }

    /**
     * @return the agent name this provider was constructed with
     */
    public String getAgentName() {
        return agentName;
    }

    /**
     * Creates (or reuses from the cache) every channel declared for the agent and records it
     * in {@code map} under its name.
     *
     * <p>Each channel name is looked up twice: once in the component-configuration map and once
     * in the context map; whichever lookups succeed produce a channel. A channel whose
     * configuration step throws is logged and left out of {@code map}. Finally, every cached
     * channel that was not requested again during this call is evicted from the cache.
     *
     * @param agentConfiguration configuration of the agent being materialized
     * @param map                output map from channel name to its {@link ChannelComponent}
     * @throws InstantiationException declared for callers; not thrown directly by this method
     */
    private void loadChannels(FlumeConfiguration.AgentConfiguration agentConfiguration, Map<String, ChannelComponent> map) throws InstantiationException {
        LOGGER.info("Creating channels");

        // Start with every cached channel name; getOrCreateChannel strikes off each one it reuses,
        // so whatever is left afterwards is no longer referenced by the configuration.
        ListMultimap<Class<? extends Channel>, String> channelsNotReused = ArrayListMultimap.create();
        for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> cached : this.channelCache.entrySet()) {
            channelsNotReused.get(cached.getKey()).addAll(cached.getValue().keySet());
        }

        Set<String> channelNames = agentConfiguration.getChannelSet();
        Map<String, ComponentConfiguration> configMap = agentConfiguration.getChannelConfigMap();
        for (String channelName : channelNames) {
            ComponentConfiguration comp = configMap.get(channelName);
            if (comp != null) {
                Channel channel = getOrCreateChannel(channelsNotReused, comp.getComponentName(), comp.getType());
                try {
                    Configurables.configure(channel, comp);
                    map.put(comp.getComponentName(), new ChannelComponent(channel));
                    LOGGER.info("Created channel " + channelName);
                } catch (Exception e) {
                    LOGGER.error(String.format("Channel %s has been removed due to an error during configuration", channelName), e);
                }
            }
        }

        for (String channelName : channelNames) {
            Map<String, Context> contextMap = agentConfiguration.getChannelContext();
            Context context = contextMap.get(channelName);
            if (context != null) {
                Channel channel = getOrCreateChannel(channelsNotReused, channelName, context.getString("type"));
                try {
                    Configurables.configure(channel, context);
                    map.put(channelName, new ChannelComponent(channel));
                    LOGGER.info("Created channel " + channelName);
                } catch (Exception e) {
                    LOGGER.error(String.format("Channel %s has been removed due to an error during configuration", channelName), e);
                }
            }
        }

        // Evict cached channels that were not requested during this call.
        for (Class<? extends Channel> channelClass : channelsNotReused.keySet()) {
            Map<String, Channel> cachedByName = this.channelCache.get(channelClass);
            if (cachedByName != null) {
                for (String staleName : channelsNotReused.get(channelClass)) {
                    if (cachedByName.remove(staleName) != null) {
                        LOGGER.info("Removed {} of type {}", staleName, channelClass);
                    }
                }
                if (cachedByName.isEmpty()) {
                    this.channelCache.remove(channelClass);
                }
            }
        }
    }

    /**
     * Returns the channel with the given name and type, reusing a cached instance when allowed.
     *
     * <p>Channel classes annotated with {@link Disposable} are never cached: a new instance is
     * created on every call. For all other classes the instance is looked up in the channel
     * cache, created and cached on a miss, and its name is removed from {@code listMultimap}
     * to mark it as still in use.
     *
     * @param listMultimap per-class list of channel names not yet requested in this load pass
     * @param str          channel name
     * @param str2         channel type as understood by the channel factory
     * @return the new or cached channel
     * @throws FlumeException propagated from the channel factory
     */
    private Channel getOrCreateChannel(ListMultimap<Class<? extends Channel>, String> listMultimap, String str, String str2) throws FlumeException {
        final Class<? extends Channel> channelClass = this.channelFactory.getClass(str2);

        if (!channelClass.isAnnotationPresent(Disposable.class)) {
            Map<String, Channel> cachedByName = this.channelCache.get(channelClass);
            if (cachedByName == null) {
                cachedByName = new HashMap<String, Channel>();
                this.channelCache.put(channelClass, cachedByName);
            }
            Channel cached = cachedByName.get(str);
            if (cached == null) {
                cached = this.channelFactory.create(str, str2);
                cached.setName(str);
                cachedByName.put(str, cached);
            }
            // The name was requested again, so it is no longer a candidate for eviction.
            listMultimap.get(channelClass).remove(str);
            return cached;
        }

        // Disposable channels bypass the cache entirely.
        Channel fresh = this.channelFactory.create(str, str2);
        fresh.setName(str);
        return fresh;
    }

    /**
     * Instantiates and wires every source declared for the agent.
     *
     * <p>Each source name is looked up in the component-configuration map and then in the
     * context map. For every hit the source is created, configured, connected to its channels
     * through a {@link ChannelProcessor}, registered in {@code map2} as a {@link SourceRunner},
     * and its name is appended to the component list of each channel it uses. Any exception
     * raised while configuring or wiring a source (including one that is not connected to any
     * known channel) is logged and that source is skipped.
     *
     * @param agentConfiguration configuration of the agent being materialized
     * @param map                loaded channels by name; component lists are updated
     * @param map2               output map from source name to its runner
     * @throws InstantiationException declared for callers; failures inside the per-source
     *                                try blocks are caught and logged instead
     */
    private void loadSources(FlumeConfiguration.AgentConfiguration agentConfiguration, Map<String, ChannelComponent> map, Map<String, SourceRunner> map2) throws InstantiationException {
        Set<String> sourceNames = agentConfiguration.getSourceSet();

        Map<String, ComponentConfiguration> configMap = agentConfiguration.getSourceConfigMap();
        for (String sourceName : sourceNames) {
            ComponentConfiguration comp = configMap.get(sourceName);
            if (comp == null) {
                continue;
            }
            // Cast happens before the try block, as in the original: a wrong type propagates.
            SourceConfiguration config = (SourceConfiguration) comp;
            Source source = this.sourceFactory.create(comp.getComponentName(), comp.getType());
            try {
                Configurables.configure(source, config);
                List<Channel> sourceChannels = getSourceChannels(map, source, config.getChannels());
                if (sourceChannels.isEmpty()) {
                    throw new IllegalStateException(String.format("Source %s is not connected to a channel", sourceName));
                }
                ChannelProcessor channelProcessor = new ChannelProcessor(ChannelSelectorFactory.create(sourceChannels, config.getSelectorConfiguration()));
                Configurables.configure(channelProcessor, config);
                source.setChannelProcessor(channelProcessor);
                map2.put(comp.getComponentName(), SourceRunner.forSource(source));
                for (Channel channel : sourceChannels) {
                    ChannelComponent channelComponent = Preconditions.checkNotNull(map.get(channel.getName()), String.format("Channel %s", channel.getName()));
                    channelComponent.components.add(sourceName);
                }
            } catch (Exception e) {
                LOGGER.error(String.format("Source %s has been removed due to an error during configuration", sourceName), e);
            }
        }

        Map<String, Context> contextMap = agentConfiguration.getSourceContext();
        for (String sourceName : sourceNames) {
            Context context = contextMap.get(sourceName);
            if (context == null) {
                continue;
            }
            Source source = this.sourceFactory.create(sourceName, context.getString("type"));
            try {
                Configurables.configure(source, context);
                // Channel names are a whitespace-separated list under the "channels" key.
                List<String> channelNames = Arrays.asList(context.getString("channels").split("\\s+"));
                List<Channel> sourceChannels = getSourceChannels(map, source, channelNames);
                if (sourceChannels.isEmpty()) {
                    throw new IllegalStateException(String.format("Source %s is not connected to a channel", sourceName));
                }
                ChannelProcessor channelProcessor = new ChannelProcessor(ChannelSelectorFactory.create(sourceChannels, context.getSubProperties("selector.")));
                Configurables.configure(channelProcessor, context);
                source.setChannelProcessor(channelProcessor);
                map2.put(sourceName, SourceRunner.forSource(source));
                for (Channel channel : sourceChannels) {
                    ChannelComponent channelComponent = Preconditions.checkNotNull(map.get(channel.getName()), String.format("Channel %s", channel.getName()));
                    channelComponent.components.add(sourceName);
                }
            } catch (Exception e) {
                LOGGER.error(String.format("Source %s has been removed due to an error during configuration", sourceName), e);
            }
        }
    }

    /**
     * Resolves channel names to the loaded channels a source should write to.
     *
     * <p>Names with no entry in {@code map} are skipped. Every channel that is found is first
     * checked for compatibility with the source.
     *
     * @param map        loaded channels by name
     * @param source     the source being wired
     * @param collection names of the channels the source refers to
     * @return the resolved channels in iteration order of {@code collection}; possibly empty
     * @throws InstantiationException if the source is incompatible with one of the channels
     */
    private List<Channel> getSourceChannels(Map<String, ChannelComponent> map, Source source, Collection<String> collection) throws InstantiationException {
        List<Channel> resolved = new ArrayList<Channel>();
        for (String channelName : collection) {
            ChannelComponent component = map.get(channelName);
            if (component == null) {
                continue;
            }
            checkSourceChannelCompatibility(source, component.channel);
            resolved.add(component.channel);
        }
        return resolved;
    }

    /**
     * Verifies that a source's batch size fits in a channel's transaction capacity.
     *
     * <p>The check only applies when the source implements {@link BatchSizeSupported} and the
     * channel implements {@link TransactionCapacitySupported}; otherwise it is a no-op.
     *
     * @param source  the source being wired
     * @param channel the channel the source will write to
     * @throws InstantiationException if the batch size exceeds the transaction capacity
     */
    private void checkSourceChannelCompatibility(Source source, Channel channel) throws InstantiationException {
        boolean comparable = source instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported;
        if (!comparable) {
            return;
        }
        long capacity = ((TransactionCapacitySupported) channel).getTransactionCapacity();
        long batch = ((BatchSizeSupported) source).getBatchSize();
        if (batch > capacity) {
            String message = String.format("Incompatible source and channel settings defined. source's batch size is greater than the channels transaction capacity. Source: %s, batch size = %d, channel %s, transaction capacity = %d", source.getName(), batch, channel.getName(), capacity);
            throw new InstantiationException(message);
        }
    }

    /**
     * Verifies that a sink's batch size fits in a channel's transaction capacity.
     *
     * <p>The check only applies when the sink implements {@link BatchSizeSupported} and the
     * channel implements {@link TransactionCapacitySupported}; otherwise it is a no-op.
     *
     * @param sink    the sink being wired
     * @param channel the channel the sink will read from
     * @throws InstantiationException if the batch size exceeds the transaction capacity
     */
    private void checkSinkChannelCompatibility(Sink sink, Channel channel) throws InstantiationException {
        boolean comparable = sink instanceof BatchSizeSupported && channel instanceof TransactionCapacitySupported;
        if (!comparable) {
            return;
        }
        long capacity = ((TransactionCapacitySupported) channel).getTransactionCapacity();
        long batch = ((BatchSizeSupported) sink).getBatchSize();
        if (batch > capacity) {
            String message = String.format("Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: %s, batch size = %d, channel %s, transaction capacity = %d", sink.getName(), batch, channel.getName(), capacity);
            throw new InstantiationException(message);
        }
    }

    /**
     * Instantiates every sink declared for the agent, attaches each to its channel, and then
     * hands the resulting sinks to {@link #loadSinkGroups} to build the sink runners.
     *
     * <p>Each sink name is looked up in the component-configuration map and then in the context
     * map. For every hit the sink is created, configured, checked against its channel and
     * connected to it, and the sink's name is appended to that channel's component list. Any
     * exception raised while configuring or wiring a sink (including a missing channel) is
     * logged and that sink is skipped.
     *
     * @param agentConfiguration configuration of the agent being materialized
     * @param map                loaded channels by name; component lists are updated
     * @param map2               output map from sink or sink-group name to its runner
     * @throws InstantiationException propagated from {@link #loadSinkGroups}
     */
    private void loadSinks(FlumeConfiguration.AgentConfiguration agentConfiguration, Map<String, ChannelComponent> map, Map<String, SinkRunner> map2) throws InstantiationException {
        Set<String> sinkNames = agentConfiguration.getSinkSet();
        Map<String, Sink> sinks = new HashMap<String, Sink>();

        Map<String, ComponentConfiguration> configMap = agentConfiguration.getSinkConfigMap();
        for (String sinkName : sinkNames) {
            ComponentConfiguration comp = configMap.get(sinkName);
            if (comp == null) {
                continue;
            }
            // Cast happens before the try block, as in the original: a wrong type propagates.
            SinkConfiguration config = (SinkConfiguration) comp;
            Sink sink = this.sinkFactory.create(comp.getComponentName(), comp.getType());
            try {
                Configurables.configure(sink, config);
                ChannelComponent channelComponent = map.get(config.getChannel());
                if (channelComponent == null) {
                    throw new IllegalStateException(String.format("Sink %s is not connected to a channel", sinkName));
                }
                checkSinkChannelCompatibility(sink, channelComponent.channel);
                sink.setChannel(channelComponent.channel);
                sinks.put(comp.getComponentName(), sink);
                channelComponent.components.add(sinkName);
            } catch (Exception e) {
                LOGGER.error(String.format("Sink %s has been removed due to an error during configuration", sinkName), e);
            }
        }

        Map<String, Context> contextMap = agentConfiguration.getSinkContext();
        for (String sinkName : sinkNames) {
            Context context = contextMap.get(sinkName);
            if (context == null) {
                continue;
            }
            Sink sink = this.sinkFactory.create(sinkName, context.getString("type"));
            try {
                Configurables.configure(sink, context);
                ChannelComponent channelComponent = map.get(context.getString("channel"));
                if (channelComponent == null) {
                    throw new IllegalStateException(String.format("Sink %s is not connected to a channel", sinkName));
                }
                checkSinkChannelCompatibility(sink, channelComponent.channel);
                sink.setChannel(channelComponent.channel);
                sinks.put(sinkName, sink);
                channelComponent.components.add(sinkName);
            } catch (Exception e) {
                LOGGER.error(String.format("Sink %s has been removed due to an error during configuration", sinkName), e);
            }
        }

        loadSinkGroups(agentConfiguration, sinks, map2);
    }

    /**
     * Builds the sink runners: one per configured sink group, plus one per remaining sink.
     *
     * <p>For each sink group, its member sinks are removed from {@code map} and collected into
     * a {@link SinkGroup}; the group's processor is registered in {@code map2} under the group
     * name. A group whose configuration step throws is logged and skipped (its sinks stay
     * removed from {@code map}). Sinks still left in {@code map} afterwards each get a
     * {@link DefaultSinkProcessor} of their own, registered under the sink name.
     *
     * @param agentConfiguration configuration of the agent being materialized
     * @param map                configured sinks by name; group members are removed from it
     * @param map2               output map from sink or sink-group name to its runner
     * @throws InstantiationException if a group refers to a sink that is missing from
     *                                {@code map} or that an earlier group already claimed
     */
    private void loadSinkGroups(FlumeConfiguration.AgentConfiguration agentConfiguration, Map<String, Sink> map, Map<String, SinkRunner> map2) throws InstantiationException {
        Set<String> sinkGroupNames = agentConfiguration.getSinkgroupSet();
        Map<String, ComponentConfiguration> configMap = agentConfiguration.getSinkGroupConfigMap();
        // Sink name -> name of the group that claimed it.
        Map<String, String> usedSinks = new HashMap<String, String>();

        for (String groupName : sinkGroupNames) {
            ComponentConfiguration comp = configMap.get(groupName);
            if (comp == null) {
                continue;
            }
            // Cast happens before the try block, as in the original: a wrong type propagates.
            SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
            List<Sink> groupSinks = new ArrayList<Sink>();
            for (String sinkName : groupConf.getSinks()) {
                Sink sink = map.remove(sinkName);
                if (sink == null) {
                    String owner = usedSinks.get(sinkName);
                    if (owner == null) {
                        throw new InstantiationException(String.format("Sink %s of group %s does not exist or is not properly configured", sinkName, groupName));
                    }
                    throw new InstantiationException(String.format("Sink %s of group %s already in use by group %s", sinkName, groupName, owner));
                }
                groupSinks.add(sink);
                usedSinks.put(sinkName, groupName);
            }
            try {
                SinkGroup group = new SinkGroup(groupSinks);
                Configurables.configure(group, groupConf);
                map2.put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
            } catch (Exception e) {
                LOGGER.error(String.format("SinkGroup %s has been removed due to an error during configuration", groupName), e);
            }
        }

        // Give every sink that no group claimed a processor of its own.
        for (Map.Entry<String, Sink> entry : map.entrySet()) {
            // NOTE(review): usedSinks values are group names, so this skips a standalone sink
            // whose name equals the name of a group that claimed at least one sink. Claimed
            // sinks were already removed from `map` above. Behaviour kept as-is; confirm
            // whether containsKey was intended before changing it, since both kinds of runner
            // share the key space of map2.
            if (usedSinks.containsValue(entry.getKey())) {
                continue;
            }
            try {
                DefaultSinkProcessor processor = new DefaultSinkProcessor();
                List<Sink> single = new ArrayList<Sink>();
                single.add(entry.getValue());
                processor.setSinks(single);
                Configurables.configure(processor, new Context());
                map2.put(entry.getKey(), new SinkRunner(processor));
            } catch (Exception e) {
                LOGGER.error(String.format("SinkGroup %s has been removed due to an error during configuration", entry.getKey()), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Copies a {@link Properties} object into a plain string-to-string map.
     *
     * <p>Keys come from {@link Properties#propertyNames()} and are cast to {@code String};
     * values come from {@link Properties#getProperty(String)}.
     *
     * @param properties the properties to copy
     * @return a new mutable map holding one entry per property name
     */
    public Map<String, String> toMap(Properties properties) {
        Map<String, String> copy = new HashMap<String, String>();
        for (Enumeration<?> names = properties.propertyNames(); names.hasMoreElements(); ) {
            String key = (String) names.nextElement();
            copy.put(key, properties.getProperty(key));
        }
        return copy;
    }
}
