The Data Agent API

Introduction

The Agent API allows a user to write a custom data agent to publish events into BAM. The pre-built data agents that can be installed into WSO2 products also use the Agent API under the hood. The pre-built data agents have the following properties:

  1. Non-blocking
  2. Asynchronous
  3. High performance

When you use the Agent API, your custom data agent automatically inherits all of these properties as well.

The following step-by-step sample shows how to use this API.

First, create the event data to send. This is an ordinary Map in which each key is a String and each value is a ByteBuffer.

                private static Map<String, ByteBuffer> createEventDataMap() {
                    Map<String, ByteBuffer> eventDataMap = new HashMap<String,
                            ByteBuffer>();
                    eventDataMap.put("eventKey1", ByteBuffer.wrap("eventValue1".getBytes()));
                    eventDataMap.put("eventKey2", ByteBuffer.wrap("eventValue2".getBytes()));
                    eventDataMap.put("eventKey3", ByteBuffer.wrap("eventValue3".getBytes()));
                    return eventDataMap;
                }
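
The sample above calls getBytes() without arguments, which encodes with the platform default charset. A minimal sketch of an alternative, assuming the receiving side expects UTF-8 (the source does not say which encoding it expects): the EventMaps class and its methods are hypothetical helpers for illustration and are not part of the Agent API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of the Agent API.
class EventMaps {

    // Encode a String as UTF-8 so the bytes do not depend on the platform charset.
    static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Decode through a duplicate so the original buffer's position is left untouched.
    static String decode(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    // Convert a map of String values into the Map<String, ByteBuffer> shape used for events.
    static Map<String, ByteBuffer> toDataMap(Map<String, String> values) {
        Map<String, ByteBuffer> dataMap = new HashMap<>();
        for (Map.Entry<String, String> entry : values.entrySet()) {
            dataMap.put(entry.getKey(), encode(entry.getValue()));
        }
        return dataMap;
    }
}
```

With such a helper, the same conversion can be reused for the event, meta and correlation maps instead of repeating ByteBuffer.wrap(...) on every line.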
            

An event can be arranged into three logical groups: correlation data, meta data and event data. This separation helps you classify the data that makes up an event and think about the event more broadly. For example, the IP address associated with an event is worth capturing but does not fit in with the main event data, so it is better classified as meta data. The snippet below creates the meta data and correlation data maps in the same way as the event data map, and then constructs the event object.


                private static Map<String, ByteBuffer> createMetaDataMap() {
                    Map<String, ByteBuffer> metaDataMap = new HashMap<String,
                            ByteBuffer>();
                    metaDataMap.put("metaKey1", ByteBuffer.wrap("metaValue1".getBytes()));
                    metaDataMap.put("metaKey2", ByteBuffer.wrap("metaValue2".getBytes()));
                    metaDataMap.put("metaKey3", ByteBuffer.wrap("metaValue3".getBytes()));
                    return metaDataMap;
                }

                private static Map<String, ByteBuffer> createCorrelationDataMap() {
                    Map<String, ByteBuffer> correlationDataMap = new HashMap<String,
                            ByteBuffer>();
                    correlationDataMap.put("correlationKey1", ByteBuffer.wrap("correlationValue1".getBytes()));
                    correlationDataMap.put("correlationKey2", ByteBuffer.wrap("correlationValue2".getBytes()));
                    correlationDataMap.put("correlationKey3", ByteBuffer.wrap("correlationValue3".getBytes()));
                    return correlationDataMap;
                }

                // Assemble the event from the three maps
                Event myEvent = new Event();
                myEvent.setCorrelation(createCorrelationDataMap());
                myEvent.setEvent(createEventDataMap());
                myEvent.setMeta(createMetaDataMap());
            

Next, a list of events needs to be constructed. For a single event this may seem tedious, but it is particularly useful when a batch of events has to be sent.

                List<Event> events = new ArrayList<Event>();
                events.add(myEvent);
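
When many events accumulate, one option is to split them into fixed-size lists and publish each list separately. The following is only a sketch of that idea: EventBatches and partition are hypothetical names, the helper is generic so that it compiles without the Agent API on the classpath, and the source does not state any batch-size limit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the Agent API.
class EventBatches {

    // Split a list into consecutive batches of at most batchSize items each.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sub-list so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

Each resulting batch would then be passed to the agent as its own event list.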
            

Now construct an event receiver object and populate it with the transport to use, the receiver URL and the credentials. The code snippet below uses the default socket transport and port for Thrift-based event publishing, a default URL that assumes the BAM server is bound to port 9443 on the local machine, and the default admin credentials, i.e. 'admin', 'admin'. Change these values if your installation does not use the defaults.

                EventReceiver eventReceiver = new EventReceiver();
                eventReceiver.setSocketTransportEnabled(true);
                eventReceiver.setUrl("https://localhost:9443/");
                eventReceiver.setUserName("admin");
                eventReceiver.setPassword("admin");
                eventReceiver.setPort(7611);

            

Security is built on top of the Thrift transport in the WSO2 BAM server, so trust store parameters need to be set. The HTTPS URL set on the event receiver is used to authenticate and establish a session. The code snippet below uses the trust store that is shipped with WSO2 BAM, with its default password. Change these values if a different trust store is used.

                private static void setTrustStoreParams() {
                    String trustStore = "../../repository/resources/security";
                    System.setProperty("javax.net.ssl.trustStore", trustStore + "/client-truststore.jks");
                    System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
                }
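
Because the trust store location above is a relative path, it resolves against the working directory of the running program. A minimal sketch of a variant that fails early when the file cannot be read: TrustStoreSetup and its parameters are hypothetical and not part of the Agent API. Only the two javax.net.ssl system properties and the client-truststore.jks file name come from the sample above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper class, not part of the Agent API.
class TrustStoreSetup {

    // Set the JSSE trust store properties, failing fast if the file is not readable.
    static void setTrustStoreParams(String securityDir, String password) {
        Path trustStore = Paths.get(securityDir, "client-truststore.jks");
        if (!Files.isReadable(trustStore)) {
            throw new IllegalStateException(
                    "Trust store not readable: " + trustStore.toAbsolutePath());
        }
        System.setProperty("javax.net.ssl.trustStore", trustStore.toString());
        System.setProperty("javax.net.ssl.trustStorePassword", password);
    }
}
```

Checking the path up front makes a wrong working directory show up as an immediate error that names the path.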
            

The last step before publishing the event is to initialize the agent object. It accepts an agent configuration, which has the following properties:

  1. taskQueueSize
  2. corePoolSize
  3. maxPoolSize
  4. eventQueueSize
  5. maxIdleConnections
  6. evictionTimePeriod
  7. minIdleTimeInPool

The meaning of these properties is similar to that of the corresponding settings of the ThreadPoolExecutor in Java.
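
For readers unfamiliar with the comparison, here is a sketch of how the analogous settings behave on a plain java.util.concurrent.ThreadPoolExecutor. It does not show how the Agent uses its configuration internally: the PoolAnalogy class is hypothetical, the mapping of taskQueueSize onto a bounded work queue is an assumption, and the 60-second keep-alive is an illustrative value, not an Agent default.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the Agent API.
class PoolAnalogy {

    // Build an executor whose parameters mirror corePoolSize, maxPoolSize and taskQueueSize.
    static ThreadPoolExecutor newPool(int corePoolSize, int maxPoolSize, int taskQueueSize) {
        return new ThreadPoolExecutor(
                corePoolSize,                   // threads started before tasks are queued
                maxPoolSize,                    // upper bound, reached only when the queue is full
                60L, TimeUnit.SECONDS,          // idle time before threads above the core size exit
                new ArrayBlockingQueue<Runnable>(taskQueueSize)); // bounded queue of pending tasks
    }
}
```

In a ThreadPoolExecutor, new tasks start threads until corePoolSize is reached, further tasks wait in the queue, and extra threads up to the maximum are created only once the queue is full.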

An agent configuration created with the default constructor uses default values for the above properties.

           AgentConfiguration configuration = new AgentConfiguration();
           Agent agent = new Agent(configuration);
       

Finally, to publish the events to the event receiver, call the publish method. It takes two parameters, the event list and the event receiver, and publishes the list of events to that receiver.

           agent.publish(events, eventReceiver);

       

It is a best practice to shut down the agent cleanly when the program that runs the data agent ends. This clears out the event queue and shuts down the thread pools cleanly.

           agent.shutdown();
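
One way to make sure the shutdown call runs even when publishing throws an exception is a try/finally block. The sketch below expresses that pattern with plain Runnable arguments so that it compiles without the Agent API. CleanShutdown and runThenShutdown are hypothetical names. In a real program the two arguments would wrap the publish and shutdown calls shown above, with handling for any checked exceptions those calls may declare.

```java
// Hypothetical helper class, not part of the Agent API.
class CleanShutdown {

    // Run the work, then always run the shutdown action, even if the work throws.
    static void runThenShutdown(Runnable work, Runnable shutdown) {
        try {
            work.run();
        } finally {
            shutdown.run();
        }
    }
}
```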