001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.servicemix.eip.patterns;
018    
019    import java.util.concurrent.locks.Lock;
020    
021    import javax.jbi.management.DeploymentException;
022    import javax.jbi.messaging.ExchangeStatus;
023    import javax.jbi.messaging.InOnly;
024    import javax.jbi.messaging.MessageExchange;
025    import javax.jbi.messaging.NormalizedMessage;
026    import javax.jbi.messaging.RobustInOnly;
027    
028    import org.apache.servicemix.eip.EIPEndpoint;
029    import org.apache.servicemix.eip.support.ExchangeTarget;
030    import org.apache.servicemix.common.util.MessageUtil;
031    
032    /**
033     * The StaticRecipientList component will forward an input In-Only or Robust-In-Only
034     * exchange to a list of known recipients.
035     * This component implements the  
036     * <a href="http://www.enterpriseintegrationpatterns.com/RecipientList.html">Recipient List</a> 
037     * pattern, with the limitation that the recipient list is static.
038     * 
039     * @author gnodet
040     * @version $Revision: 376451 $
041     * @org.apache.xbean.XBean element="static-recipient-list"
042     *                  description="A static Recipient List"
043     */
044    public class StaticRecipientList extends EIPEndpoint {
045    
046        public static final String RECIPIENT_LIST_COUNT = "org.apache.servicemix.eip.recipientList.count";
047        public static final String RECIPIENT_LIST_INDEX = "org.apache.servicemix.eip.recipientList.index";
048        public static final String RECIPIENT_LIST_CORRID = "org.apache.servicemix.eip.recipientList.corrid";
049    
050        /**
051         * List of recipients
052         */
053        private ExchangeTarget[] recipients;
054        /**
055         * Indicates if faults and errors from recipients should be sent
056         * back to the consumer.  In such a case, only the first fault or
057         * error received will be reported.
058         * Note that if the consumer is synchronous, it will be blocked
059         * until all recipients successfully acked the exchange, or
060         * a fault or error is reported, and the exchange will be kept in the
061         * store for recovery. 
062         */
063        private boolean reportErrors;
064        /**
065         * The correlation property used by this component
066         */
067        //private String correlation;
068    
069        /**
070         * @return Returns the recipients.
071         */
072        public ExchangeTarget[] getRecipients() {
073            return recipients;
074        }
075    
076        /**
077         * @param recipients The recipients to set.
078         */
079        public void setRecipients(ExchangeTarget[] recipients) {
080            this.recipients = recipients;
081        }
082    
083        /**
084         * @return Returns the reportErrors.
085         */
086        public boolean isReportErrors() {
087            return reportErrors;
088        }
089    
090        /**
091         * @param reportErrors The reportErrors to set.
092         */
093        public void setReportErrors(boolean reportErrors) {
094            this.reportErrors = reportErrors;
095        }
096    
097        /* (non-Javadoc)
098         * @see org.apache.servicemix.eip.EIPEndpoint#validate()
099         */
100        public void validate() throws DeploymentException {
101            super.validate();
102            // Check recipients
103            if (recipients == null || recipients.length == 0) {
104                throw new IllegalArgumentException("recipients should contain at least one ExchangeTarget");
105            }
106            // Create correlation property
107            //correlation = "StaticRecipientList.Correlation." + getService() + "." + getEndpoint();
108        }
109        
110        /* (non-Javadoc)
111         * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
112         */
113        protected void processSync(MessageExchange exchange) throws Exception {
114            if (!(exchange instanceof InOnly)
115                && !(exchange instanceof RobustInOnly)) {
116                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
117                return;
118            }
119            NormalizedMessage in = MessageUtil.copyIn(exchange);
120            for (int i = 0; i < recipients.length; i++) {
121                MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
122                recipients[i].configureTarget(me, getContext());
123                in.setProperty(RECIPIENT_LIST_COUNT, new Integer(recipients.length));
124                in.setProperty(RECIPIENT_LIST_INDEX, new Integer(i));
125                in.setProperty(RECIPIENT_LIST_CORRID, exchange.getExchangeId());
126                MessageUtil.transferToIn(in, me);
127                sendSync(me);
128                if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
129                    fail(exchange, me.getError());
130                    return;
131                } else if (me.getFault() != null && reportErrors) {
132                    MessageUtil.transferFaultToFault(me, exchange);
133                    sendSync(exchange);
134                    done(me);
135                    return;
136                }
137            }
138            done(exchange);
139        }
140    
141        /* (non-Javadoc)
142         * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
143         */
144        protected void processAsync(MessageExchange exchange) throws Exception {
145            if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
146                String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
147                int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
148                Lock lock = lockManager.getLock(corrId);
149                lock.lock();
150                try {
151                    Integer acks = (Integer) store.load(corrId + ".acks");
152                    if (exchange.getStatus() == ExchangeStatus.DONE) {
153                        // If the acks integer is not here anymore, the message response has been sent already
154                        if (acks != null) {
155                            if (acks + 1 >= count) {
156                                MessageExchange me = (MessageExchange) store.load(corrId);
157                                done(me);
158                            } else {
159                                store.store(corrId + ".acks", Integer.valueOf(acks + 1));
160                            }
161                        }
162                    } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
163                        // If the acks integer is not here anymore, the message response has been sent already
164                        if (acks != null) {
165                            if (reportErrors) {
166                                MessageExchange me = (MessageExchange) store.load(corrId);
167                                fail(me, exchange.getError());
168                            } else  if (acks + 1 >= count) {
169                                MessageExchange me = (MessageExchange) store.load(corrId);
170                                done(me);
171                            } else {
172                                store.store(corrId + ".acks", Integer.valueOf(acks + 1));
173                            }
174                        }
175                    } else if (exchange.getFault() != null) {
176                        // If the acks integer is not here anymore, the message response has been sent already
177                        if (acks != null) {
178                            if (reportErrors) {
179                                MessageExchange me = (MessageExchange) store.load(corrId);
180                                MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
181                                send(me);
182                                done(exchange);
183                            } else  if (acks + 1 >= count) {
184                                MessageExchange me = (MessageExchange) store.load(corrId);
185                                done(me);
186                            } else {
187                                store.store(corrId + ".acks", Integer.valueOf(acks + 1));
188                            }
189                        } else {
190                            done(exchange);
191                        }
192                    }
193                } finally {
194                    lock.unlock();
195                }
196            } else {
197                if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
198                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
199                } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
200                    store.store(exchange.getExchangeId(), exchange);
201                    store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
202                    NormalizedMessage in = MessageUtil.copyIn(exchange);
203                    for (int i = 0; i < recipients.length; i++) {
204                        MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
205                        recipients[i].configureTarget(me, getContext());
206                        in.setProperty(RECIPIENT_LIST_COUNT, new Integer(recipients.length));
207                        in.setProperty(RECIPIENT_LIST_INDEX, new Integer(i));
208                        in.setProperty(RECIPIENT_LIST_CORRID, exchange.getExchangeId());
209                        MessageUtil.transferToIn(in, me);
210                        send(me);
211                    }
212                }
213            }
214        }
215    
216    }