ConnectorDeployer.java

/*
 * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.

 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.synapse.unittest;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMException;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axiom.om.util.Base64;
import org.apache.axis2.AxisFault;
import org.apache.axis2.deployment.Deployer;
import org.apache.axis2.deployment.DeploymentEngine;
import org.apache.axis2.deployment.DeploymentException;
import org.apache.axis2.deployment.repository.util.DeploymentFileData;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.config.Entry;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.xml.EntryFactory;
import org.apache.synapse.config.xml.SynapseImportFactory;
import org.apache.synapse.config.xml.SynapseImportSerializer;
import org.apache.synapse.config.xml.XMLConfigConstants;
import org.apache.synapse.deployers.LibraryArtifactDeployer;
import org.apache.synapse.deployers.SynapseArtifactDeploymentStore;
import org.apache.synapse.libraries.imports.SynapseImport;
import org.apache.synapse.libraries.model.Library;
import org.apache.synapse.libraries.util.LibDeployerUtils;

import javax.xml.namespace.QName;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Class responsible for deploying the connector resources used in the unit test flow.
 * Mocks the connector deployment methods from mediation; intended only for the synapse unit testing framework.
 */
class ConnectorDeployer {

    /** Logger shared by the static helpers of this class; final because it is never reassigned. */
    private static final Logger log = Logger.getLogger(ConnectorDeployer.class.getName());

    /** Name of the axis configuration parameter under which the configuration lock is stored. */
    private static final String SYNAPSE_CONFIG_LOCK = "synapse.config.lock";
    /** Suffix appended to the repository path to form the directory the library deployer is registered for. */
    private static final String SYNAPSE_LIBS = "synapse-libs";
    /** File extension the library deployer is registered for. */
    private static final String SYNAPSE_LIBRARY_EXTENSION = "zip";

    /** Not instantiable: the class only offers static helpers. */
    private ConnectorDeployer(){}

    /**
     * Extract and load connector resources.
     * <p>
     * Every list element is Base64 decoded, written to
     * {@code <temp dir>/<connector test folder>/connector-<index>.zip} and deployed through the synapse
     * library deployer. When no synapse import is registered for the deployed artifact yet, the library is
     * enabled through {@code updateStatus}. A {@link DeploymentException} or {@link IOException} raised for
     * one connector is logged and the loop carries on with the next connector.
     *
     * @param connectorList list of Base64 encoded connector zip archives; {@code null} or empty is a no-op
     */
    static void deployConnectorResources(List<String> connectorList) {
        if (connectorList == null || connectorList.isEmpty()) {
            return;
        }

        // The target directory is identical for every connector, so resolve and create it only once.
        String javaTempTestDir = System.getProperty(Constants.PRAM_TEMP_DIR) + File.separator
                + Constants.CONNECTOR_TEST_FOLDER;
        createDir(javaTempTestDir);

        for (int x = 0; x < connectorList.size(); x++) {
            String connectorZip = javaTempTestDir + File.separator + "connector-" + x + ".zip";
            try {
                byte[] decodedConnector = Base64.decode(connectorList.get(x));

                // Close (and thereby flush) the stream before anything reads the archive. Otherwise an
                // archive smaller than the 4096 byte buffer would still be in memory when it is deployed.
                BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(connectorZip), 4096);
                try {
                    out.write(decodedConnector);
                } finally {
                    out.close();
                }

                //deploy connector zip
                LibDeployerUtils.createSynapseLibrary(connectorZip);
                SynapseConfiguration configuration = UnitTestingExecutor.getExecuteInstance().getSynapseConfiguration();
                AxisConfiguration axisConfiguration = configuration.getAxisConfiguration();

                Deployer deployer = getSynapseLibraryDeployer(axisConfiguration);
                if (deployer == null) {
                    // getSynapseLibraryDeployer returns null after logging its own failure
                    log.error("Unable to obtain the synapse library deployer for : " + connectorZip);
                    continue;
                }
                deployer.deploy(new DeploymentFileData(new File(connectorZip), deployer));

                try {
                    String artifactName = getArtifactName(connectorZip, axisConfiguration);
                    if (artifactName != null && configuration.getSynapseImports().get(artifactName) == null) {
                        // artifact names have the form {package}name
                        int packageEnd = artifactName.lastIndexOf("}");
                        String libName = artifactName.substring(packageEnd + 1);
                        String libraryPackage = artifactName.substring(1, packageEnd);
                        updateStatus(artifactName, libName, libraryPackage, "enabled", axisConfiguration);
                    }
                } catch (AxisFault axisFault) {
                    log.error("Unable to update status for the synapse library : ", axisFault);
                }

            } catch (DeploymentException e) {
                log.error("Error while deploying the synapse library : ", e);
            } catch (IOException e) {
                log.error("Error while writing the output stream of connector zip file", e);
            }
        }
    }

    /**
     * Get the deployer for the Synapse Library.
     * <p>
     * Registers a new {@link LibraryArtifactDeployer} on the deployment engine of the given configuration
     * for the library directory and extension, then looks that deployer up again.
     *
     * @param axisConfig AxisConfiguration instance
     * @return Deployer instance, or {@code null} when any step fails (the failure is logged)
     */
    private static Deployer getSynapseLibraryDeployer(AxisConfiguration axisConfig) {
        try {
            String synapseLibraryPath = axisConfig.getRepository().getPath() + SYNAPSE_LIBS;
            DeploymentEngine deploymentEngine = (DeploymentEngine) axisConfig.getConfigurator();
            deploymentEngine.addDeployer(new LibraryArtifactDeployer(), synapseLibraryPath, SYNAPSE_LIBRARY_EXTENSION);

            return deploymentEngine.getDeployer(synapseLibraryPath, SYNAPSE_LIBRARY_EXTENSION);
        } catch (Exception e) {
            // Log the cause as well; the bare message alone gives no hint about what went wrong.
            log.error("Error occured while getting the deployer", e);
            return null;
        }
    }

    /**
     * Look up the name of the library artifact that was deployed from a file.
     *
     * @param filePath path of the deployed file
     * @param axisConfig AxisConfiguration of the current tenant
     * @return the artifact name the deployment store holds for {@code filePath}
     */
    private static String getArtifactName(String filePath, AxisConfiguration axisConfig) {
        SynapseConfiguration synapseConfiguration = getSynapseConfiguration(axisConfig);
        return synapseConfiguration.getArtifactDeploymentStore().getArtifactNameForFile(filePath);
    }

    /**
     * Performs the action of enabling/disabling the given mediation library.
     * <p>
     * When no import is registered under {@code libQName} and both {@code libName} and {@code packageName}
     * are given, an import is added first. Any status other than {@code "enabled"} disables the library.
     *
     * @param libQName qualified name under which the import and the library are looked up
     * @param libName library name
     * @param packageName package name
     * @param status requested status; {@code "enabled"} enables the library, anything else disables it
     * @param axisConfig AxisConfiguration of the current tenant
     * @return {@code true} when the status was applied, {@code false} when the library or its import is
     *         missing or an error occurred (errors are logged, not rethrown)
     * @throws AxisFault axis fault
     */
    private static boolean updateStatus(String libQName, String libName, String packageName, String status,
                                        AxisConfiguration axisConfig) throws AxisFault {
        try {
            SynapseConfiguration synapseConfiguration = getSynapseConfiguration(axisConfig);
            SynapseImport synapseImport = synapseConfiguration.getSynapseImports().get(libQName);
            if (synapseImport == null && libName != null && packageName != null) {
                addImport(libName, packageName, axisConfig);
                synapseImport = synapseConfiguration.getSynapseImports().get(libQName);
            }

            Library synLib = synapseConfiguration.getSynapseLibraries().get(libQName);
            if (libQName == null || synLib == null) {
                log.warn("No synapse library found for : " + libQName + ", status not updated");
                return false;
            }
            if (synapseImport == null) {
                // addImport only logs its failures, so the import can still be missing at this point
                log.error("Unable to update status for :  " + libQName + ", no synapse import is registered");
                return false;
            }

            boolean enable = "enabled".equals(status);
            synapseImport.setStatus(enable);
            synLib.setLibStatus(enable);
            if (enable) {
                synLib.loadLibrary();
                deployingLocalEntries(synLib, synapseConfiguration, axisConfig);
            } else {
                synLib.unLoadLibrary();
                undeployingLocalEntries(synLib, synapseConfiguration, axisConfig);
            }
            return true;
        } catch (Exception e) {
            log.error("Unable to update status for :  " + libQName, e);
            return false;
        }
    }

    /**
     * Undeploy the local entries deployed from the lib.
     * <p>
     * Each local entry artifact of the library is read from its file and removed through
     * {@code deleteEntry}. Files that cannot be read or parsed are skipped.
     *
     * @param library library whose local entry artifacts are removed
     * @param config synapse configuration (not used by the current implementation)
     * @param axisConfig AxisConfiguration of the current tenant
     */
    private static void undeployingLocalEntries(Library library, SynapseConfiguration config, AxisConfiguration axisConfig) {
        if (log.isDebugEnabled()) {
            log.debug("Start : Removing Local registry entries from the configuration");
        }
        for (Map.Entry<String, Object> libararyEntryMap : library.getLocalEntryArtifacts()
                .entrySet()) {
            File localEntryFileObj = (File) libararyEntryMap.getValue();
            OMElement document = getOMElement(localEntryFileObj);
            if (document == null) {
                // getOMElement returns null for unreadable files; skip them instead of failing the whole loop
                log.warn("Skipping local entry file that could not be parsed : " + localEntryFileObj);
                continue;
            }
            deleteEntry(document.toString(), axisConfig);
        }
        if (log.isDebugEnabled()) {
            log.debug("End : Removing Local registry entries from the configuration");
        }
    }

    /**
     * Remove a local entry.
     * <p>
     * The XML is parsed, and when its root element is a local entry definition the entry registered under
     * its {@code key} attribute is removed from the synapse configuration. The whole operation runs under
     * the configuration lock.
     *
     * @param element local entry definition as XML string
     * @param axisConfig AxisConfiguration of the current tenant
     * @return {@code true} when an entry was removed, {@code false} otherwise (failures are logged)
     */
    private static boolean deleteEntry(String element, AxisConfiguration axisConfig) {

        final Lock lock = getLock(axisConfig);
        String entryKey = null;
        // Acquire before entering the try block so that finally only releases a lock that is actually held.
        lock.lock();
        try {
            OMElement elem;
            try {
                elem = nonCoalescingStringToOm(element);
            } catch (XMLStreamException e) {
                log.error("Error while converting the file content : ",  e);
                return false;
            }

            if (!elem.getQName().getLocalPart().equals(XMLConfigConstants.ENTRY_ELT.getLocalPart())) {
                return false;
            }

            entryKey = elem.getAttributeValue(new QName("key"));
            if (entryKey == null) {
                log.warn("Unable to delete the local entry. No key attribute in the definition");
                return false;
            }
            entryKey = entryKey.trim();
            if (log.isDebugEnabled()) {
                log.debug("Deleting local entry with key : " + entryKey);
            }

            SynapseConfiguration synapseConfiguration = getSynapseConfiguration(axisConfig);
            Entry entry = synapseConfiguration.getDefinedEntries().get(entryKey);
            if (entry == null) {
                log.warn("No entry exists by the key : " + entryKey);
                return false;
            }

            synapseConfiguration.removeEntry(entryKey);
            if (log.isDebugEnabled()) {
                log.debug("Deleted local entry with key : " + entryKey);
            }
            return true;
        } catch (Exception e) {
            // covers SynapseException as well, which used to be handled with the very same message
            log.error("Unable to delete the local entry : " + entryKey, e);
        } finally {
            lock.unlock();
        }
        return false;
    }

    /**
     * Deploy the local entries from lib.
     * <p>
     * Each local entry artifact of the library is read from its file and added through {@code addEntry}.
     * Files that cannot be read or parsed are skipped.
     *
     * @param library library whose local entry artifacts are added
     * @param config synapse configuration (not used by the current implementation)
     * @param axisConfig AxisConfiguration of the current tenant
     */
    private static void deployingLocalEntries(Library library, SynapseConfiguration config, AxisConfiguration axisConfig) {
        if (log.isDebugEnabled()) {
            log.debug("Start : Adding Local registry entries to the configuration");
        }
        for (Map.Entry<String, Object> libararyEntryMap : library.getLocalEntryArtifacts()
                .entrySet()) {
            File localEntryFileObj = (File) libararyEntryMap.getValue();
            OMElement document = getOMElement(localEntryFileObj);
            if (document == null) {
                // getOMElement returns null for unreadable files; skip them instead of failing the whole loop
                log.warn("Skipping local entry file that could not be parsed : " + localEntryFileObj);
                continue;
            }
            addEntry(document.toString(), axisConfig);
        }
        if (log.isDebugEnabled()) {
            log.debug("End : Adding Local registry entries to the configuration");
        }
    }

    /**
     * Add a local entry.
     * <p>
     * The XML is parsed, and when its root element is a local entry definition whose {@code key} is not yet
     * present in the local registry, an entry is created and added to the synapse configuration. The whole
     * operation runs under the configuration lock.
     *
     * @param element local entry definition as XML string
     * @param axisConfig AxisConfiguration of the current tenant
     * @return {@code true} when the entry was added, {@code false} otherwise (failures are logged)
     */
    private static boolean addEntry(String element, AxisConfiguration axisConfig) {
        final Lock lock = getLock(axisConfig);
        // Acquire before entering the try block so that finally only releases a lock that is actually held.
        lock.lock();
        try {
            OMElement elem;
            try {
                elem = nonCoalescingStringToOm(element);
            } catch (XMLStreamException e) {
                log.error("Error while converting the file content : ",  e);
                return false;
            }

            if (!elem.getQName().getLocalPart().equals(XMLConfigConstants.ENTRY_ELT.getLocalPart())) {
                log.warn("Error adding local entry. Invalid definition");
                return false;
            }

            String entryKey = elem.getAttributeValue(new QName("key"));
            if (entryKey == null) {
                log.warn("Error adding local entry. No key attribute in the definition");
                return false;
            }
            entryKey = entryKey.trim();

            SynapseConfiguration synapseConfiguration = getSynapseConfiguration(axisConfig);
            if (log.isDebugEnabled()) {
                log.debug("Adding local entry with key : " + entryKey);
            }
            if (synapseConfiguration.getLocalRegistry().containsKey(entryKey)) {
                // Nothing is added here, so neither claim success in the log nor in the return value.
                log.error("An Entry with key " + entryKey +
                        " is already used within the configuration");
                return false;
            }

            Entry entry = EntryFactory.createEntry(elem, synapseConfiguration.getProperties());
            entry.setFileName(generateFileName(entry.getKey()));
            synapseConfiguration.addEntry(entryKey, entry);
            if (log.isDebugEnabled()) {
                log.debug("Local registry entry : " + entryKey + " added to the configuration");
            }
            return true;
        } catch (SynapseException syne) {
            log.error("Unable to add local entry ", syne);
        } catch (OMException e) {
            log.error("Unable to add local entry. Invalid XML ", e);
        } catch (Exception e) {
            log.error("Unable to add local entry. Invalid XML ", e);
        } finally {
            lock.unlock();
        }
        return false;
    }

    /**
     * Parse an XML string into an OMElement using a stream reader with coalescing switched off.
     *
     * @param xmlStr XML text to parse
     * @return document element built from {@code xmlStr}
     * @throws XMLStreamException when the stream reader or the builder cannot be created
     */
    private static OMElement nonCoalescingStringToOm(String xmlStr) throws XMLStreamException {
        XMLInputFactory inputFactory = XMLInputFactory.newInstance();
        inputFactory.setProperty("javax.xml.stream.isCoalescing", Boolean.FALSE);
        XMLStreamReader streamReader = inputFactory.createXMLStreamReader(new StringReader(xmlStr));
        return new StAXOMBuilder(streamReader).getDocumentElement();
    }

    /**
     * Fetch the configuration lock stored as a parameter of the axis configuration.
     * <p>
     * When the parameter is missing (or does not hold a {@link Lock}), a new {@link ReentrantLock} is
     * created and an attempt is made to register it under the same parameter name. The new lock is returned
     * even if that registration fails, so callers never receive {@code null}.
     *
     * @param axisConfig AxisConfiguration instance
     * @return Lock instance, never {@code null}
     */
    private static Lock getLock(AxisConfiguration axisConfig) {
        Parameter parameter = axisConfig.getParameter(SYNAPSE_CONFIG_LOCK);
        if (parameter != null && parameter.getValue() instanceof Lock) {
            return (Lock) parameter.getValue();
        }

        log.warn(SYNAPSE_CONFIG_LOCK + " is null, Recreating a new lock");
        Lock lock = new ReentrantLock();
        try {
            axisConfig.addParameter(SYNAPSE_CONFIG_LOCK, lock);
        } catch (AxisFault axisFault) {
            // The lock could not be published on the configuration, but the caller still needs one to
            // lock/unlock; handing back null would only turn this into a NullPointerException there.
            log.error("Error while setting " + SYNAPSE_CONFIG_LOCK, axisFault);
        }
        return lock;
    }

    /**
     * Get OMElement from the file.
     * <p>
     * The document is fully built before the stream is closed. The stream is closed on every path,
     * including parse failures.
     *
     * @param file file to convert to an OMElement
     * @return file data as OMElement, or {@code null} when the file cannot be opened, parsed or built
     *         (the failure is logged)
     */
    private static OMElement getOMElement(File file) {
        FileInputStream is;
        try {
            is = FileUtils.openInputStream(file);
        } catch (IOException e) {
            log.error("Error while opening the file: " + file.getName() + " for reading", e);
            return null;
        }

        OMElement document = null;
        try {
            document = (new StAXOMBuilder(is)).getDocumentElement();
            document.build();
        } catch (XMLStreamException e) {
            log.error("Error while parsing the content of the file: " + file.getName(), e);
            document = null;
        } catch (Exception e) {
            log.error("Error while building the content of the file: " + file.getName(), e);
            // do not hand out a document whose build failed half way
            document = null;
        } finally {
            try {
                is.close();
            } catch (IOException e) {
                log.warn("Error while closing the input stream from the file: " + file.getName(), e);
            }
        }

        return document;
    }

    /**
     * Read the Synapse configuration that is stored as a parameter of the given axis configuration.
     *
     * @param axisConfig AxisConfiguration of the current tenant
     * @return value of the {@code SynapseConstants.SYNAPSE_CONFIG} parameter, cast to SynapseConfiguration
     */
    private static SynapseConfiguration getSynapseConfiguration(AxisConfiguration axisConfig) {
        Parameter synapseConfigParameter = axisConfig.getParameter(SynapseConstants.SYNAPSE_CONFIG);
        return (SynapseConfiguration) synapseConfigParameter.getValue();
    }

    /**
     * Imports the given mediation library: builds a SynapseImport from the library name and package,
     * serializes it and registers the serialized form with the synapse configuration.
     * Failures are logged and not propagated.
     *
     * @param libName library name
     * @param packageName package name
     * @param axisConfig AxisConfiguration of the current tenant
     * @throws AxisFault declared for callers; the body itself logs any AxisFault it encounters
     */
    private static void addImport(String libName, String packageName, AxisConfiguration axisConfig) throws AxisFault {
        SynapseImport libraryImport = new SynapseImport();
        libraryImport.setLibName(libName);
        libraryImport.setLibPackage(packageName);

        OMElement serializedImport = SynapseImportSerializer.serializeImport(libraryImport);
        if (serializedImport == null) {
            log.error( "Could not add Synapse Import. Invalid import params for libName : " +
                    libName + " packageName : " + packageName);
            return;
        }

        try {
            addImport(serializedImport.toString(), axisConfig);
        } catch (AxisFault axisFault) {
            log.error("Could not add Synapse Import", axisFault);
        }
    }

    /**
     * Creates a SynapseImport from its XML form and adds it to the synapse configuration.
     * When a library is registered under the qualified name of the import, the artifacts of that library
     * are loaded for the import as well. Problems are logged and not propagated.
     *
     * @param xml string that contains the serialized synapse import
     * @param axisConfig AxisConfiguration of the current tenant
     * @throws AxisFault declared for callers; the body shown here does not throw it itself
     */
    private static void addImport(String xml, AxisConfiguration axisConfig) throws AxisFault {
        try {
            SynapseImport parsedImport = SynapseImportFactory.createImport(createElement(xml), null);
            if (parsedImport == null || parsedImport.getName() == null) {
                log.error("Unable to create a Synapse Import for :  " + xml, null);
                return;
            }

            SynapseConfiguration synapseConfiguration = getSynapseConfiguration(axisConfig);
            parsedImport.setFileName(generateFileName(parsedImport.getName()));
            synapseConfiguration.addSynapseImport(parsedImport.getName(), parsedImport);

            String qualifiedName = LibDeployerUtils.getQualifiedName(parsedImport);
            Library library = synapseConfiguration.getSynapseLibraries().get(qualifiedName);
            if (library != null) {
                LibDeployerUtils.loadLibArtifacts(parsedImport, library);
            }
        } catch (XMLStreamException e) {
            log.error("Unable to create a Synapse Import for :  " + xml, e);
        }

    }

    /**
     * Creates an <code>OMElement</code> from the given string.
     *
     * @param str the XML string
     * @return the <code>OMElement</code> representation of the given string
     * @throws XMLStreamException if building the <code>OmElement</code> is unsuccessful
     */
    private static OMElement createElement(String str) throws XMLStreamException {
        // Encode explicitly as UTF-8: getBytes() without a charset uses the platform default, which garbles
        // non-ASCII content on platforms whose default differs from what the parser decodes.
        // NOTE(review): assumes the XML carries no declaration naming a different encoding - confirm.
        InputStream in = new ByteArrayInputStream(str.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return new StAXOMBuilder(in).getDocumentElement();
    }

    /**
     * Derive an XML file name from a name by replacing each of the characters
     * {@code / ? * | : < >} and the space character with an underscore and appending {@code .xml}.
     *
     * @param name name to derive the file name from
     * @return sanitized name followed by {@code .xml}
     */
    private static String generateFileName(String name) {
        // NOTE(review): "\\/" is a regex-escaped slash, so a backslash in the name is NOT replaced -
        // confirm whether that is intended.
        String sanitizedName = name.replaceAll("[\\/?*|:<> ]", "_");
        return sanitizedName + ".xml";
    }

    /**
     * Create a directory for given path, including any missing parent directories.
     * An error is logged when the path is not a directory afterwards, for example because a regular file
     * occupies it.
     *
     * @param path path of the directory
     */
    private static void createDir(String path) {
        File temp = new File(path);
        // mkdirs() returns false when the directory already exists, hence the isDirectory() check before
        // (fast path) and after (directory created concurrently by someone else).
        if (!temp.isDirectory() && !temp.mkdirs() && !temp.isDirectory()) {
            log.error("Error while creating directory : " + path);
        }
    }
}