001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.servicemix.eip.patterns;
018
019 import java.util.concurrent.locks.Lock;
020
021 import javax.jbi.management.DeploymentException;
022 import javax.jbi.messaging.ExchangeStatus;
023 import javax.jbi.messaging.InOnly;
024 import javax.jbi.messaging.MessageExchange;
025 import javax.jbi.messaging.NormalizedMessage;
026 import javax.jbi.messaging.RobustInOnly;
027
028 import org.apache.servicemix.eip.EIPEndpoint;
029 import org.apache.servicemix.eip.support.ExchangeTarget;
030 import org.apache.servicemix.common.util.MessageUtil;
031
032 /**
033 * The StaticRecipientList component will forward an input In-Only or Robust-In-Only
034 * exchange to a list of known recipients.
035 * This component implements the
036 * <a href="http://www.enterpriseintegrationpatterns.com/RecipientList.html">Recipient List</a>
037 * pattern, with the limitation that the recipient list is static.
038 *
039 * @author gnodet
040 * @version $Revision: 376451 $
041 * @org.apache.xbean.XBean element="static-recipient-list"
042 * description="A static Recipient List"
043 */
044 public class StaticRecipientList extends EIPEndpoint {
045
046 public static final String RECIPIENT_LIST_COUNT = "org.apache.servicemix.eip.recipientList.count";
047 public static final String RECIPIENT_LIST_INDEX = "org.apache.servicemix.eip.recipientList.index";
048 public static final String RECIPIENT_LIST_CORRID = "org.apache.servicemix.eip.recipientList.corrid";
049
050 /**
051 * List of recipients
052 */
053 private ExchangeTarget[] recipients;
054 /**
055 * Indicates if faults and errors from recipients should be sent
056 * back to the consumer. In such a case, only the first fault or
057 * error received will be reported.
058 * Note that if the consumer is synchronous, it will be blocked
059 * until all recipients successfully acked the exchange, or
060 * a fault or error is reported, and the exchange will be kept in the
061 * store for recovery.
062 */
063 private boolean reportErrors;
064 /**
065 * The correlation property used by this component
066 */
067 //private String correlation;
068
069 /**
070 * @return Returns the recipients.
071 */
072 public ExchangeTarget[] getRecipients() {
073 return recipients;
074 }
075
076 /**
077 * @param recipients The recipients to set.
078 */
079 public void setRecipients(ExchangeTarget[] recipients) {
080 this.recipients = recipients;
081 }
082
083 /**
084 * @return Returns the reportErrors.
085 */
086 public boolean isReportErrors() {
087 return reportErrors;
088 }
089
090 /**
091 * @param reportErrors The reportErrors to set.
092 */
093 public void setReportErrors(boolean reportErrors) {
094 this.reportErrors = reportErrors;
095 }
096
097 /* (non-Javadoc)
098 * @see org.apache.servicemix.eip.EIPEndpoint#validate()
099 */
100 public void validate() throws DeploymentException {
101 super.validate();
102 // Check recipients
103 if (recipients == null || recipients.length == 0) {
104 throw new IllegalArgumentException("recipients should contain at least one ExchangeTarget");
105 }
106 // Create correlation property
107 //correlation = "StaticRecipientList.Correlation." + getService() + "." + getEndpoint();
108 }
109
110 /* (non-Javadoc)
111 * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
112 */
113 protected void processSync(MessageExchange exchange) throws Exception {
114 if (!(exchange instanceof InOnly)
115 && !(exchange instanceof RobustInOnly)) {
116 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
117 return;
118 }
119 NormalizedMessage in = MessageUtil.copyIn(exchange);
120 for (int i = 0; i < recipients.length; i++) {
121 MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
122 recipients[i].configureTarget(me, getContext());
123 in.setProperty(RECIPIENT_LIST_COUNT, new Integer(recipients.length));
124 in.setProperty(RECIPIENT_LIST_INDEX, new Integer(i));
125 in.setProperty(RECIPIENT_LIST_CORRID, exchange.getExchangeId());
126 MessageUtil.transferToIn(in, me);
127 sendSync(me);
128 if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
129 fail(exchange, me.getError());
130 return;
131 } else if (me.getFault() != null && reportErrors) {
132 MessageUtil.transferFaultToFault(me, exchange);
133 sendSync(exchange);
134 done(me);
135 return;
136 }
137 }
138 done(exchange);
139 }
140
141 /* (non-Javadoc)
142 * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
143 */
144 protected void processAsync(MessageExchange exchange) throws Exception {
145 if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
146 String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
147 int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
148 Lock lock = lockManager.getLock(corrId);
149 lock.lock();
150 try {
151 Integer acks = (Integer) store.load(corrId + ".acks");
152 if (exchange.getStatus() == ExchangeStatus.DONE) {
153 // If the acks integer is not here anymore, the message response has been sent already
154 if (acks != null) {
155 if (acks + 1 >= count) {
156 MessageExchange me = (MessageExchange) store.load(corrId);
157 done(me);
158 } else {
159 store.store(corrId + ".acks", Integer.valueOf(acks + 1));
160 }
161 }
162 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
163 // If the acks integer is not here anymore, the message response has been sent already
164 if (acks != null) {
165 if (reportErrors) {
166 MessageExchange me = (MessageExchange) store.load(corrId);
167 fail(me, exchange.getError());
168 } else if (acks + 1 >= count) {
169 MessageExchange me = (MessageExchange) store.load(corrId);
170 done(me);
171 } else {
172 store.store(corrId + ".acks", Integer.valueOf(acks + 1));
173 }
174 }
175 } else if (exchange.getFault() != null) {
176 // If the acks integer is not here anymore, the message response has been sent already
177 if (acks != null) {
178 if (reportErrors) {
179 MessageExchange me = (MessageExchange) store.load(corrId);
180 MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
181 send(me);
182 done(exchange);
183 } else if (acks + 1 >= count) {
184 MessageExchange me = (MessageExchange) store.load(corrId);
185 done(me);
186 } else {
187 store.store(corrId + ".acks", Integer.valueOf(acks + 1));
188 }
189 } else {
190 done(exchange);
191 }
192 }
193 } finally {
194 lock.unlock();
195 }
196 } else {
197 if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
198 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
199 } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
200 store.store(exchange.getExchangeId(), exchange);
201 store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
202 NormalizedMessage in = MessageUtil.copyIn(exchange);
203 for (int i = 0; i < recipients.length; i++) {
204 MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
205 recipients[i].configureTarget(me, getContext());
206 in.setProperty(RECIPIENT_LIST_COUNT, new Integer(recipients.length));
207 in.setProperty(RECIPIENT_LIST_INDEX, new Integer(i));
208 in.setProperty(RECIPIENT_LIST_CORRID, exchange.getExchangeId());
209 MessageUtil.transferToIn(in, me);
210 send(me);
211 }
212 }
213 }
214 }
215
216 }