package org.wso2.carbon.mediator.bam;

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.wso2.carbon.base.ServerConfiguration;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.agent.thrift.lb.DataPublisherHolder;
import org.wso2.carbon.databridge.agent.thrift.lb.LoadBalancingDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverGroup;
import org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.mediator.bam.builders.CorrelationDataBuilder;
import org.wso2.carbon.mediator.bam.builders.MetaDataBuilder;
import org.wso2.carbon.mediator.bam.builders.PayloadDataBuilder;
import org.wso2.carbon.mediator.bam.config.BamMediatorException;
import org.wso2.carbon.mediator.bam.config.BamServerConfig;
import org.wso2.carbon.mediator.bam.config.stream.StreamConfiguration;

/* loaded from: input_file:lib/org.wso2.carbon.mediator.bam_4.0.5.jar:org/wso2/carbon/mediator/bam/Stream.class */
public class Stream {
    private static final Log log = LogFactory.getLog(Stream.class);
    public static final String ENABLE_MEDIATION_STATS = "EnableMediationStats";
    public static final String CLOUD_DEPLOYMENT_PROP = "IsCloudDeployment";
    public static final String SERVER_CONFIG_BAM_URL = "BamServerURL";
    public static final String DEFAULT_BAM_SERVER_URL = "tcp://127.0.0.1:7611";
    private AsyncDataPublisher asyncDataPublisher;
    private BamServerConfig bamServerConfig;
    private StreamConfiguration streamConfiguration;
    private StreamDefinitionBuilder streamDefinitionBuilder = new StreamDefinitionBuilder();
    private LoadBalancingDataPublisher loadBalancingDataPublisher = null;
    private boolean isPublisherCreated = false;
    private PayloadDataBuilder payloadDataBuilder = new PayloadDataBuilder();
    private MetaDataBuilder metaDataBuilder = new MetaDataBuilder();
    private CorrelationDataBuilder correlationDataBuilder = new CorrelationDataBuilder();

    public void sendEvents(MessageContext messageContext) throws BamMediatorException {
        new ActivityIDSetter().setActivityIdInSOAPHeader(messageContext);
        try {
            if (!this.isPublisherCreated) {
                initializeDataPublisher(this);
                this.isPublisherCreated = true;
            }
            publishEvent(messageContext);
        } catch (BamMediatorException e) {
            String str = "Problem occurred while logging events in the BAM Mediator. " + e.getMessage();
            log.error(str, e);
            throw new BamMediatorException(str, e);
        }
    }

    private static synchronized void initializeDataPublisher(Stream stream) throws BamMediatorException {
        try {
            if (!stream.isPublisherCreated) {
                stream.createDataPublisher();
                stream.setStreamDefinitionToDataPublisher();
                stream.isPublisherCreated = true;
            }
        } catch (BamMediatorException e) {
            String str = "Problem initializing the Data Publisher or Stream Definition. " + e.getMessage();
            log.error(str, e);
            throw new BamMediatorException(str, e);
        }
    }

    private void createDataPublisher() throws BamMediatorException {
        if (isCloudDeployment()) {
            this.asyncDataPublisher = new AsyncDataPublisher(getServerConfigBAMServerURL(), this.bamServerConfig.getUsername(), this.bamServerConfig.getPassword());
        } else if (this.bamServerConfig.isLoadbalanced()) {
            ArrayList arrayList = new ArrayList();
            Iterator<String> it = DataPublisherUtil.getReceiverGroups(this.bamServerConfig.getUrlSet()).iterator();
            while (it.hasNext()) {
                String next = it.next();
                ArrayList arrayList2 = new ArrayList();
                for (String str : next.split(",")) {
                    arrayList2.add(new DataPublisherHolder(null, str.trim(), this.bamServerConfig.getUsername(), this.bamServerConfig.getPassword()));
                }
                arrayList.add(new ReceiverGroup(arrayList2));
            }
            this.loadBalancingDataPublisher = new LoadBalancingDataPublisher(arrayList);
        } else if (this.bamServerConfig.isSecure()) {
            this.asyncDataPublisher = new AsyncDataPublisher("ssl://" + this.bamServerConfig.getIp() + ":" + this.bamServerConfig.getAuthenticationPort(), "ssl://" + this.bamServerConfig.getIp() + ":" + this.bamServerConfig.getAuthenticationPort(), this.bamServerConfig.getUsername(), this.bamServerConfig.getPassword());
        } else {
            this.asyncDataPublisher = new AsyncDataPublisher("ssl://" + this.bamServerConfig.getIp() + ":" + this.bamServerConfig.getAuthenticationPort(), "tcp://" + this.bamServerConfig.getIp() + ":" + this.bamServerConfig.getReceiverPort(), this.bamServerConfig.getUsername(), this.bamServerConfig.getPassword());
        }
        log.info("Data Publisher Created.");
    }

    private String getServerConfigBAMServerURL() {
        String[] properties = ServerConfiguration.getInstance().getProperties(SERVER_CONFIG_BAM_URL);
        return null != properties ? properties[properties.length - 1] : DEFAULT_BAM_SERVER_URL;
    }

    private boolean isCloudDeployment() {
        String[] properties = ServerConfiguration.getInstance().getProperties("IsCloudDeployment");
        return null != properties && Boolean.parseBoolean(properties[properties.length - 1]);
    }

    private void setStreamDefinitionToDataPublisher() throws BamMediatorException {
        try {
            StreamDefinition buildStreamDefinition = this.streamDefinitionBuilder.buildStreamDefinition(this.streamConfiguration);
            if (this.bamServerConfig.isLoadbalanced()) {
                this.loadBalancingDataPublisher.addStreamDefinition(buildStreamDefinition);
            } else {
                this.asyncDataPublisher.addStreamDefinition(buildStreamDefinition);
            }
        } catch (BamMediatorException e) {
            String str = "Error while creating the Asynchronous/LoadBalancing Data Publisheror while creating the Stream Definition. " + e.getMessage();
            log.error(str, e);
            throw new BamMediatorException(str, e);
        }
    }

    private void publishEvent(MessageContext messageContext) throws BamMediatorException {
        org.apache.axis2.context.MessageContext axis2MessageContext = ((Axis2MessageContext) messageContext).getAxis2MessageContext();
        try {
            Object[] createMetadata = this.metaDataBuilder.createMetadata(messageContext, axis2MessageContext.getConfigurationContext().getAxisConfiguration());
            Object[] createCorrelationData = this.correlationDataBuilder.createCorrelationData(messageContext);
            Object[] createPayloadData = this.payloadDataBuilder.createPayloadData(messageContext, axis2MessageContext, this.streamConfiguration);
            if (this.bamServerConfig.isLoadbalanced()) {
                this.loadBalancingDataPublisher.publish(this.streamConfiguration.getName(), this.streamConfiguration.getVersion(), createMetadata, createCorrelationData, createPayloadData);
            } else {
                if (!this.asyncDataPublisher.canPublish()) {
                    this.asyncDataPublisher.reconnect();
                }
                this.asyncDataPublisher.publish(this.streamConfiguration.getName(), this.streamConfiguration.getVersion(), createMetadata, createCorrelationData, createPayloadData);
            }
        } catch (AgentException e) {
            String str = "Agent error occurred while sending the event. " + e.getMessage();
            log.error(str, e);
            throw new BamMediatorException(str, e);
        } catch (Exception e2) {
            String str2 = "Error occurred while sending the event. " + e2.getMessage();
            log.error(str2, e2);
            throw new BamMediatorException(str2, e2);
        }
    }

    public void setBamServerConfig(BamServerConfig bamServerConfig) {
        this.bamServerConfig = bamServerConfig;
    }

    public void setStreamConfiguration(StreamConfiguration streamConfiguration) {
        this.streamConfiguration = streamConfiguration;
    }

    public StreamConfiguration getStreamConfiguration() {
        return this.streamConfiguration;
    }
}
