001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import javax.jbi.management.DeploymentException;
020 import javax.jbi.messaging.ExchangeStatus;
021 import javax.jbi.messaging.Fault;
022 import javax.jbi.messaging.InOnly;
023 import javax.jbi.messaging.MessageExchange;
024 import javax.jbi.messaging.NormalizedMessage;
025 import javax.jbi.messaging.RobustInOnly;
026
027 import org.apache.servicemix.common.util.MessageUtil;
028 import org.apache.servicemix.eip.EIPEndpoint;
029 import org.apache.servicemix.eip.support.ExchangeTarget;
030 import org.apache.servicemix.eip.support.Predicate;
031
032 /**
033 * MessageFilter allows filtering incoming JBI exchanges.
034 * This component implements the
035 * <a href="http://www.enterpriseintegrationpatterns.com/Filter.html">Message Filter</a>
036 * pattern.
037 *
038 * @author gnodet
039 * @version $Revision: 376451 $
040 * @org.apache.xbean.XBean element="message-filter"
041 * description="A Message Filter"
042 */
043 public class MessageFilter extends EIPEndpoint {
044
045 /**
046 * The main target destination which will receive the exchange
047 */
048 private ExchangeTarget target;
049 /**
050 * The filter to use on incoming messages
051 */
052 private Predicate filter;
053 /**
054 * The correlation property used by this component
055 */
056 //private String correlation;
057 /**
058 * Indicates if faults and errors from recipients should be sent
059 * back to the consumer. In such a case, only the first fault or
060 * error received will be reported.
061 * Note that if the consumer is synchronous, it will be blocked
062 * until all recipients successfully acked the exchange, or
063 * a fault or error is reported, and the exchange will be kept in the
064 * store for recovery.
065 */
066 private boolean reportErrors;
067
068 /**
069 * @return Returns the target.
070 */
071 public ExchangeTarget getTarget() {
072 return target;
073 }
074
075 /**
076 * @param target The target to set.
077 */
078 public void setTarget(ExchangeTarget target) {
079 this.target = target;
080 }
081
082 /**
083 * @return Returns the filter.
084 */
085 public Predicate getFilter() {
086 return filter;
087 }
088
089 /**
090 * @param filter The filter to set.
091 */
092 public void setFilter(Predicate filter) {
093 this.filter = filter;
094 }
095
096 /**
097 * @return Returns the reportErrors.
098 */
099 public boolean isReportErrors() {
100 return reportErrors;
101 }
102
103 /**
104 * @param reportErrors The reportErrors to set.
105 */
106 public void setReportErrors(boolean reportErrors) {
107 this.reportErrors = reportErrors;
108 }
109
110 /* (non-Javadoc)
111 * @see org.apache.servicemix.eip.EIPEndpoint#validate()
112 */
113 public void validate() throws DeploymentException {
114 super.validate();
115 // Check target
116 if (target == null) {
117 throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
118 }
119 // Check filter
120 if (filter == null) {
121 throw new IllegalArgumentException("filter property should be set");
122 }
123 // Create correlation property
124 //correlation = "MessageFilter.Correlation." + getService() + "." + getEndpoint();
125 }
126
127 /* (non-Javadoc)
128 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
129 */
130 protected void processSync(MessageExchange exchange) throws Exception {
131 if (!(exchange instanceof InOnly)
132 && !(exchange instanceof RobustInOnly)) {
133 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
134 } else {
135 NormalizedMessage in = MessageUtil.copyIn(exchange);
136 MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
137 target.configureTarget(me, getContext());
138 MessageUtil.transferToIn(in, me);
139 if (filter.matches(me)) {
140 sendSync(me);
141 if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
142 fail(exchange, me.getError());
143 } else if (me.getStatus() == ExchangeStatus.DONE) {
144 done(exchange);
145 } else if (me.getFault() != null && reportErrors) {
146 Fault fault = MessageUtil.copyFault(me);
147 done(me);
148 MessageUtil.transferToFault(fault, exchange);
149 sendSync(exchange);
150 }
151 } else {
152 done(exchange);
153 }
154 }
155 }
156
157 /* (non-Javadoc)
158 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
159 */
160 protected void processAsync(MessageExchange exchange) throws Exception {
161 // If we need to report errors, the behavior is really different,
162 // as we need to keep the incoming exchange in the store until
163 // all acks have been received
164 if (reportErrors) {
165 // TODO: implement this
166 throw new UnsupportedOperationException("Not implemented");
167 // We are in a simple fire-and-forget behaviour.
168 // This implementation is really efficient as we do not use
169 // the store at all.
170 } else {
171 if (exchange.getStatus() == ExchangeStatus.DONE) {
172 return;
173 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
174 return;
175 } else if (!(exchange instanceof InOnly)
176 && !(exchange instanceof RobustInOnly)) {
177 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
178 } else if (exchange.getFault() != null) {
179 done(exchange);
180 } else {
181 NormalizedMessage in = MessageUtil.copyIn(exchange);
182 MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
183 target.configureTarget(me, getContext());
184 MessageUtil.transferToIn(in, me);
185 if (filter.matches(me)) {
186 send(me);
187 }
188 done(exchange);
189 }
190 }
191 }
192
193 }