001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import javax.jbi.management.DeploymentException;
020    import javax.jbi.messaging.ExchangeStatus;
021    import javax.jbi.messaging.Fault;
022    import javax.jbi.messaging.InOnly;
023    import javax.jbi.messaging.MessageExchange;
024    import javax.jbi.messaging.NormalizedMessage;
025    import javax.jbi.messaging.RobustInOnly;
026    
027    import org.apache.servicemix.common.util.MessageUtil;
028    import org.apache.servicemix.eip.EIPEndpoint;
029    import org.apache.servicemix.eip.support.ExchangeTarget;
030    import org.apache.servicemix.eip.support.Predicate;
031    
032    /**
033     * MessageFilter allows filtering incoming JBI exchanges.
034     * This component implements the  
035     * <a href="http://www.enterpriseintegrationpatterns.com/Filter.html">Message Filter</a> 
036     * pattern.
037     *  
038     * @author gnodet
039     * @version $Revision: 376451 $
040     * @org.apache.xbean.XBean element="message-filter"
041     *                  description="A Message Filter"
042     */
043    public class MessageFilter extends EIPEndpoint {
044    
045        /**
046         * The main target destination which will receive the exchange
047         */
048        private ExchangeTarget target;
049        /**
050         * The filter to use on incoming messages
051         */
052        private Predicate filter;
053        /**
054         * The correlation property used by this component
055         */
056        //private String correlation;
057        /**
058         * Indicates if faults and errors from recipients should be sent
059         * back to the consumer.  In such a case, only the first fault or
060         * error received will be reported.
061         * Note that if the consumer is synchronous, it will be blocked
062         * until all recipients successfully acked the exchange, or
063         * a fault or error is reported, and the exchange will be kept in the
064         * store for recovery. 
065         */
066        private boolean reportErrors;
067        
068        /**
069         * @return Returns the target.
070         */
071        public ExchangeTarget getTarget() {
072            return target;
073        }
074    
075        /**
076         * @param target The target to set.
077         */
078        public void setTarget(ExchangeTarget target) {
079            this.target = target;
080        }
081    
082        /**
083         * @return Returns the filter.
084         */
085        public Predicate getFilter() {
086            return filter;
087        }
088    
089        /**
090         * @param filter The filter to set.
091         */
092        public void setFilter(Predicate filter) {
093            this.filter = filter;
094        }
095    
096        /**
097         * @return Returns the reportErrors.
098         */
099        public boolean isReportErrors() {
100            return reportErrors;
101        }
102    
103        /**
104         * @param reportErrors The reportErrors to set.
105         */
106        public void setReportErrors(boolean reportErrors) {
107            this.reportErrors = reportErrors;
108        }
109    
110        /* (non-Javadoc)
111         * @see org.apache.servicemix.eip.EIPEndpoint#validate()
112         */
113        public void validate() throws DeploymentException {
114            super.validate();
115            // Check target
116            if (target == null) {
117                throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
118            }
119            // Check filter
120            if (filter == null) {
121                throw new IllegalArgumentException("filter property should be set");
122            }
123            // Create correlation property
124            //correlation = "MessageFilter.Correlation." + getService() + "." + getEndpoint();
125        }
126    
127        /* (non-Javadoc)
128         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
129         */
130        protected void processSync(MessageExchange exchange) throws Exception {
131            if (!(exchange instanceof InOnly)
132                && !(exchange instanceof RobustInOnly)) {
133                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
134            } else {
135                NormalizedMessage in = MessageUtil.copyIn(exchange);
136                MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
137                target.configureTarget(me, getContext());
138                MessageUtil.transferToIn(in, me);
139                if (filter.matches(me)) {
140                    sendSync(me);
141                    if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
142                        fail(exchange, me.getError());
143                    } else if (me.getStatus() == ExchangeStatus.DONE) {
144                        done(exchange);
145                    } else if (me.getFault() != null && reportErrors) {
146                        Fault fault = MessageUtil.copyFault(me);
147                        done(me);
148                        MessageUtil.transferToFault(fault, exchange);
149                        sendSync(exchange);
150                    }
151                } else {
152                    done(exchange);
153                }
154            }
155        }
156    
157        /* (non-Javadoc)
158         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
159         */
160        protected void processAsync(MessageExchange exchange) throws Exception {
161            // If we need to report errors, the behavior is really different,
162            // as we need to keep the incoming exchange in the store until
163            // all acks have been received
164            if (reportErrors) {
165                // TODO: implement this
166                throw new UnsupportedOperationException("Not implemented");
167            // We are in a simple fire-and-forget behaviour.
168            // This implementation is really efficient as we do not use
169            // the store at all.
170            } else {
171                if (exchange.getStatus() == ExchangeStatus.DONE) {
172                    return;
173                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
174                    return;
175                } else if (!(exchange instanceof InOnly)
176                           && !(exchange instanceof RobustInOnly)) {
177                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
178                } else if (exchange.getFault() != null) {
179                    done(exchange);
180                } else {
181                    NormalizedMessage in = MessageUtil.copyIn(exchange);
182                    MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
183                    target.configureTarget(me, getContext());
184                    MessageUtil.transferToIn(in, me);
185                    if (filter.matches(me)) {
186                        send(me);
187                    }
188                    done(exchange);
189                }
190            }
191        }
192    
193    }