package org.apache.nifi.atlas.reporting;

import com.sun.jersey.api.client.ClientResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.RegexClusterResolver;
import org.apache.nifi.atlas.security.AtlasAuthN;
import org.apache.nifi.atlas.security.Basic;
import org.apache.nifi.atlas.security.Kerberos;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.ssl.SSLContextService;

@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas. End-to-end lineages across NiFi environments and other systems can be reported if those are connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc. Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets, in addition to NiFi provenance events providing detailed event level lineage. See 'Additional Details' for further description and limitations.")
@Stateful(scopes = {Scope.LOCAL}, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.<ClusterName>", value = "hostname Regex patterns", description = RegexClusterResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@RequiresInstanceClassLoading
@Tags({"atlas", "lineage"})
/* loaded from: input_file:org/apache/nifi/atlas/reporting/ReportLineageToAtlas.class */
public class ReportLineageToAtlas extends AbstractReportingTask {
    // Required. Comma-separated Atlas server URLs; must be non-blank and may use variable-registry expressions.
    static final PropertyDescriptor ATLAS_URLS = new PropertyDescriptor.Builder().name("atlas-urls").displayName("Atlas URLs").description("Comma separated URL of Atlas Servers (e.g. http://atlas-server-hostname:21000 or https://atlas-server-hostname:21443). For accessing Atlas behind Knox gateway, specify Knox gateway URL (e.g. https://knox-hostname:8443/gateway/{topology-name}/atlas).").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Allowable values of ATLAS_AUTHN_METHOD; their string values ("basic", "kerberos") are what getAtlasAuthN switches on.
    static final AllowableValue ATLAS_AUTHN_BASIC = new AllowableValue("basic", "Basic", "Use username and password.");
    static final AllowableValue ATLAS_AUTHN_KERBEROS = new AllowableValue("kerberos", "Kerberos", "Use Kerberos keytab file.");
    // Required. Selects the Atlas authentication implementation; defaults to basic authentication.
    static final PropertyDescriptor ATLAS_AUTHN_METHOD = new PropertyDescriptor.Builder().name("atlas-authentication-method").displayName("Atlas Authentication Method").description("Specify how to authenticate this reporting task to Atlas server.").required(true).allowableValues(new AllowableValue[]{ATLAS_AUTHN_BASIC, ATLAS_AUTHN_KERBEROS}).defaultValue(ATLAS_AUTHN_BASIC.getValue()).build();
    // Optional Atlas user name (non-blank when set).
    public static final PropertyDescriptor ATLAS_USER = new PropertyDescriptor.Builder().name("atlas-username").displayName("Atlas Username").description("User name to communicate with Atlas.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Optional Atlas password; flagged as sensitive.
    public static final PropertyDescriptor ATLAS_PASSWORD = new PropertyDescriptor.Builder().name("atlas-password").displayName("Atlas Password").description("Password to communicate with Atlas.").required(false).sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Optional directory holding 'atlas-application.properties'; customValidate requires it when ATLAS_CONF_CREATE is enabled.
    static final PropertyDescriptor ATLAS_CONF_DIR = new PropertyDescriptor.Builder().name("atlas-conf-dir").displayName("Atlas Configuration Directory").description("Directory path that contains 'atlas-application.properties' file. If not specified and 'Create Atlas Configuration File' is disabled, then, 'atlas-application.properties' file under root classpath is used.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Required. URL representing this NiFi instance in Atlas; validated as a URL.
    public static final PropertyDescriptor ATLAS_NIFI_URL = new PropertyDescriptor.Builder().name("atlas-nifi-url").displayName("NiFi URL for Atlas").description("NiFi URL is used in Atlas to represent this NiFi cluster (or standalone instance). It is recommended to use one that can be accessible remotely instead of using 'localhost'.").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.URL_VALIDATOR).build();
    // Optional default cluster name; initAtlasProperties falls back to 'atlas.cluster.name' from the loaded properties when unset.
    public static final PropertyDescriptor ATLAS_DEFAULT_CLUSTER_NAME = new PropertyDescriptor.Builder().name("atlas-default-cluster-name").displayName("Atlas Default Cluster Name").description("Cluster name for Atlas entities reported by this ReportingTask. If not specified, 'atlas.cluster.name' in Atlas Configuration File is used. Cluster name mappings can be configured by user defined properties. See additional detail for detail.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Required boolean ("true"/"false", default "false"): whether initAtlasProperties generates the properties file instead of loading one.
    static final PropertyDescriptor ATLAS_CONF_CREATE = new PropertyDescriptor.Builder().name("atlas-conf-create").displayName("Create Atlas Configuration File").description("If enabled, 'atlas-application.properties' file will be created in 'Atlas Configuration Directory' automatically when this Reporting Task starts. Note that the existing configuration file will be overwritten.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    // Optional SSLContextService; customValidate demands it for HTTPS Atlas URLs and for the SSL / SASL_SSL Kafka protocols.
    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("ssl-context-service").displayName("SSL Context Service").description("Specifies the SSL Context Service to use for communicating with Atlas and Kafka.").required(false).identifiesControllerService(SSLContextService.class).build();
    // Optional Kafka bootstrap servers; customValidate requires it when ATLAS_CONF_CREATE is enabled.
    static final PropertyDescriptor KAFKA_BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder().name("kafka-bootstrap-servers").displayName("Kafka Bootstrap Servers").description("Kafka Bootstrap Servers to send Atlas hook notification messages based on NiFi provenance events. E.g. 'localhost:9092' NOTE: Once this reporting task has started, restarting NiFi is required to changed this property as Atlas library holds a unmodifiable static reference to Kafka client.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Allowable values of KAFKA_SECURITY_PROTOCOL.
    static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
    static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
    // Required. Kafka security protocol; defaults to PLAINTEXT and does not support expression language.
    static final PropertyDescriptor KAFKA_SECURITY_PROTOCOL = new PropertyDescriptor.Builder().name("kafka-security-protocol").displayName("Kafka Security Protocol").description("Protocol used to communicate with Kafka brokers to send Atlas hook notification messages. Corresponds to Kafka's 'security.protocol' property.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL}).defaultValue(SEC_PLAINTEXT.getValue()).build();
    /** Optional explicit Kerberos principal; ignored by validateKafkaProperties when a Kerberos Credentials Service is configured. */
    public static final PropertyDescriptor NIFI_KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
            .name("nifi-kerberos-principal")
            .displayName("NiFi Kerberos Principal")
            .description("The Kerberos principal for this NiFi instance to access Atlas API and Kafka brokers. If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
            .required(false)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /**
     * Optional explicit Kerberos keytab file (must exist); ignored by validateKafkaProperties when a
     * Kerberos Credentials Service is configured.
     * The description previously said "This principal will be set...", copied from the principal property.
     */
    public static final PropertyDescriptor NIFI_KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
            .name("nifi-kerberos-keytab")
            .displayName("NiFi Kerberos Keytab")
            .description("The Kerberos keytab for this NiFi instance to access Atlas API and Kafka brokers. If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file. This keytab will be set into 'sasl.jaas.config' Kafka's property.")
            .required(false)
            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Optional controller service supplying principal and keytab; takes precedence over the two explicit properties. */
    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
            .name("kerberos-credentials-service")
            .displayName("Kerberos Credentials Service")
            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
            .identifiesControllerService(KerberosCredentialsService.class)
            .required(false)
            .build();

    /**
     * Kerberos service name of the Kafka brokers (default "kafka"); required by validateKafkaProperties
     * for the SASL protocols.
     * The description previously pointed at Kafka's 'security.protocol'; the matching Kafka setting is
     * 'sasl.kerberos.service.name'.
     */
    static final PropertyDescriptor KAFKA_KERBEROS_SERVICE_NAME = new PropertyDescriptor.Builder()
            .name("kafka-kerberos-service-name")
            .displayName("Kafka Kerberos Service Name")
            .description("The Kerberos principal name that Kafka runs for Atlas notification. This can be defined either in Kafka's JAAS config or in Kafka's config. Corresponds to Kafka's 'sasl.kerberos.service.name' property. It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
            .required(false)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .defaultValue("kafka")
            .build();
    // Allowable values of NIFI_LINEAGE_STRATEGY; initLineageStrategy maps them to SimpleFlowPathLineage and CompleteFlowPathLineage.
    static final AllowableValue LINEAGE_STRATEGY_SIMPLE_PATH = new AllowableValue("SimplePath", "Simple Path", "Map NiFi provenance events and target Atlas DataSets to statically created 'nifi_flow_path' Atlas Processes. See also 'Additional Details'.");
    static final AllowableValue LINEAGE_STRATEGY_COMPLETE_PATH = new AllowableValue("CompletePath", "Complete Path", "Create separate 'nifi_flow_path' Atlas Processes for each distinct input and output DataSet combinations by looking at the complete route for a given FlowFile. See also 'Additional Details.");
    // Required. Selects the lineage strategy; defaults to Simple Path.
    static final PropertyDescriptor NIFI_LINEAGE_STRATEGY = new PropertyDescriptor.Builder().name("nifi-lineage-strategy").displayName("NiFi Lineage Strategy").description("Specifies granularity on how NiFi data flow should be reported to Atlas. NOTE: It is strongly recommended to keep using the same strategy once this reporting task started to keep Atlas data clean. Switching strategies will not delete Atlas entities created by the old strategy. Having mixed entities created by different strategies makes Atlas lineage graph noisy. For more detailed description on each strategy and differences, refer 'NiFi Lineage Strategy' section in Additional Details.").required(true).allowableValues(new AllowableValue[]{LINEAGE_STRATEGY_SIMPLE_PATH, LINEAGE_STRATEGY_COMPLETE_PATH}).defaultValue(LINEAGE_STRATEGY_SIMPLE_PATH.getValue()).build();
    // File name of the Atlas client configuration, resolved against the configuration directory in initAtlasProperties.
    private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
    // Property key read as the fallback default cluster name and written when the configuration file is generated.
    private static final String ATLAS_PROPERTY_CLUSTER_NAME = "atlas.cluster.name";
    // Property key written when the configuration file is generated; true when any Atlas URL starts with "https".
    private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
    // Kafka-related Atlas property keys; not referenced in the visible part of this file
    // (presumably used by setKafkaConfig, which is outside this view — TODO confirm).
    private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
    private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = "atlas.kafka.bootstrap.servers";
    private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = "atlas.kafka.client.id";
    // Authentication implementation chosen and configured in initAtlasProperties; used to create Atlas clients.
    private volatile AtlasAuthN atlasAuthN;
    // Atlas properties loaded from file/classpath or populated for generation in initAtlasProperties.
    private volatile Properties atlasProperties;
    // Default cluster name resolved in initAtlasProperties and handed to ClusterResolvers.
    private volatile String defaultClusterName;
    // Provenance event consumer created in initProvenanceConsumer; unscheduled in onUnscheduled.
    private volatile ProvenanceEventConsumer consumer;
    // Configured cluster resolvers built in initClusterResolvers.
    private volatile ClusterResolvers clusterResolvers;
    // Atlas hook created in initLineageStrategy; closed and cleared in onStopped.
    private volatile NiFiAtlasHook nifiAtlasHook;
    // Lineage strategy selected in initLineageStrategy.
    private volatile LineageStrategy lineageStrategy;
    // ClusterResolver implementations discovered through java.util.ServiceLoader.
    private final ServiceLoader<ClusterResolver> clusterResolverLoader = ServiceLoader.load(ClusterResolver.class);
    // Set to true by onTrigger once NiFi type definitions have been registered (or confirmed registered) in Atlas.
    private volatile boolean isTypeDefCreated = false;

    /**
     * Lists the static properties supported by this reporting task, in the order they are added below.
     * Uses a parameterized list instead of a raw {@code ArrayList}, so no unchecked conversion is
     * needed for the declared return type.
     *
     * @return a new, mutable list of property descriptors on every call
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(ATLAS_URLS);
        properties.add(ATLAS_AUTHN_METHOD);
        properties.add(ATLAS_USER);
        properties.add(ATLAS_PASSWORD);
        properties.add(ATLAS_CONF_DIR);
        properties.add(ATLAS_NIFI_URL);
        properties.add(ATLAS_DEFAULT_CLUSTER_NAME);
        properties.add(NIFI_LINEAGE_STRATEGY);
        properties.add(ProvenanceEventConsumer.PROVENANCE_START_POSITION);
        properties.add(ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE);
        properties.add(SSL_CONTEXT_SERVICE);

        // Properties below are only relevant to generating the Atlas configuration file and to Kerberos/Kafka access.
        properties.add(ATLAS_CONF_CREATE);
        properties.add(KERBEROS_CREDENTIALS_SERVICE);
        properties.add(NIFI_KERBEROS_PRINCIPAL);
        properties.add(NIFI_KERBEROS_KEYTAB);
        properties.add(KAFKA_KERBEROS_SERVICE_NAME);
        properties.add(KAFKA_BOOTSTRAP_SERVERS);
        properties.add(KAFKA_SECURITY_PROTOCOL);
        return properties;
    }

    /**
     * Asks each loaded {@link ClusterResolver}, in loader order, whether it supports the given
     * dynamic property name and returns the first non-null descriptor.
     *
     * @param str the dynamic property name
     * @return the first descriptor offered by a resolver, or {@code null} when none supports the name
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        for (final ClusterResolver resolver : this.clusterResolverLoader) {
            final PropertyDescriptor descriptor = resolver.getSupportedDynamicPropertyDescriptor(str);
            if (descriptor != null) {
                return descriptor;
            }
        }
        return null;
    }

    /**
     * Evaluates the Atlas URLs property, splits it on commas and passes each trimmed element to
     * the given consumer in order. Does nothing when the evaluated value is null or empty.
     *
     * @param propertyValue the Atlas URLs property value
     * @param consumer receives every trimmed URL string
     */
    private void parseAtlasUrls(PropertyValue propertyValue, Consumer<String> consumer) {
        final String rawUrls = propertyValue.evaluateAttributeExpressions().getValue();
        if (rawUrls != null && !rawUrls.isEmpty()) {
            for (final String url : rawUrls.split(",")) {
                consumer.accept(url.trim());
            }
        }
    }

    /**
     * Performs the cross-property validation of this reporting task.
     * <ul>
     *   <li>Every Atlas URL yields one result: invalid when it cannot be parsed as a URL, an invalid
     *       result on the SSL Context Service when the URL is HTTPS but no SSL Context Service is set,
     *       and a valid result otherwise.</li>
     *   <li>Results of the selected Atlas authentication method and of every loaded cluster resolver
     *       are appended.</li>
     *   <li>When 'Create Atlas Configuration File' is enabled, the configuration directory, default
     *       cluster name and Kafka bootstrap servers must be set, and the Kafka properties are validated.</li>
     * </ul>
     *
     * @param validationContext the context giving access to the configured property values
     * @return the collected validation results
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final Collection<ValidationResult> results = new ArrayList<>();
        final boolean isSSLContextServiceSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
        // Shared template for "SSL Context Service is missing" results; only the explanation varies per use.
        final ValidationResult.Builder invalidSSLService = new ValidationResult.Builder()
                .subject(SSL_CONTEXT_SERVICE.getDisplayName())
                .valid(false);

        parseAtlasUrls(validationContext.getProperty(ATLAS_URLS), input -> {
            final ValidationResult.Builder urlResult = new ValidationResult.Builder()
                    .subject(ATLAS_URLS.getDisplayName())
                    .input(input);
            try {
                final URL url = new URL(input);
                if ("https".equalsIgnoreCase(url.getProtocol()) && !isSSLContextServiceSet) {
                    results.add(invalidSSLService.explanation("required by HTTPS Atlas access").build());
                } else {
                    results.add(urlResult.explanation("Valid URI").valid(true).build());
                }
            } catch (Exception e) {
                results.add(urlResult.explanation("Contains invalid URI: " + e).valid(false).build());
            }
        });

        final String atlasAuthNMethod = validationContext.getProperty(ATLAS_AUTHN_METHOD).getValue();
        results.addAll(getAtlasAuthN(atlasAuthNMethod).validate(validationContext));

        this.clusterResolverLoader.forEach(resolver -> results.addAll(resolver.validate(validationContext)));

        if (validationContext.getProperty(ATLAS_CONF_CREATE).asBoolean()) {
            // A typed stream is needed here; Stream<Object> would not let the lambdas call
            // getProperty(PropertyDescriptor) or getDisplayName().
            Stream.of(ATLAS_CONF_DIR, ATLAS_DEFAULT_CLUSTER_NAME, KAFKA_BOOTSTRAP_SERVERS)
                    .filter(descriptor -> !validationContext.getProperty(descriptor).isSet())
                    .forEach(descriptor -> results.add(new ValidationResult.Builder()
                            .subject(descriptor.getDisplayName())
                            .explanation("required to create Atlas configuration file.")
                            .valid(false)
                            .build()));

            validateKafkaProperties(validationContext, results, isSSLContextServiceSet, invalidSSLService);
        }
        return results;
    }

    /**
     * Validates the Kafka-related properties and appends any problems to the given collection.
     *
     * @param validationContext the context giving access to the configured property values
     * @param collection receives the validation results produced here
     * @param z whether the SSL Context Service property is set
     * @param builder pre-populated builder for an invalid result on the SSL Context Service
     */
    private void validateKafkaProperties(ValidationContext validationContext, Collection<ValidationResult> collection, boolean z, ValidationResult.Builder builder) {
        final String securityProtocol = validationContext.getProperty(KAFKA_SECURITY_PROTOCOL).getValue();

        // SSL-based protocols cannot work without an SSL Context Service.
        if (!z && (SEC_SSL.equals(securityProtocol) || SEC_SASL_SSL.equals(securityProtocol))) {
            collection.add(builder.explanation("required by SSL Kafka connection").build());
        }

        final String explicitPrincipal = validationContext.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
        final String explicitKeytab = validationContext.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

        // The credentials service, when configured, wins over the explicit principal/keytab properties.
        final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : explicitPrincipal;
        final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : explicitKeytab;

        final boolean usesSasl = SEC_SASL_PLAINTEXT.equals(securityProtocol) || SEC_SASL_SSL.equals(securityProtocol);
        if (!usesSasl) {
            return;
        }

        if (!validationContext.getProperty(KAFKA_KERBEROS_SERVICE_NAME).isSet()) {
            collection.add(new ValidationResult.Builder()
                    .subject(KAFKA_KERBEROS_SERVICE_NAME.getDisplayName())
                    .explanation("Required by Kafka SASL authentication.")
                    .valid(false)
                    .build());
        }

        if (resolvedKeytab == null || resolvedPrincipal == null) {
            collection.add(new ValidationResult.Builder()
                    .subject("Kerberos Authentication")
                    .explanation("Keytab and Principal are required for Kerberos authentication with Apache Kafka.")
                    .valid(false)
                    .build());
        }
    }

    /**
     * Initializes this reporting task when it is scheduled: Atlas properties and authentication,
     * then the lineage strategy (including the Atlas hook and provenance consumer), then the cluster resolvers.
     *
     * @param configurationContext the configuration of this reporting task
     * @throws IOException if the Atlas properties file cannot be read or written
     */
    @OnScheduled
    public void setup(ConfigurationContext configurationContext) throws IOException {
        // Must run first: it resolves defaultClusterName, which initClusterResolvers reads below.
        initAtlasProperties(configurationContext);
        initLineageStrategy(configurationContext);
        initClusterResolvers(configurationContext);
    }

    /**
     * Creates a fresh Atlas hook, instantiates the lineage strategy selected by the
     * 'NiFi Lineage Strategy' property, wires the hook into it and then initializes the
     * provenance event consumer.
     *
     * @param configurationContext the configuration of this reporting task
     * @throws IOException declared for the provenance consumer initialization
     */
    private void initLineageStrategy(ConfigurationContext configurationContext) throws IOException {
        this.nifiAtlasHook = new NiFiAtlasHook();

        final String selectedStrategy = configurationContext.getProperty(NIFI_LINEAGE_STRATEGY).getValue();
        if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(selectedStrategy)) {
            this.lineageStrategy = new SimpleFlowPathLineage();
        } else if (LINEAGE_STRATEGY_COMPLETE_PATH.equals(selectedStrategy)) {
            this.lineageStrategy = new CompleteFlowPathLineage();
        }
        // Any other value leaves the previously assigned strategy (if any) untouched.

        this.lineageStrategy.setLineageContext(this.nifiAtlasHook);

        // The consumer depends on the strategy's target event types, so it is created last.
        initProvenanceConsumer(configurationContext);
    }

    /**
     * Configures every {@link ClusterResolver} found by the service loader and publishes them,
     * in loader order, together with the default cluster name as the active {@link ClusterResolvers}.
     * Uses a parameterized set instead of a raw {@code LinkedHashSet}, so the set handed to
     * {@code ClusterResolvers} is type-checked.
     *
     * @param configurationContext the configuration passed to each resolver
     */
    private void initClusterResolvers(ConfigurationContext configurationContext) {
        final LinkedHashSet<ClusterResolver> loadedClusterResolvers = new LinkedHashSet<>();
        for (final ClusterResolver resolver : this.clusterResolverLoader) {
            resolver.configure(configurationContext);
            loadedClusterResolvers.add(resolver);
        }
        this.clusterResolvers = new ClusterResolvers(Collections.unmodifiableSet(loadedClusterResolvers), this.defaultClusterName);
    }

    /**
     * Loads or generates the Atlas application properties and configures Atlas authentication.
     * <p>
     * When 'Create Atlas Configuration File' is disabled, the properties are loaded from
     * 'atlas-application.properties' in the configuration directory if that file exists, otherwise
     * from the root of the classpath. When it is enabled, the properties are populated from this
     * task's configuration and written to that file. In both cases the default cluster name is
     * resolved (property first, then 'atlas.cluster.name'), Atlas is asked to reload its application
     * properties, and the 'atlas.conf' system property is pointed at the configuration directory
     * when one is configured.
     * <p>
     * Streams are handled with try-with-resources so they are always closed, and the URL list is
     * properly typed.
     *
     * @param configurationContext the configuration of this reporting task
     * @throws IOException if the properties file cannot be read or written
     * @throws ProcessException if no properties file can be found, or no default cluster name is defined
     */
    private void initAtlasProperties(ConfigurationContext configurationContext) throws IOException {
        final List<String> urls = new ArrayList<>();
        parseAtlasUrls(configurationContext.getProperty(ATLAS_URLS), urls::add);
        final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
        final String atlasAuthNMethod = configurationContext.getProperty(ATLAS_AUTHN_METHOD).getValue();

        final String confDirStr = configurationContext.getProperty(ATLAS_CONF_DIR).evaluateAttributeExpressions().getValue();
        final File confDir = (confDirStr == null || confDirStr.isEmpty()) ? null : new File(confDirStr);

        this.atlasProperties = new Properties();
        // With a null confDir this resolves to a relative path in the working directory.
        final File atlasPropertiesFile = new File(confDir, ATLAS_PROPERTIES_FILENAME);

        final boolean createAtlasConf = configurationContext.getProperty(ATLAS_CONF_CREATE).asBoolean();
        if (!createAtlasConf) {
            if (atlasPropertiesFile.isFile()) {
                getLogger().info("Loading {}", new Object[]{atlasPropertiesFile});
                try (InputStream in = new FileInputStream(atlasPropertiesFile)) {
                    this.atlasProperties.load(in);
                }
            } else {
                final String atlasConfFileInClassPath = "/" + ATLAS_PROPERTIES_FILENAME;
                try (InputStream in = ReportLineageToAtlas.class.getResourceAsStream(atlasConfFileInClassPath)) {
                    getLogger().info("Loading {} from classpath", new Object[]{atlasConfFileInClassPath});
                    if (in == null) {
                        throw new ProcessException(String.format("Could not find %s in classpath. Please add it to classpath, or specify %s a directory containing Atlas properties file, or enable %s to generate it.", atlasConfFileInClassPath, ATLAS_CONF_DIR.getDisplayName(), ATLAS_CONF_CREATE.getDisplayName()));
                    }
                    this.atlasProperties.load(in);
                }
            }
        }

        // Resolve the default cluster name: explicit property first, then the loaded Atlas properties.
        this.defaultClusterName = configurationContext.getProperty(ATLAS_DEFAULT_CLUSTER_NAME).evaluateAttributeExpressions().getValue();
        if (this.defaultClusterName == null || this.defaultClusterName.isEmpty()) {
            this.defaultClusterName = this.atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME);
        }
        if (this.defaultClusterName == null || this.defaultClusterName.isEmpty()) {
            throw new ProcessException("Default cluster name is not defined.");
        }

        this.atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
        this.atlasAuthN.configure(configurationContext);

        if (createAtlasConf) {
            // Generate the configuration file from this task's settings, overwriting any existing file.
            this.atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, this.defaultClusterName);
            this.atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, String.valueOf(isAtlasApiSecure));
            setKafkaConfig(this.atlasProperties, configurationContext);
            this.atlasAuthN.populateProperties(this.atlasProperties);

            try (FileOutputStream out = new FileOutputStream(atlasPropertiesFile)) {
                final String generatedAt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX")
                        .withZone(ZoneOffset.UTC)
                        .format(Instant.now());
                this.atlasProperties.store(out, "Generated by Apache NiFi ReportLineageToAtlas ReportingTask at " + generatedAt);
            }
        }

        getLogger().debug("Force reloading Atlas application properties.");
        ApplicationProperties.forceReload();

        if (confDir != null) {
            // Without a configured directory, 'atlas.conf' is left untouched.
            final Properties systemProperties = System.getProperties();
            final String atlasConfProp = "atlas.conf";
            systemProperties.setProperty(atlasConfProp, confDir.getAbsolutePath());
            getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, systemProperties.getProperty(atlasConfProp)});
        }
    }

    /**
     * Creates a {@link NiFiAtlasClient} for the configured Atlas URLs using the configured
     * authentication method. The URL list is typed and filled through a method reference,
     * so the collecting lambda refers to a real variable.
     *
     * @param reportingContext the context giving access to the Atlas URLs property
     * @return a new Atlas client wrapper
     * @throws ProcessException if client creation fails with a {@link NullPointerException};
     *         the message points at a missing 'atlas-application.properties' as the likely cause
     */
    private NiFiAtlasClient createNiFiAtlasClient(ReportingContext reportingContext) {
        final List<String> urls = new ArrayList<>();
        parseAtlasUrls(reportingContext.getProperty(ATLAS_URLS), urls::add);
        try {
            return new NiFiAtlasClient(this.atlasAuthN.createClient(urls.toArray(new String[0])));
        } catch (NullPointerException e) {
            // Translated into an actionable configuration error instead of surfacing a bare NPE.
            throw new ProcessException(String.format("Failed to initialize Atlas client due to %s. Make sure 'atlas-application.properties' is in the directory specified with %s or under root classpath if not specified.", e, ATLAS_CONF_DIR.getDisplayName()), e);
        }
    }

    /**
     * Creates the Atlas authentication implementation for the given method name.
     * Written as a plain {@code switch} on the string; the hashCode-based two-stage switch with a
     * boolean selector is not valid Java source.
     *
     * @param str the authentication method value: "basic" or "kerberos"
     * @return a new {@link Basic} or {@link Kerberos} instance
     * @throws IllegalArgumentException if the method is neither "basic" nor "kerberos"
     * @throws NullPointerException if {@code str} is null
     */
    private AtlasAuthN getAtlasAuthN(String str) {
        switch (str) {
            case "basic":
                return new Basic();
            case "kerberos":
                return new Kerberos();
            default:
                throw new IllegalArgumentException(str + " is not supported as an Atlas authentication method.");
        }
    }

    /**
     * Creates the provenance event consumer and configures it with the start position, batch size,
     * the target event types of the current lineage strategy and this task's logger, then marks
     * it as scheduled.
     *
     * @param configurationContext the configuration of this reporting task
     * @throws IOException declared by the signature; nothing in this body is known to throw it
     */
    private void initProvenanceConsumer(ConfigurationContext configurationContext) throws IOException {
        final ProvenanceEventConsumer eventConsumer = new ProvenanceEventConsumer();
        // Publish first, then configure, so the field is assigned even if configuration fails.
        this.consumer = eventConsumer;

        final String startPosition = configurationContext.getProperty(ProvenanceEventConsumer.PROVENANCE_START_POSITION).getValue();
        eventConsumer.setStartPositionValue(startPosition);

        final int batchSize = configurationContext.getProperty(ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE).asInteger().intValue();
        eventConsumer.setBatchSize(batchSize);

        eventConsumer.addTargetEventType(this.lineageStrategy.getTargetEventTypes());
        eventConsumer.setLogger(getLogger());
        eventConsumer.setScheduled(true);
    }

    /**
     * Marks the provenance event consumer as no longer scheduled, if one has been created.
     */
    @OnUnscheduled
    public void onUnscheduled() {
        if (this.consumer == null) {
            // Nothing to unschedule: the consumer was never initialized.
            return;
        }
        this.consumer.setScheduled(false);
    }

    /**
     * Closes the Atlas hook, if one exists, and clears the reference to it.
     */
    @OnStopped
    public void onStopped() {
        if (this.nifiAtlasHook == null) {
            return;
        }
        this.nifiAtlasHook.close();
        this.nifiAtlasHook = null;
    }

    /**
     * Runs one reporting cycle.
     * <ol>
     *   <li>Returns immediately when NiFi is clustered but this node's identifier is empty.</li>
     *   <li>Until type definitions are known to exist: a standalone or primary node registers them;
     *       any other node checks whether they are registered and returns if they are not.</li>
     *   <li>Builds the NiFi flow; a standalone or primary node also registers it in Atlas.</li>
     *   <li>Hands the Atlas client to the hook and consumes provenance events.</li>
     * </ol>
     *
     * @param reportingContext the context of this reporting cycle
     * @throws RuntimeException wrapping an {@code AtlasServiceException} raised while registering
     *         type definitions or the flow
     */
    public void onTrigger(ReportingContext reportingContext) {
        final String nodeId = reportingContext.getClusterNodeIdentifier();
        final boolean clustered = reportingContext.isClustered();
        if (clustered && StringUtils.isEmpty(nodeId)) {
            return;
        }

        // Registration work is done by a standalone instance or by the primary node of a cluster.
        final boolean responsibleForPrimaryTasks = !clustered || getNodeTypeProvider().isPrimary();
        final NiFiAtlasClient atlasClient = createNiFiAtlasClient(reportingContext);

        if (!this.isTypeDefCreated) {
            try {
                if (!responsibleForPrimaryTasks) {
                    if (!atlasClient.isNiFiTypeDefsRegistered()) {
                        getLogger().debug("NiFi type definitions are not ready in Atlas type system yet.");
                        return;
                    }
                } else {
                    atlasClient.registerNiFiTypeDefs(false);
                }
                this.isTypeDefCreated = true;
            } catch (AtlasServiceException e) {
                throw new RuntimeException("Failed to check and create NiFi flow type definitions in Atlas due to " + e, e);
            }
        }

        final NiFiFlow flow = createNiFiFlow(reportingContext, atlasClient);

        if (responsibleForPrimaryTasks) {
            try {
                atlasClient.registerNiFiFlow(flow);
            } catch (AtlasServiceException e) {
                throw new RuntimeException("Failed to register NiFI flow. " + e, e);
            }
        }

        this.nifiAtlasHook.setAtlasClient(atlasClient);
        consumeNiFiProvenanceEvents(reportingContext, flow);
    }

    /**
     * Builds the {@link NiFiFlow} model for the root process group.
     *
     * <p>An existing flow is fetched from Atlas by root group id and cluster name; if Atlas
     * answers NOT_FOUND a new flow is created instead. The flow's name, URL and cluster name
     * are then set and the flow is analysed with a {@link NiFiFlowAnalyzer}.</p>
     *
     * @param reportingContext supplies the root group status and the configured NiFi URL
     * @param niFiAtlasClient client used to look up an existing flow
     * @return the populated and analysed flow
     * @throws IllegalArgumentException if the configured NiFi URL cannot be parsed
     * @throws RuntimeException if fetching the existing flow fails for a reason other than NOT_FOUND
     */
    private NiFiFlow createNiFiFlow(ReportingContext reportingContext, NiFiAtlasClient niFiAtlasClient) {
        final ProcessGroupStatus rootGroup = reportingContext.getEventAccess().getGroupStatus("root");
        final String flowName = rootGroup.getName();
        final String nifiUrl = reportingContext.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();

        final String clusterName;
        try {
            final String nifiHost = new URL(nifiUrl).getHost();
            clusterName = this.clusterResolvers.fromHostNames(nifiHost);
        } catch (MalformedURLException badUrl) {
            throw new IllegalArgumentException("Failed to parse NiFi URL, " + badUrl.getMessage(), badUrl);
        }

        NiFiFlow flow = null;
        try {
            flow = niFiAtlasClient.fetchNiFiFlow(rootGroup.getId(), clusterName);
        } catch (AtlasServiceException fetchFailure) {
            final boolean notFound = ClientResponse.Status.NOT_FOUND.equals(fetchFailure.getStatus());
            if (!notFound) {
                throw new RuntimeException("Failed to fetch existing NiFI flow. " + fetchFailure, fetchFailure);
            }
            // NOT_FOUND only means the flow has not been stored yet; fall through and create it.
            getLogger().debug("Existing flow was not found for {}@{}", new Object[]{rootGroup.getId(), clusterName});
        }
        if (flow == null) {
            flow = new NiFiFlow(rootGroup.getId());
        }

        flow.setFlowName(flowName);
        flow.setUrl(nifiUrl);
        flow.setClusterName(clusterName);

        final NiFiFlowAnalyzer analyzer = new NiFiFlowAnalyzer();
        analyzer.analyzeProcessGroup(flow, rootGroup);
        analyzer.analyzePaths(flow);
        return flow;
    }

    /**
     * Consumes provenance events and passes each one to the lineage strategy.
     *
     * <p>A failure while processing a single event is logged and that event is skipped, so the
     * remaining events of the batch are still processed. After each batch the Atlas hook's
     * messages are committed.</p>
     *
     * @param reportingContext the context of the current reporting invocation
     * @param niFiFlow the flow model the events are analysed against
     */
    private void consumeNiFiProvenanceEvents(ReportingContext reportingContext, NiFiFlow niFiFlow) {
        final StandardAnalysisContext analysisContext = new StandardAnalysisContext(
                niFiFlow, this.clusterResolvers, reportingContext.getEventAccess().getProvenanceRepository());

        this.consumer.consumeEvents(reportingContext, (componentMapHolder, events) -> {
            // Typed enhanced-for instead of a raw Iterator with an unchecked cast.
            for (ProvenanceEventRecord event : events) {
                try {
                    this.lineageStrategy.processEvent(analysisContext, niFiFlow, event);
                } catch (Exception e) {
                    // Deliberately broad: one bad event must not abort the rest of the batch.
                    getLogger().error("Skipping failed analyzing event {} due to {}.", new Object[]{event, e, e});
                }
            }
            this.nifiAtlasHook.commitMessages();
        });
    }

    /**
     * Writes the Atlas Kafka client settings into the given map: bootstrap servers, client id,
     * security protocol, key store and trust store settings when an SSL context service provides
     * them, and JAAS settings when the protocol is one of the SASL variants.
     *
     * @param map the configuration map to populate
     * @param propertyContext source of the configured property values
     */
    private void setKafkaConfig(Map<Object, Object> map, PropertyContext propertyContext) {
        final String bootstrapServers = propertyContext.getProperty(KAFKA_BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        map.put(ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);

        final String clientId = String.format("%s.%s", getName(), getIdentifier());
        map.put(ATLAS_PROPERTY_KAFKA_CLIENT_ID, clientId);

        final String securityProtocol = propertyContext.getProperty(KAFKA_SECURITY_PROTOCOL).getValue();
        map.put("atlas.kafka.security.protocol", securityProtocol);

        final SSLContextService sslService = propertyContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslService != null) {
            if (sslService.isKeyStoreConfigured()) {
                map.put("atlas.kafka.ssl.keystore.location", sslService.getKeyStoreFile());
                map.put("atlas.kafka.ssl.keystore.password", sslService.getKeyStorePassword());
                // Fall back to the key store password when no dedicated key password is set.
                final String keyPassword;
                if (sslService.getKeyPassword() == null) {
                    keyPassword = sslService.getKeyStorePassword();
                } else {
                    keyPassword = sslService.getKeyPassword();
                }
                map.put("atlas.kafka.ssl.key.password", keyPassword);
                map.put("atlas.kafka.ssl.keystore.type", sslService.getKeyStoreType());
            }
            if (sslService.isTrustStoreConfigured()) {
                map.put("atlas.kafka.ssl.truststore.location", sslService.getTrustStoreFile());
                map.put("atlas.kafka.ssl.truststore.password", sslService.getTrustStorePassword());
                map.put("atlas.kafka.ssl.truststore.type", sslService.getTrustStoreType());
            }
        }

        final boolean saslProtocol = SEC_SASL_PLAINTEXT.equals(securityProtocol) || SEC_SASL_SSL.equals(securityProtocol);
        if (saslProtocol) {
            setKafkaJaasConfig(map, propertyContext);
        }
    }

    /**
     * Writes the Kerberos JAAS settings for the Kafka client into the given map.
     *
     * <p>Principal and keytab come from the Kerberos credentials service when one is configured,
     * otherwise from the principal and keytab properties. Nothing is written unless keytab,
     * principal and Kafka Kerberos service name are all non-blank.</p>
     *
     * @param map the configuration map to populate
     * @param propertyContext source of the configured property values
     */
    private void setKafkaJaasConfig(Map<Object, Object> map, PropertyContext propertyContext) {
        final String configuredPrincipal = propertyContext.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
        final String configuredKeytab = propertyContext.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
        final KerberosCredentialsService credentialsService =
                propertyContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

        // The credentials service, when present, takes precedence over the individual properties.
        final String principal = credentialsService == null ? configuredPrincipal : credentialsService.getPrincipal();
        final String keytab = credentialsService == null ? configuredKeytab : credentialsService.getKeytab();

        final String serviceName = propertyContext.getProperty(KAFKA_KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue();

        if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal) || StringUtils.isBlank(serviceName)) {
            return;
        }

        final String keytabLoginPrefix = "atlas.jaas.KafkaClient.";
        map.put(keytabLoginPrefix + "loginModuleControlFlag", "required");
        map.put(keytabLoginPrefix + "loginModuleName", "com.sun.security.auth.module.Krb5LoginModule");
        map.put(keytabLoginPrefix + "option.keyTab", keytab);
        map.put(keytabLoginPrefix + "option.principal", principal);
        map.put(keytabLoginPrefix + "option.serviceName", serviceName);
        map.put(keytabLoginPrefix + "option.storeKey", "True");
        map.put(keytabLoginPrefix + "option.useKeyTab", "True");

        final String ticketLoginPrefix = "atlas.jaas.ticketBased-KafkaClient.";
        map.put(ticketLoginPrefix + "loginModuleControlFlag", "required");
        map.put(ticketLoginPrefix + "loginModuleName", "com.sun.security.auth.module.Krb5LoginModule");
        map.put(ticketLoginPrefix + "option.useTicketCache", "true");

        map.put("atlas.kafka.sasl.kerberos.service.name", serviceName);
    }
}
