org.apache.servicemix.eip.support
Class AbstractAggregator

java.lang.Object
  extended by org.apache.servicemix.common.endpoints.AbstractEndpoint
      extended by org.apache.servicemix.common.endpoints.SimpleEndpoint
          extended by org.apache.servicemix.common.endpoints.ProviderEndpoint
              extended by org.apache.servicemix.eip.EIPEndpoint
                  extended by org.apache.servicemix.eip.support.AbstractAggregator
All Implemented Interfaces:
org.apache.servicemix.common.Endpoint
Direct Known Subclasses:
SplitAggregator

public abstract class AbstractAggregator
extends EIPEndpoint

An aggregator waits for several messages and combines them into one. This component implements the Aggregator pattern. Closed aggregations are kept in a Store. By default a simple MemoryStore is used, but you can set your own StoreFactory to use another implementation.
TODO: distributed lock manager
TODO: persistent / transactional timer

Version:
$Revision: 376451 $
Author:
gnodet
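
The wait-and-combine flow in the class description can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: `AggregatorFlowSketch`, its count-based completion rule, and the use of plain strings and an in-memory map are all hypothetical stand-ins, not ServiceMix or JBI types, and tracking of closed aggregations is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of the aggregator flow. Nothing here is ServiceMix API:
 * messages are plain strings and the store is an in-memory map.
 */
class AggregatorFlowSketch {
    private final int expectedCount;                                 // hypothetical completion rule
    private final Map<String, List<String>> store = new HashMap<>(); // stand-in for the Store

    AggregatorFlowSketch(int expectedCount) {
        this.expectedCount = expectedCount;
    }

    /**
     * Handles one incoming message. Returns the combined result once the
     * aggregation is complete, or null while it is still waiting.
     */
    String process(String correlationId, String body) {
        // Find the aggregation for this correlation ID, creating it if needed.
        List<String> aggregation = store.computeIfAbsent(correlationId, id -> new ArrayList<>());
        aggregation.add(body);
        if (aggregation.size() < expectedCount) {
            return null; // still waiting for more messages
        }
        // Complete: drop the aggregation from the store and emit the result.
        store.remove(correlationId);
        return String.join(",", aggregation);
    }
}
```

With an expected count of 2, the first call for a given correlation ID returns null and the second returns the two bodies joined by a comma.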

Field Summary
 
Fields inherited from class org.apache.servicemix.eip.EIPEndpoint
lockManager, store, storeFactory, timerManager, wsdlExchangeTarget, wsdlResource
 
Fields inherited from class org.apache.servicemix.common.endpoints.AbstractEndpoint
definition, description, endpoint, interfaceName, logger, service, serviceUnit
 
Constructor Summary
AbstractAggregator()
           
 
Method Summary
protected abstract  boolean addMessage(Object aggregate, javax.jbi.messaging.NormalizedMessage message, javax.jbi.messaging.MessageExchange exchange)
          Add a newly received message to this aggregation.
protected abstract  void buildAggregate(Object aggregate, javax.jbi.messaging.NormalizedMessage message, javax.jbi.messaging.MessageExchange exchange, boolean timeout)
          Fill the given JBI message with the aggregation result.
protected  void closeAggregation(String correlationId)
          Mark an aggregation as closed.
protected abstract  Object createAggregation(String correlationID)
          Creates a new empty aggregation.
 org.apache.servicemix.store.StoreFactory getClosedAggregatesStoreFactory()
          Access the currently configured StoreFactory for storing closed aggregations.
protected abstract  String getCorrelationID(javax.jbi.messaging.MessageExchange exchange, javax.jbi.messaging.NormalizedMessage message)
          Retrieve the correlation ID of the given exchange.
 ExchangeTarget getTarget()
           
protected abstract  Date getTimeout(Object aggregate)
          Returns the date when the onTimeout method should be called if the aggregation is not completed yet, or null if the aggregation has no timeout.
protected  boolean isAggregationClosed(String correlationId)
          Check if the aggregation with the given correlation id is closed or not.
 boolean isCopyAttachments()
           
 boolean isCopyProperties()
           
 boolean isRescheduleTimeouts()
           
 boolean isSynchronous()
           
protected  void onTimeout(String processCorrelationId, String correlationId, org.apache.servicemix.timers.Timer timer)
           
 void process(javax.jbi.messaging.MessageExchange exchange)
           
protected  void processAsync(javax.jbi.messaging.MessageExchange exchange)
           
protected  void processSync(javax.jbi.messaging.MessageExchange exchange)
           
protected  void sendAggregate(String processCorrelationId, String correlationId, Object aggregation, boolean timeout, boolean sync)
           
 void setClosedAggregatesStoreFactory(org.apache.servicemix.store.StoreFactory closedAggregatesStoreFactory)
          Set a new StoreFactory for creating the Store that holds closed aggregations. If none has been set, a simple MemoryStoreFactory is used by default.
 void setCopyAttachments(boolean copyAttachments)
           
 void setCopyProperties(boolean copyProperties)
           
 void setRescheduleTimeouts(boolean rescheduleTimeouts)
           
 void setSynchronous(boolean synchronous)
           
 void setTarget(ExchangeTarget target)
           
 void start()
           
 
Methods inherited from class org.apache.servicemix.eip.EIPEndpoint
chooseFirstEndpointWithDescriptor, copyAttachments, copyProperties, getDefinition, getDefinitionFromDescription, getDefinitionFromWsdlExchangeTarget, getDefinitionFromWsdlResource, getDescription, getDescriptionForExchangeTarget, getEndpointsForExchangeTarget, getLockManager, getStore, getStoreFactory, getTimerManager, getWsdlExchangeTarget, getWsdlResource, setLockManager, setStore, setStoreFactory, setTimerManager, setWsdlExchangeTarget, setWsdlResource, stop
 
Methods inherited from class org.apache.servicemix.common.endpoints.ProviderEndpoint
activate, deactivate, getRole, processInOnly, processInOut
 
Methods inherited from class org.apache.servicemix.common.endpoints.SimpleEndpoint
done, fail, getChannel, getContext, getExchangeFactory, send, sendSync
 
Methods inherited from class org.apache.servicemix.common.endpoints.AbstractEndpoint
getEndpoint, getInterfaceName, getKey, getService, getServiceUnit, isExchangeOkay, prepareExchange, setDefinition, setDescription, setEndpoint, setInterfaceName, setService, setServiceUnit, toString, validate
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

AbstractAggregator

public AbstractAggregator()
Method Detail

isSynchronous

public boolean isSynchronous()
Returns:
the value of the synchronous flag

setSynchronous

public void setSynchronous(boolean synchronous)
Parameters:
synchronous - the new value of the synchronous flag

isRescheduleTimeouts

public boolean isRescheduleTimeouts()
Returns:
the value of the rescheduleTimeouts flag

setRescheduleTimeouts

public void setRescheduleTimeouts(boolean rescheduleTimeouts)
Parameters:
rescheduleTimeouts - the new value of the rescheduleTimeouts flag

getTarget

public ExchangeTarget getTarget()
Returns:
the target

setTarget

public void setTarget(ExchangeTarget target)
Parameters:
target - the target to set

isCopyProperties

public boolean isCopyProperties()

setCopyProperties

public void setCopyProperties(boolean copyProperties)

isCopyAttachments

public boolean isCopyAttachments()

setCopyAttachments

public void setCopyAttachments(boolean copyAttachments)

processSync

protected void processSync(javax.jbi.messaging.MessageExchange exchange)
                    throws Exception
Specified by:
processSync in class EIPEndpoint
Throws:
Exception

getClosedAggregatesStoreFactory

public org.apache.servicemix.store.StoreFactory getClosedAggregatesStoreFactory()
Access the currently configured StoreFactory for storing closed aggregations.


setClosedAggregatesStoreFactory

public void setClosedAggregatesStoreFactory(org.apache.servicemix.store.StoreFactory closedAggregatesStoreFactory)
Set a new StoreFactory for creating the Store that holds closed aggregations. If none has been set, a simple MemoryStoreFactory is used by default.

Parameters:
closedAggregatesStoreFactory - the StoreFactory to use for closed aggregations

processAsync

protected void processAsync(javax.jbi.messaging.MessageExchange exchange)
                     throws Exception
Specified by:
processAsync in class EIPEndpoint
Throws:
Exception

start

public void start()
           throws Exception
Specified by:
start in interface org.apache.servicemix.common.Endpoint
Overrides:
start in class EIPEndpoint
Throws:
Exception

process

public void process(javax.jbi.messaging.MessageExchange exchange)
             throws Exception
Specified by:
process in interface org.apache.servicemix.common.Endpoint
Overrides:
process in class EIPEndpoint
Throws:
Exception

sendAggregate

protected void sendAggregate(String processCorrelationId,
                             String correlationId,
                             Object aggregation,
                             boolean timeout,
                             boolean sync)
                      throws Exception
Throws:
Exception

onTimeout

protected void onTimeout(String processCorrelationId,
                         String correlationId,
                         org.apache.servicemix.timers.Timer timer)

isAggregationClosed

protected boolean isAggregationClosed(String correlationId)
                               throws Exception
Check if the aggregation with the given correlation id is closed or not. Called when the aggregation has not been found in the store.

Parameters:
correlationId - the correlation ID of the aggregation to check
Returns:
true if the aggregation is closed
Throws:
Exception

closeAggregation

protected void closeAggregation(String correlationId)
                         throws Exception
Mark an aggregation as closed.

Parameters:
correlationId - the correlation ID of the aggregation to close
Throws:
Exception
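
The relationship between `isAggregationClosed` and `closeAggregation` can be sketched with two in-memory maps. This is an illustration under assumptions, not the class's actual code: `ClosedAggregationsSketch`, `lookup`, and `complete` are hypothetical names, and the maps stand in for the main Store and the closed-aggregations Store. The one behaviour taken from the text above is that the closed check runs only when the aggregation has not been found in the store.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of closed-aggregation tracking; not ServiceMix API. */
class ClosedAggregationsSketch {
    private final Map<String, Object> openStore = new HashMap<>();     // stand-in for the main Store
    private final Map<String, Boolean> closedStore = new HashMap<>();  // stand-in for the closed-aggregations Store

    /** Marks the aggregation with this correlation ID as closed. */
    void closeAggregation(String correlationId) {
        closedStore.put(correlationId, Boolean.TRUE);
    }

    /** Reports whether the aggregation with this correlation ID is closed. */
    boolean isAggregationClosed(String correlationId) {
        return closedStore.containsKey(correlationId);
    }

    /** Returns "existing", "closed", or "created" for an incoming correlation ID. */
    String lookup(String correlationId) {
        if (openStore.containsKey(correlationId)) {
            return "existing";
        }
        // Not found in the store: only now is the closed check consulted.
        if (isAggregationClosed(correlationId)) {
            return "closed";
        }
        openStore.put(correlationId, new Object());
        return "created";
    }

    /** Removes a finished aggregation from the store and records it as closed. */
    void complete(String correlationId) {
        openStore.remove(correlationId);
        closeAggregation(correlationId);
    }
}
```

Keeping closed IDs in a separate store lets a late message be recognised as belonging to a finished aggregation instead of silently starting a new one.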

getCorrelationID

protected abstract String getCorrelationID(javax.jbi.messaging.MessageExchange exchange,
                                           javax.jbi.messaging.NormalizedMessage message)
                                    throws Exception
Retrieve the correlation ID of the given exchange.

Parameters:
exchange - the message exchange
message - the message to correlate
Returns:
the correlationID
Throws:
Exception

createAggregation

protected abstract Object createAggregation(String correlationID)
                                     throws Exception
Creates a new empty aggregation.

Parameters:
correlationID - the correlation ID of the new aggregation
Returns:
a newly created aggregation
Throws:
Exception

getTimeout

protected abstract Date getTimeout(Object aggregate)
Returns the date when the onTimeout method should be called if the aggregation is not completed yet, or null if the aggregation has no timeout.

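Parameters:
aggregate - the aggregation
Returns:
the date when onTimeout should be called, or null if the aggregation has no timeout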
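
One way a subclass might satisfy the `getTimeout` contract is to keep a creation time and a timeout length on the aggregation object. The sketch below is a minimal illustration: `TimedAggregationSketch` and its fields are hypothetical, and treating a non-positive timeout as "no timeout" is an assumption of this example, not something the class prescribes.

```java
import java.util.Date;

/** Hypothetical aggregation state; not a ServiceMix type. */
class TimedAggregationSketch {
    final long createdAtMillis; // when the aggregation was created
    final long timeoutMillis;   // zero or negative means "no timeout" in this sketch

    TimedAggregationSketch(long createdAtMillis, long timeoutMillis) {
        this.createdAtMillis = createdAtMillis;
        this.timeoutMillis = timeoutMillis;
    }

    /** Mirrors the getTimeout contract: a deadline, or null for no timeout. */
    static Date getTimeout(Object aggregate) {
        TimedAggregationSketch a = (TimedAggregationSketch) aggregate;
        if (a.timeoutMillis <= 0) {
            return null;
        }
        return new Date(a.createdAtMillis + a.timeoutMillis);
    }
}
```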

addMessage

protected abstract boolean addMessage(Object aggregate,
                                      javax.jbi.messaging.NormalizedMessage message,
                                      javax.jbi.messaging.MessageExchange exchange)
                               throws Exception
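Add a newly received message to this aggregation.

Parameters:
aggregate - the aggregation to add the message to
message - the newly received message
exchange - the exchange that carried the message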
Returns:
true if the aggregate is complete
Throws:
Exception
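
A count-based completion rule is one simple way to honour the `addMessage` contract (store the message, return true once the aggregate is complete). The sketch below is illustrative only: `CountingAggregationSketch` is a hypothetical aggregation type, and a plain string replaces the NormalizedMessage and MessageExchange arguments so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical aggregation that is complete after a fixed number of parts. */
class CountingAggregationSketch {
    final int expected;                            // number of messages to wait for
    final List<String> parts = new ArrayList<>();  // messages received so far

    CountingAggregationSketch(int expected) {
        this.expected = expected;
    }

    /** Mirrors the addMessage contract: record the message, report completion. */
    static boolean addMessage(Object aggregate, String message) {
        CountingAggregationSketch a = (CountingAggregationSketch) aggregate;
        a.parts.add(message);
        return a.parts.size() >= a.expected;
    }
}
```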

buildAggregate

protected abstract void buildAggregate(Object aggregate,
                                       javax.jbi.messaging.NormalizedMessage message,
                                       javax.jbi.messaging.MessageExchange exchange,
                                       boolean timeout)
                                throws Exception
Fill the given JBI message with the aggregation result.

Parameters:
aggregate - the aggregation holding the result
message - the JBI message to fill
exchange - the exchange for the message
timeout - false if the aggregation has completed or true if this aggregation has timed out
Throws:
Exception
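
To show how the `timeout` flag might influence the result, the sketch below fills an output buffer from a list of collected parts. Everything in it is an assumption made for illustration: `BuildAggregateSketch`, the `<aggregate>` and `<part>` element names, and the `timedOut` attribute are invented, and a `StringBuilder` stands in for the JBI message that the real method fills.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of filling an output message from collected parts. */
class BuildAggregateSketch {
    /**
     * Writes the parts into out, recording whether the aggregation timed out.
     * No XML escaping is done; the parts are assumed to be safe text.
     */
    static void buildAggregate(List<String> parts, StringBuilder out, boolean timeout) {
        out.append("<aggregate timedOut=\"").append(timeout).append("\">");
        for (String part : parts) {
            out.append("<part>").append(part).append("</part>");
        }
        out.append("</aggregate>");
    }

    /** Small helper that runs buildAggregate on two fixed parts. */
    static String demo(boolean timeout) {
        StringBuilder out = new StringBuilder();
        buildAggregate(Arrays.asList("a", "b"), out, timeout);
        return out.toString();
    }
}
```

Passing `timeout` through to the output lets a downstream consumer tell a complete aggregate from a partial one that was sent because the deadline passed.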


Copyright © 2005-2008 Apache Software Foundation. All Rights Reserved.