package org.wso2.bam.integration.tests.reciever;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.axis2.client.Options;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.wso2.carbon.analytics.hive.stub.HiveExecutionServiceStub;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.TransportException;
import org.wso2.carbon.integration.framework.LoginLogoutUtil;
import org.wso2.carbon.integration.framework.utils.FrameworkSettings;

/* loaded from: input_file:org/wso2/bam/integration/tests/reciever/BAMReceiverVariableFieldsTestCase.class */
public class BAMReceiverVariableFieldsTestCase {
    private static final Log log = LogFactory.getLog(BAMReceiverVariableFieldsTestCase.class);
    private ThreadPoolExecutor threadPoolExecutor;
    private static final int NUMBER_OF_THREADS = 10;
    private static final long KEEP_ALIVE_TIME = 10;
    private static final String STREAM_NAME = "org.wso2.carbon.bam.variable.fields.test";
    private static final String VERSION = "1.0.0";
    private LoginLogoutUtil util = new LoginLogoutUtil();
    private static final String HIVE_SERVICE = "/services/HiveExecutionService";
    private HiveExecutionServiceStub hiveStub;

    /* loaded from: input_file:org/wso2/bam/integration/tests/reciever/BAMReceiverVariableFieldsTestCase$BAMDataPublisher.class */
    private class BAMDataPublisher implements Runnable {
        private final Log log;
        private int publisherId;
        private DataPublisher dataPublisher;
        private String streamName;
        private String version;
        private static final int NUMBER_EVENTS = 20;

        private BAMDataPublisher(int i, String str, String str2) throws AgentException, MalformedURLException, AuthenticationException, SocketException, TransportException {
            this.log = LogFactory.getLog(BAMDataPublisher.class);
            this.publisherId = i;
            this.streamName = str;
            this.version = str2;
            this.dataPublisher = new DataPublisher("tcp://" + getLocalHostAddress().getHostAddress() + ":7611", "admin", "admin");
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                String defineEventStream = defineEventStream();
                for (int i = 0; i < 4; i++) {
                    publishEvent(defineEventStream, new HashMap());
                    HashMap hashMap = new HashMap();
                    hashMap.put("key1", "value1");
                    publishEvent(defineEventStream, hashMap);
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put("message", "message1");
                    publishEvent(defineEventStream, hashMap2);
                    HashMap hashMap3 = new HashMap();
                    hashMap3.put("key1", "value1");
                    hashMap3.put("key2", "value2");
                    publishEvent(defineEventStream, hashMap3);
                    HashMap hashMap4 = new HashMap();
                    hashMap4.put("key1", "value1");
                    hashMap4.put("key2", "value2");
                    hashMap4.put("message", "message1");
                    publishEvent(defineEventStream, hashMap4);
                }
                Thread.sleep(1000L);
                this.dataPublisher.stop();
            } catch (AgentException e) {
                this.log.error(e.getErrorMessage(), e);
            } catch (InterruptedException e2) {
            } catch (DifferentStreamDefinitionAlreadyDefinedException e3) {
                this.log.error(e3.getErrorMessage(), e3);
            } catch (MalformedStreamDefinitionException e4) {
                this.log.error(e4.getErrorMessage(), e4);
            } catch (StreamDefinitionException e5) {
                this.log.error(e5.getErrorMessage(), e5);
            }
        }

        private String defineEventStream() throws AgentException, StreamDefinitionException, MalformedStreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException {
            return this.dataPublisher.defineStream("{  'name':'" + this.streamName + "',  'version':'" + this.version + "',  'nickName': 'Integration_test',  'description': 'Integration tests events',  'metaData':[          {'name':'clientType','type':'STRING'}  ],  'payloadData':[          {'name':'message','type':'STRING'},          {'name':'publisherId','type':'INT'}  ]}");
        }

        private void publishEvent(String str, Map map) throws AgentException {
            this.dataPublisher.publish(new Event(str, System.currentTimeMillis(), new Object[]{"external"}, (Object[]) null, new Object[]{"Integration test Message: " + this.publisherId, Integer.valueOf(this.publisherId)}, map));
        }

        private InetAddress getLocalHostAddress() throws SocketException {
            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            while (networkInterfaces.hasMoreElements()) {
                Enumeration<InetAddress> inetAddresses = networkInterfaces.nextElement().getInetAddresses();
                while (inetAddresses.hasMoreElements()) {
                    InetAddress nextElement = inetAddresses.nextElement();
                    if ((nextElement instanceof Inet4Address) && !nextElement.isLoopbackAddress()) {
                        return nextElement;
                    }
                }
            }
            return null;
        }
    }

    private void init() {
        this.threadPoolExecutor = new ThreadPoolExecutor(NUMBER_OF_THREADS, NUMBER_OF_THREADS, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue());
    }

    @Test(groups = {"wso2.bam"})
    public void publishConcurrentEvents() throws Exception {
        init();
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            this.threadPoolExecutor.execute(new BAMDataPublisher(i + 1, STREAM_NAME, "1.0.0"));
        }
        this.threadPoolExecutor.shutdown();
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
        }
        checkPublishedDataInCassandra();
    }

    private void checkPublishedDataInCassandra() throws Exception {
        initializeHiveStub();
        this.hiveStub.executeHiveScript((String) null, "CREATE EXTERNAL TABLE IF NOT EXISTS BAMVarFieldTest (eventId STRING, message STRING, publisherId INT) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( \"cassandra.host\" = \"127.0.0.1\" , \"cassandra.port\" = \"9160\" , \"cassandra.ks.name\" = \"EVENT_KS\" , \"cassandra.ks.username\" = \"admin\" , \"cassandra.ks.password\" = \"admin\" , \"cassandra.cf.name\" = \"org_wso2_carbon_bam_variable_fields_test\" , \"cassandra.columns.mapping\" = \":key,payload_message, payload_publisherId\" );");
        HiveExecutionServiceStub.QueryResult[] executeHiveScript = this.hiveStub.executeHiveScript((String) null, "SELECT Count(1) From BAMVarFieldTest");
        Assert.assertTrue(null != executeHiveScript || executeHiveScript.length == 0, "No results are returned to published test events");
        HiveExecutionServiceStub.QueryResultRow[] resultRows = executeHiveScript[0].getResultRows();
        Assert.assertTrue(null != resultRows || resultRows.length == 0, "No results are returned to published test events");
        String[] columnValues = resultRows[0].getColumnValues();
        Assert.assertTrue(null != columnValues || columnValues.length == 0, "No results are returned to published test events");
        Assert.assertTrue(Integer.parseInt(columnValues[0]) == 200, "Actual Events sent and the saved events are different");
    }

    private void initializeHiveStub() throws Exception {
        ConfigurationContext createConfigurationContextFromFileSystem = ConfigurationContextFactory.createConfigurationContextFromFileSystem((String) null);
        String login = this.util.login();
        this.hiveStub = new HiveExecutionServiceStub(createConfigurationContextFromFileSystem, "https://" + FrameworkSettings.HOST_NAME + ":" + FrameworkSettings.HTTPS_PORT + HIVE_SERVICE);
        Options options = this.hiveStub._getServiceClient().getOptions();
        options.setTimeOutInMilliSeconds(600000L);
        options.setManageSession(true);
        options.setProperty("Cookie", login);
    }
}
