/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.atlas;

import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.NiFiTypes;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin client on top of {@link AtlasClientV2} that registers the NiFi type definitions and
 * synchronizes a {@link NiFiFlow} (the flow itself, its DataSet entities and its flow paths)
 * with Atlas.
 */
public class NiFiAtlasClient {
    private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);

    /** Captures the value of the {@code processGroupId} parameter found in a flow path URL. */
    private static final Pattern FLOW_PATH_URL_PATTERN = Pattern.compile("^http.+processGroupId=([0-9a-z\\-]+).*$");

    private final AtlasClientV2 atlasClient;

    /**
     * @param atlasClient the Atlas client every request of this instance is sent through
     */
    public NiFiAtlasClient(AtlasClientV2 atlasClient) {
        this.atlasClient = atlasClient;
    }

    /**
     * Deletes the entity type definitions having the given names.
     *
     * @param typeNames names of the type definitions to delete
     * @throws AtlasServiceException if looking up or deleting the definitions fails
     * @throws UniformInterfaceException if the delete request fails with a status other than 204
     */
    void deleteTypeDefs(String... typeNames) throws AtlasServiceException {
        final AtlasTypesDef existingTypeDef = getTypeDefs(typeNames);
        try {
            atlasClient.deleteAtlasTypeDefs(existingTypeDef);
        } catch (UniformInterfaceException e) {
            if (e.getResponse().getStatus() != 204) {
                throw e;
            }
            // 204 (No Content) is an HTTP success status that reaches us as an exception here,
            // so it is reported as a successful deletion instead of being rethrown.
            logger.info("Deleted type defs: {}", existingTypeDef);
        }
    }

    /**
     * Checks whether every NiFi entity type listed in {@link NiFiTypes#ENTITIES} is known to Atlas.
     *
     * @return {@code true} if an entity definition was found for each NiFi type name
     * @throws AtlasServiceException if the type definitions cannot be retrieved
     */
    public boolean isNiFiTypeDefsRegistered() throws AtlasServiceException {
        final Set<String> typeNames = NiFiTypes.ENTITIES.keySet();
        return findExistingEntityDefs(typeNames).keySet().containsAll(typeNames);
    }

    /**
     * Creates the NiFi entity type definitions in Atlas, or updates them.
     *
     * <p>Types that already exist are skipped unless {@code update} is {@code true}. When at least
     * one existing type is included, the whole batch is sent as an update; otherwise it is sent as
     * a create request.
     *
     * @param update whether already registered types should be sent again as an update
     * @throws AtlasServiceException if retrieving, creating or updating the definitions fails
     */
    public void registerNiFiTypeDefs(boolean update) throws AtlasServiceException {
        final Set<String> typeNames = NiFiTypes.ENTITIES.keySet();
        final Map<String, AtlasEntityDef> existingDefs = findExistingEntityDefs(typeNames);

        final AtlasTypesDef type = new AtlasTypesDef();
        boolean shouldUpdate = false;
        for (String typeName : typeNames) {
            if (existingDefs.get(typeName) != null) {
                // The type is already defined.
                if (!update) {
                    continue;
                }
                shouldUpdate = true;
            }

            final NiFiTypes.EntityDefinition definition = NiFiTypes.ENTITIES.get(typeName);
            final AtlasEntityDef entity = new AtlasEntityDef();
            type.getEntityDefs().add(entity);
            entity.setName(typeName);

            final Set<String> superTypes = new HashSet<>();
            final List<AtlasStructDef.AtlasAttributeDef> attributes = new ArrayList<>();
            definition.define(entity, superTypes, attributes);
            entity.setSuperTypes(superTypes);
            entity.setAttributeDefs(attributes);
        }

        final AtlasTypesDef atlasTypeDefsResult = shouldUpdate
                ? atlasClient.updateAtlasTypeDefs(type)
                : atlasClient.createAtlasTypeDefs(type);
        logger.debug("Result={}", atlasTypeDefsResult);
    }

    /**
     * Retrieves the entity definitions registered under the given names, keyed by definition name.
     */
    private Map<String, AtlasEntityDef> findExistingEntityDefs(Set<String> typeNames) throws AtlasServiceException {
        final AtlasTypesDef typeDefs = getTypeDefs(typeNames.toArray(new String[0]));
        return typeDefs.getEntityDefs().stream()
                .collect(Collectors.toMap(AtlasBaseTypeDef::getName, Function.identity()));
    }

    /**
     * Queries Atlas once per type name and gathers the returned entity definitions into a single
     * {@link AtlasTypesDef}.
     */
    private AtlasTypesDef getTypeDefs(String... typeNames) throws AtlasServiceException {
        final AtlasTypesDef typeDefs = new AtlasTypesDef();
        for (String typeName : typeNames) {
            final MultivaluedMap<String, String> searchParams = new MultivaluedMapImpl();
            searchParams.add("name", typeName);
            final AtlasTypesDef typeDef = atlasClient.getAllTypeDefs(new SearchFilter(searchParams));
            typeDefs.getEntityDefs().addAll(typeDef.getEntityDefs());
        }
        logger.debug("typeDefs={}", typeDefs);
        return typeDefs;
    }

    /**
     * Loads the {@code nifi_flow} entity of the given root process group together with the queues,
     * root ports and flow paths it refers to, and starts change tracking on the result.
     *
     * @param rootProcessGroupId id of the root process group
     * @param clusterName cluster name used to build the qualified name of the flow
     * @return the flow as currently stored in Atlas, or {@code null} if the lookup yields no entity
     * @throws AtlasServiceException if the flow entity cannot be retrieved
     */
    public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException {
        final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId);
        final AtlasObjectId flowId = new AtlasObjectId("nifi_flow", "qualifiedName", qualifiedName);
        final AtlasEntity.AtlasEntityWithExtInfo nifiFlowExt = searchEntityDef(flowId);
        if (nifiFlowExt == null || nifiFlowExt.getEntity() == null) {
            return null;
        }

        final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity();
        final Map<String, Object> attributes = nifiFlowEntity.getAttributes();

        final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
        nifiFlow.setExEntity(nifiFlowEntity);
        nifiFlow.setFlowName(AtlasUtils.toStr(attributes.get("name")));
        nifiFlow.setClusterName(clusterName);
        nifiFlow.setUrl(AtlasUtils.toStr(attributes.get("url")));
        nifiFlow.setDescription(AtlasUtils.toStr(attributes.get("description")));

        nifiFlow.getQueues().putAll(fetchReferredEntities(nifiFlowEntity.getAttribute("queues")));
        nifiFlow.getRootInputPortEntities().putAll(fetchReferredEntities(nifiFlowEntity.getAttribute("inputPorts")));
        nifiFlow.getRootOutputPortEntities().putAll(fetchReferredEntities(nifiFlowEntity.getAttribute("outputPorts")));

        final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths();
        final Map<AtlasObjectId, AtlasEntity> flowPathEntities = fetchReferredEntities(attributes.get("flowPaths"));
        for (AtlasEntity flowPathEntity : flowPathEntities.values()) {
            final String pathQualifiedName = AtlasUtils.toStr(flowPathEntity.getAttribute("qualifiedName"));
            final NiFiFlowPath flowPath = new NiFiFlowPath(AtlasUtils.getComponentIdFromQualifiedName(pathQualifiedName));

            if (flowPathEntity.hasAttribute("url")) {
                final String url = AtlasUtils.toStr(flowPathEntity.getAttribute("url"));
                // Skip matching when no URL string is available; Pattern.matcher rejects null input.
                if (url != null) {
                    final Matcher urlMatcher = FLOW_PATH_URL_PATTERN.matcher(url);
                    if (urlMatcher.matches()) {
                        flowPath.setGroupId(urlMatcher.group(1));
                    }
                }
            }

            flowPath.setExEntity(flowPathEntity);
            flowPath.setName(AtlasUtils.toStr(flowPathEntity.getAttribute("name")));
            flowPath.getInputs().addAll(fetchReferredEntities(flowPathEntity.getAttribute("inputs")).keySet());
            flowPath.getOutputs().addAll(fetchReferredEntities(flowPathEntity.getAttribute("outputs")).keySet());
            flowPath.startTrackingChanges(nifiFlow);
            flowPaths.put(flowPath.getId(), flowPath);
        }

        nifiFlow.startTrackingChanges();
        return nifiFlow;
    }

    /**
     * Converts a raw reference attribute into object ids and fetches the entity behind each of them.
     */
    private Map<AtlasObjectId, AtlasEntity> fetchReferredEntities(Object rawReferences) {
        return toQualifiedNameIds(toAtlasObjectIds(rawReferences));
    }

    /**
     * Converts a reference attribute value into a list of {@link AtlasObjectId}s.
     *
     * @param rawReferences attribute value, treated as a list of maps holding {@code guid} and
     *                      {@code typeName} entries; may be {@code null}
     * @return one id per reference, or an empty list when the attribute is {@code null}
     */
    private List<AtlasObjectId> toAtlasObjectIds(Object rawReferences) {
        if (rawReferences == null) {
            return Collections.emptyList();
        }
        // The attribute value is untyped; every element is accessed as a map of attributes below.
        @SuppressWarnings("unchecked")
        final List<Map<String, Object>> references = (List<Map<String, Object>>) rawReferences;
        return references.stream()
                .map(ref -> new AtlasObjectId(AtlasUtils.toStr(ref.get("guid")), AtlasUtils.toStr(ref.get("typeName")), ref))
                .collect(Collectors.toList());
    }

    /**
     * Fetches the entity of every distinct id and returns the entities keyed by an id that carries
     * the entity's {@code qualifiedName} as its unique attribute. Ids that cannot be resolved are
     * left out of the result.
     */
    private Map<AtlasObjectId, AtlasEntity> toQualifiedNameIds(List<AtlasObjectId> ids) {
        if (ids == null) {
            return Collections.emptyMap();
        }
        return ids.stream()
                .distinct()
                .map(this::fetchLiveEntity)
                .filter(Objects::nonNull)
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
    }

    /**
     * Looks up a single entity.
     *
     * @return the qualified-name based id paired with the entity, or {@code null} when the lookup
     *         fails, yields no entity, or the entity is marked as deleted
     */
    private Tuple<AtlasObjectId, AtlasEntity> fetchLiveEntity(AtlasObjectId id) {
        try {
            final AtlasEntity.AtlasEntityWithExtInfo entityExt = searchEntityDef(id);
            // Same "not found" handling as fetchNiFiFlow applies to its own lookup result.
            if (entityExt == null || entityExt.getEntity() == null) {
                return null;
            }
            final AtlasEntity entity = entityExt.getEntity();
            if (AtlasEntity.Status.DELETED.equals(entity.getStatus())) {
                return null;
            }
            final Map<String, Object> uniqueAttrs = Collections.singletonMap("qualifiedName", entity.getAttribute("qualifiedName"));
            return new Tuple<>(new AtlasObjectId(id.getGuid(), id.getTypeName(), uniqueAttrs), entity);
        } catch (AtlasServiceException e) {
            logger.warn("Failed to search entity by id {}, due to {}", id, e);
            return null;
        }
    }

    /**
     * Sends the changes tracked by the given flow to Atlas: the flow entity, its DataSet entities
     * and its flow paths. The flow entity is stored again at the end when its metadata or any of
     * its references changed.
     *
     * @param nifiFlow the flow to register
     * @throws AtlasServiceException if a request fails, except for the "instance not found"
     *                               response to the final flow update, which is only logged
     */
    public void registerNiFiFlow(NiFiFlow nifiFlow) throws AtlasServiceException {
        final AtlasEntity flowEntity = registerNiFiFlowEntity(nifiFlow);
        final Map<String, List<AtlasEntity>> updatedDataSetEntities = registerDataSetEntities(nifiFlow);
        final Set<AtlasObjectId> remainingPathIds = registerFlowPathEntities(nifiFlow);

        boolean shouldUpdateNiFiFlow = nifiFlow.isMetadataUpdated();
        if (remainingPathIds != null) {
            flowEntity.setAttribute("flowPaths", remainingPathIds);
            shouldUpdateNiFiFlow = true;
        }
        if (updatedDataSetEntities.containsKey("nifi_queue")) {
            flowEntity.setAttribute("queues", updatedDataSetEntities.get("nifi_queue"));
            shouldUpdateNiFiFlow = true;
        }
        if (updatedDataSetEntities.containsKey("nifi_input_port")) {
            flowEntity.setAttribute("inputPorts", updatedDataSetEntities.get("nifi_input_port"));
            shouldUpdateNiFiFlow = true;
        }
        if (updatedDataSetEntities.containsKey("nifi_output_port")) {
            flowEntity.setAttribute("outputPorts", updatedDataSetEntities.get("nifi_output_port"));
            shouldUpdateNiFiFlow = true;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("### NiFi Flow Audit Logs START");
            nifiFlow.getUpdateAudit().forEach(logger::debug);
            nifiFlow.getFlowPaths().forEach((pathId, path) -> {
                logger.debug("--- NiFiFlowPath Audit Logs: {}", pathId);
                path.getUpdateAudit().forEach(logger::debug);
            });
            logger.debug("### NiFi Flow Audit Logs END");
        }

        if (!shouldUpdateNiFiFlow) {
            return;
        }

        final List<AtlasEntity> entities = new ArrayList<>();
        entities.add(flowEntity);
        final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
        try {
            final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
            logger.debug("mutation response={}", mutationResponse);
        } catch (AtlasServiceException e) {
            if (!isInstanceNotFound(e)) {
                throw e;
            }
            // Per the message below the entity is expected to be stored despite this response,
            // so the error is logged and not propagated.
            logger.debug("Received error response from Atlas but it should be stored." + e);
        }
    }

    /**
     * Tells whether the exception carries both the HTTP status and the error code of
     * {@link AtlasErrorCode#INSTANCE_NOT_FOUND}. A missing status or message counts as "no", so
     * that the original exception is rethrown instead of being masked by a NullPointerException.
     */
    private static boolean isInstanceNotFound(AtlasServiceException e) {
        if (e.getStatus() == null || e.getMessage() == null) {
            return false;
        }
        return e.getStatus().getStatusCode() == AtlasErrorCode.INSTANCE_NOT_FOUND.getHttpCode().getStatusCode()
                && e.getMessage().contains(AtlasErrorCode.INSTANCE_NOT_FOUND.getErrorCode());
    }

    /**
     * Builds the {@code nifi_flow} entity from the given flow and, when its guid starts with
     * {@code "-"}, creates it in Atlas right away and applies the assigned guid.
     *
     * @return the existing entity untouched when the flow metadata is unchanged, otherwise the
     *         freshly populated entity
     */
    private AtlasEntity registerNiFiFlowEntity(NiFiFlow nifiFlow) throws AtlasServiceException {
        if (!nifiFlow.isMetadataUpdated()) {
            // Nothing has changed, hand back the existing entity.
            return nifiFlow.getExEntity();
        }

        // Start from a copy of the existing entity, if there is one.
        final AtlasEntity flowEntity = nifiFlow.getExEntity() != null
                ? new AtlasEntity(nifiFlow.getExEntity())
                : new AtlasEntity();
        flowEntity.setTypeName("nifi_flow");
        flowEntity.setVersion(1L);
        flowEntity.setAttribute("name", nifiFlow.getFlowName());
        flowEntity.setAttribute("qualifiedName", nifiFlow.toQualifiedName(nifiFlow.getRootProcessGroupId()));
        flowEntity.setAttribute("url", nifiFlow.getUrl());
        flowEntity.setAttribute("description", nifiFlow.getDescription());

        if (flowEntity.getGuid().startsWith("-")) {
            final List<AtlasEntity> entities = new ArrayList<>();
            entities.add(flowEntity);
            final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
            final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
            logger.debug("Registered a new nifi_flow entity, mutation response={}", mutationResponse);
            final String assignedNiFiFlowGuid = mutationResponse.getGuidAssignments().get(flowEntity.getGuid());
            flowEntity.setGuid(assignedNiFiFlowGuid);
            nifiFlow.setAtlasGuid(assignedNiFiFlowGuid);
        }
        return flowEntity;
    }

    /**
     * Creates and updates the changed DataSet entities (queues and root ports) of the flow.
     *
     * @return for every type name that has at least one entity not marked {@code AS_IS}, the
     *         entities of that type which are not marked {@code DELETED}; possibly an empty list
     */
    private Map<String, List<AtlasEntity>> registerDataSetEntities(NiFiFlow nifiFlow) throws AtlasServiceException {
        final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> changedEntities = nifiFlow.getChangedDataSetEntities();

        if (changedEntities.containsKey(NiFiFlow.EntityChangeType.CREATED)) {
            final List<AtlasEntity> createdEntities = changedEntities.get(NiFiFlow.EntityChangeType.CREATED);
            final EntityMutationResponse mutationResponse =
                    atlasClient.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(createdEntities));
            logger.debug("Created DataSet entities mutation response={}", mutationResponse);

            final Map<String, String> guidAssignments = mutationResponse.getGuidAssignments();
            for (AtlasEntity entity : createdEntities) {
                final String guid = guidAssignments.get(entity.getGuid());
                final String qualifiedName = AtlasUtils.toStr(entity.getAttribute("qualifiedName"));
                if (StringUtils.isEmpty(guid)) {
                    logger.warn("GUID was not assigned for {}::{} for some reason.", entity.getTypeName(), qualifiedName);
                    continue;
                }

                final Map<AtlasObjectId, AtlasEntity> entityMap = dataSetEntityMapOf(nifiFlow, entity.getTypeName());
                // Replace the entry registered under the same qualified name with one carrying the assigned guid.
                AtlasUtils.findIdByQualifiedName(entityMap.keySet(), qualifiedName).ifPresent(entityMap::remove);
                entity.setGuid(guid);
                final AtlasObjectId idWithGuid = new AtlasObjectId(guid, entity.getTypeName(),
                        Collections.singletonMap("qualifiedName", qualifiedName));
                entityMap.put(idWithGuid, entity);
            }
        }

        if (changedEntities.containsKey(NiFiFlow.EntityChangeType.UPDATED)) {
            final List<AtlasEntity> updatedEntities = changedEntities.get(NiFiFlow.EntityChangeType.UPDATED);
            final EntityMutationResponse mutationResponse =
                    atlasClient.updateEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(updatedEntities));
            logger.debug("Updated DataSet entities mutation response={}", mutationResponse);
        }

        final Set<String> changedTypeNames = changedEntities.entrySet().stream()
                .filter(entry -> !NiFiFlow.EntityChangeType.AS_IS.equals(entry.getKey()))
                .flatMap(entry -> entry.getValue().stream())
                .map(AtlasStruct::getTypeName)
                .collect(Collectors.toSet());

        final Map<String, List<AtlasEntity>> remainingEntitiesByType = changedEntities.entrySet().stream()
                .filter(entry -> !NiFiFlow.EntityChangeType.DELETED.equals(entry.getKey()))
                .flatMap(entry -> entry.getValue().stream())
                .filter(entity -> changedTypeNames.contains(entity.getTypeName()))
                .collect(Collectors.groupingBy(AtlasStruct::getTypeName));

        // A changed type with no remaining entity still needs an (empty) entry in the result.
        changedTypeNames.forEach(changedTypeName ->
                remainingEntitiesByType.computeIfAbsent(changedTypeName, k -> Collections.emptyList()));
        return remainingEntitiesByType;
    }

    /**
     * Selects the flow's entity map that holds DataSet entities of the given type.
     *
     * @throws RuntimeException if the type is none of the three supported DataSet types
     */
    private static Map<AtlasObjectId, AtlasEntity> dataSetEntityMapOf(NiFiFlow nifiFlow, String typeName) {
        switch (typeName) {
            case "nifi_input_port":
                return nifiFlow.getRootInputPortEntities();
            case "nifi_output_port":
                return nifiFlow.getRootOutputPortEntities();
            case "nifi_queue":
                return nifiFlow.getQueues();
            default:
                throw new RuntimeException(typeName + " is not expected.");
        }
    }

    /**
     * Creates and updates the changed flow path entities of the flow.
     *
     * @return ids of all flow paths not marked {@code DELETED} when the change set contains a
     *         change, otherwise {@code null}
     */
    private Set<AtlasObjectId> registerFlowPathEntities(NiFiFlow nifiFlow) throws AtlasServiceException {
        final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> changedEntities = nifiFlow.getChangedFlowPathEntities();

        if (changedEntities.containsKey(NiFiFlow.EntityChangeType.CREATED)) {
            final List<AtlasEntity> createdEntities = changedEntities.get(NiFiFlow.EntityChangeType.CREATED);
            final EntityMutationResponse mutationResponse =
                    atlasClient.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(createdEntities));
            logger.debug("Created FlowPath entities mutation response={}", mutationResponse);

            final Map<String, String> guidAssignments = mutationResponse.getGuidAssignments();
            for (AtlasEntity entity : createdEntities) {
                // Swap the guid the entity was sent with for the one assigned in the response.
                entity.setGuid(guidAssignments.get(entity.getGuid()));
                attachExEntity(nifiFlow, entity);
            }
        }

        if (changedEntities.containsKey(NiFiFlow.EntityChangeType.UPDATED)) {
            final List<AtlasEntity> updatedEntities = changedEntities.get(NiFiFlow.EntityChangeType.UPDATED);
            final EntityMutationResponse mutationResponse =
                    atlasClient.updateEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(updatedEntities));
            logger.debug("Updated FlowPath entities mutation response={}", mutationResponse);
            for (AtlasEntity entity : updatedEntities) {
                attachExEntity(nifiFlow, entity);
            }
        }

        if (!NiFiFlow.EntityChangeType.containsChange(changedEntities.keySet())) {
            return null;
        }
        return changedEntities.entrySet().stream()
                .filter(entry -> !NiFiFlow.EntityChangeType.DELETED.equals(entry.getKey()))
                .flatMap(entry -> entry.getValue().stream())
                .map(path -> new AtlasObjectId(path.getGuid(), "nifi_flow_path",
                        Collections.singletonMap("qualifiedName", path.getAttribute("qualifiedName"))))
                .collect(Collectors.toSet());
    }

    /**
     * Stores the entity on the flow path whose id is derived from the entity's qualified name.
     */
    private static void attachExEntity(NiFiFlow nifiFlow, AtlasEntity entity) {
        final String pathId = AtlasUtils.getComponentIdFromQualifiedName(AtlasUtils.toStr(entity.getAttribute("qualifiedName")));
        final NiFiFlowPath path = nifiFlow.getFlowPaths().get(pathId);
        path.setExEntity(entity);
    }

    /**
     * Fetches an entity by guid when the id has one, otherwise by its non-null unique attributes.
     *
     * @param id the id of the entity to look up
     * @return the lookup result as returned by the Atlas client
     * @throws AtlasServiceException if the request fails
     */
    public AtlasEntity.AtlasEntityWithExtInfo searchEntityDef(AtlasObjectId id) throws AtlasServiceException {
        final String guid = id.getGuid();
        if (!StringUtils.isEmpty(guid)) {
            return atlasClient.getEntityByGuid(guid);
        }

        final Map<String, String> attributes = new HashMap<>();
        for (Map.Entry<String, Object> entry : id.getUniqueAttributes().entrySet()) {
            if (entry.getValue() != null) {
                attributes.put(entry.getKey(), entry.getValue().toString());
            }
        }
        return atlasClient.getEntityByAttribute(id.getTypeName(), attributes);
    }
}

