View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.codehaus.activemq.broker.impl;
20  import java.io.IOException;
21  import java.util.HashSet;
22  import java.util.Iterator;
23  import java.util.Set;
24  import javax.jms.ExceptionListener;
25  import javax.jms.JMSException;
26  import javax.transaction.xa.XAException;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.codehaus.activemq.broker.BrokerClient;
30  import org.codehaus.activemq.broker.BrokerConnector;
31  import org.codehaus.activemq.message.ActiveMQMessage;
32  import org.codehaus.activemq.message.ActiveMQXid;
33  import org.codehaus.activemq.message.BrokerInfo;
34  import org.codehaus.activemq.message.CapacityInfo;
35  import org.codehaus.activemq.message.ConnectionInfo;
36  import org.codehaus.activemq.message.ConsumerInfo;
37  import org.codehaus.activemq.message.DurableUnsubscribe;
38  import org.codehaus.activemq.message.IntResponseReceipt;
39  import org.codehaus.activemq.message.MessageAck;
40  import org.codehaus.activemq.message.Packet;
41  import org.codehaus.activemq.message.PacketListener;
42  import org.codehaus.activemq.message.ProducerInfo;
43  import org.codehaus.activemq.message.Receipt;
44  import org.codehaus.activemq.message.ResponseReceipt;
45  import org.codehaus.activemq.message.SessionInfo;
46  import org.codehaus.activemq.message.TransactionInfo;
47  import org.codehaus.activemq.message.XATransactionInfo;
48  import org.codehaus.activemq.message.util.BoundedPacketQueue;
49  import org.codehaus.activemq.message.util.SpooledBoundedPacketQueue;
50  import org.codehaus.activemq.transport.TransportChannel;
51  import org.codehaus.activemq.util.IdGenerator;
52  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
53  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
54  import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
55  
56  /***
57   * A Broker client side proxy representing a JMS Connnection
58   * 
59   * @version $Revision: 1.25 $
60   */
61  public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
62      private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
63      private BrokerConnector brokerConnector;
64      private TransportChannel channel;
65      private ConnectionInfo connectionInfo;
66      private IdGenerator packetIdGenerator;
67      private SynchronizedBoolean closed;
68      private Set activeConsumers;
69      private CopyOnWriteArrayList consumers;
70      private CopyOnWriteArrayList producers;
71      private CopyOnWriteArrayList transactions;
72      private CopyOnWriteArrayList xatransactions;
73      private CopyOnWriteArrayList sessions;
74      private boolean started;
75      private boolean brokerConnection;
76      private boolean clusteredConnection;
77      private String remoteBrokerName;
78      private int capacity = 100;
79      private SpooledBoundedPacketQueue spoolQueue;
80      private boolean cleanedUp;
81  
82      /***
83       * Default Constructor of BrokerClientImpl
84       */
85      public BrokerClientImpl() {
86          this.packetIdGenerator = new IdGenerator();
87          this.closed = new SynchronizedBoolean(false);
88          this.activeConsumers = new HashSet();
89          this.consumers = new CopyOnWriteArrayList();
90          this.producers = new CopyOnWriteArrayList();
91          this.transactions = new CopyOnWriteArrayList();
92          this.xatransactions = new CopyOnWriteArrayList();
93          this.sessions = new CopyOnWriteArrayList();
94      }
95  
96      /***
97       * Initialize the BrokerClient
98       * 
99       * @param brokerConnector
100      * @param channel
101      */
102     public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
103         this.brokerConnector = brokerConnector;
104         this.channel = channel;
105         this.channel.setPacketListener(this);
106         this.channel.setExceptionListener(this);
107         log.trace("brokerConnectorConnector client initialized");
108     }
109     
110     /***
111      * @return the BrokerConnector this client is associated with
112      */
113     public BrokerConnector getBrokerConnector(){
114         return this.brokerConnector;
115     }
116 
117     /***
118      * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
119      */
120     public void onException(JMSException jmsEx) {
121         log.warn(this + " caught exception ", jmsEx);
122         close();
123     }
124 
125     /***
126      * @return pretty print for this brokerConnector-client
127      */
128     public String toString() {
129         String str = "brokerConnector-client:("+hashCode()+") ";
130         str += connectionInfo == null ? "" : connectionInfo.getClientId();
131         str += ": " + channel;
132         return str;
133     }
134 
135     /***
136      * Dispatch an ActiveMQMessage to the end client
137      * 
138      * @param message
139      */
140     public void dispatch(ActiveMQMessage message) {
141         if (isSlowConsumer()) {
142             if (spoolQueue == null) {
143                 log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
144                 String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
145                 try {
146                     spoolQueue = new SpooledBoundedPacketQueue(brokerConnector.getBrokerContainer().getBroker()
147                             .getTempDir(), spoolName);
148                     final BoundedPacketQueue bpq = spoolQueue;
149                     ThreadedExecutor exec = new ThreadedExecutor();
150                     exec.execute(new Runnable() {
151                         public void run() {
152                             while (!closed.get()) {
153                                 try {
154                                     Packet packet = bpq.dequeue();
155                                 }
156                                 catch (InterruptedException e) {
157                                     log.warn("async dispatch got an interupt", e);
158                                 }
159                                 catch (JMSException e) {
160                                     log.error("async dispatch got an problem", e);
161                                 }
162                             }
163                         }
164                     });
165                 }
166                 catch (IOException e) {
167                     log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
168                     close();
169                 }
170                 catch (InterruptedException e) {
171                     log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
172                     close();
173                 }
174             }
175             if (spoolQueue != null) {
176                 try {
177                     spoolQueue.enqueue(message);
178                 }
179                 catch (JMSException e) {
180                     log.error(
181                             "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
182                             e);
183                     close();
184                 }
185             }
186         }
187         else {
188             send(message);
189         }
190     }
191 
192     /***
193      * @return true if the peer for this Client is itself another Broker
194      */
195     public boolean isBrokerConnection() {
196         return brokerConnection;
197     }
198     
199     /***
200      * @return true id this client is part of a cluster 
201      */
202     public boolean isClusteredConnection(){
203         return clusteredConnection;
204     }
205 
206 
207     /***
208      * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
209      * capacity representing that the peer cannot process any more messages at the current time
210      * 
211      * @return
212      */
213     public int getCapacity() {
214         return capacity;
215     }
216 
217     public String getClientID() {
218         if (connectionInfo != null) {
219             return connectionInfo.getClientId();
220         }
221         return null;
222     }
223 
224     public TransportChannel getChannel() {
225         return channel;
226     }
227 
228     /***
229      * Get an indication if the peer should be considered as a slow consumer
230      * 
231      * @return true id the peer should be considered as a slow consumer
232      */
233     public boolean isSlowConsumer() {
234         return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing!
235     }
236 
237     /***
238      * Consume a Packet from the underlying TransportChannel for processing
239      * 
240      * @param packet
241      */
242     public void consume(Packet packet) {
243         if (!closed.get() && packet != null) {
244             Throwable requestEx = null;
245             boolean failed = false;
246             String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
247             String clusterName = brokerConnector.getBrokerInfo().getClusterName();
248             try {
249                 if (brokerConnection) {
250                     packet.addBrokerVisited(remoteBrokerName); //got from the remote broker
251                     packet.addBrokerVisited(brokerName);
252                 }
253                
254                 if (packet.isJMSMessage()) {
255                     ActiveMQMessage message = (ActiveMQMessage) packet;
256                     // lets mark all messages from where they came from so that NoLocal can filter out loops
257                     // e.g. when receiving messages over multicast, we don't want to publish them back out again
258                     if (connectionInfo != null) {
259                         message.setProducerID(connectionInfo.getClientId());
260                     }
261                     else {
262                         log.warn("No connection info available! Maybe the client forgot to start() the Connection?");
263                     }
264                     if (!brokerConnection){
265                         message.setEntryBrokerName(brokerName);
266                         message.setEntryClusterName(clusterName);
267                     }
268                     consumeActiveMQMessage(message);
269                 }
270                 else {
271                     switch (packet.getPacketType()) {
272                         case Packet.ACTIVEMQ_MSG_ACK : {
273                             MessageAck ack = (MessageAck) packet;
274                             consumeMessageAck(ack);
275                             break;
276                         }
277                         case Packet.XA_TRANSACTION_INFO : {
278                             XATransactionInfo info = (XATransactionInfo) packet;
279                             consumeXATransactionInfo(info);
280                             break;
281                         }
282                         case Packet.TRANSACTION_INFO : {
283                             TransactionInfo info = (TransactionInfo) packet;
284                             consumeTransactionInfo(info);
285                             break;
286                         }
287                         case Packet.CONSUMER_INFO : {
288                             ConsumerInfo info = (ConsumerInfo) packet;
289                             consumeConsumerInfo(info);
290                             break;
291                         }
292                         case Packet.PRODUCER_INFO : {
293                             ProducerInfo info = (ProducerInfo) packet;
294                             consumeProducerInfo(info);
295                             break;
296                         }
297                         case Packet.SESSION_INFO : {
298                             SessionInfo info = (SessionInfo) packet;
299                             consumeSessionInfo(info);
300                             break;
301                         }
302                         case Packet.ACTIVEMQ_CONNECTION_INFO : {
303                             ConnectionInfo info = (ConnectionInfo) packet;
304                             consumeConnectionInfo(info);
305                             break;
306                         }
307                         case Packet.DURABLE_UNSUBSCRIBE : {
308                             DurableUnsubscribe ds = (DurableUnsubscribe) packet;
309                             brokerConnector.durableUnsubscribe(this, ds);
310                             break;
311                         }
312                         case Packet.CAPACITY_INFO : {
313                             CapacityInfo info = (CapacityInfo) packet;
314                             consumeCapacityInfo(info);
315                             break;
316                         }
317                         case Packet.CAPACITY_INFO_REQUEST : {
318                             updateCapacityInfo(packet.getId());
319                             break;
320                         }
321                         case Packet.ACTIVEMQ_BROKER_INFO : {
322                             consumeBrokerInfo((BrokerInfo) packet);
323                             break;
324                         }
325                         default : {
326                             log.warn("Unknown Packet received: " + packet);
327                             break;
328                         }
329                     }
330                 }
331             }
332             catch (Throwable e) {
333                 requestEx = e;
334                 log.warn("caught exception consuming packet: " + packet, e);
335                 failed = true;
336             }
337             sendReceipt(packet, requestEx, failed);
338         }
339     }
340 
341     /***
342      * Register/deregister MessageConsumer with the Broker
343      * 
344      * @param info
345      * @throws JMSException
346      */
347     public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
348         if (info.isStarted()) {
349             consumers.add(info);
350             if ((connectionInfo != null && connectionInfo.isStarted())) {
351                 if (this.activeConsumers.add(info)) {
352                     this.brokerConnector.registerMessageConsumer(this, info);
353                 }
354             }
355         }
356         else {
357             consumers.remove(info);
358             if (activeConsumers.remove(info)) {
359                 this.brokerConnector.deregisterMessageConsumer(this, info);
360             }
361         }
362     }
363 
364     /***
365      * Update the peer Connection about the Broker's capacity for messages
366      * 
367      * @param capacity
368      */
369     public void updateBrokerCapacity(int capacity) {
370         CapacityInfo info = new CapacityInfo();
371         info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
372         info.setCapacity(capacity);
373         info.setFlowControlTimeout(getFlowControlTimeout(capacity));
374         send(info);
375     }
376 
377     /***
378      * register with the Broker
379      * 
380      * @param info
381      * @throws JMSException
382      */
383     public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
384         this.connectionInfo = info;
385         if (info.isClosed()) {
386             cleanUp();
387             try {
388                 sendReceipt(info);
389                 info.setReceiptRequired(false);
390                 try {
391                     Thread.sleep(500);
392                 }
393                 catch (Throwable e) {
394                 }
395             }
396             finally {
397                 close();
398             }
399         }
400         else {
401             if (!started && info.isStarted()) {
402                 started = true;
403                 log.debug(this + " has started running client version " + info.getClientVersion() + " , wire format = " + info.getWireFormatVersion());
404                
405                 this.brokerConnector.registerClient(this, info);
406                 //go through consumers and producers - setting their clientId (which might not have been set)
407                 for (Iterator i = consumers.iterator();i.hasNext();) {
408                     ConsumerInfo ci = (ConsumerInfo) i.next();
409                     ci.setClientId(info.getClientId());
410                 }
411                 for (Iterator i = producers.iterator();i.hasNext();) {
412                     ProducerInfo pi = (ProducerInfo) i.next();
413                     pi.setClientId(info.getClientId());
414                 }
415             }
416             if (info.isStarted()) {
417                 //register any consumers
418                 for (Iterator i = consumers.iterator();i.hasNext();) {
419                     ConsumerInfo ci = (ConsumerInfo) i.next();
420                     if (activeConsumers.add(ci)) {
421                         this.brokerConnector.registerMessageConsumer(this, ci);
422                     }
423                 }
424             }
425             else {
426                 log.debug(this + " has stopped");
427                 //de-register any consumers
428                 for (Iterator i = consumers.iterator();i.hasNext();) {
429                     ConsumerInfo ci = (ConsumerInfo) i.next();
430                     if (activeConsumers.remove(ci)) {
431                         this.brokerConnector.deregisterMessageConsumer(this, ci);
432                     }
433                 }
434             }
435         }
436     }
437 
438     /***
439      * start consuming messages
440      * 
441      * @throws JMSException
442      */
443     public void start() throws JMSException {
444         channel.start();
445     }
446 
447     /***
448      * stop consuming messages
449      * 
450      * @throws JMSException
451      */
452     public void stop() throws JMSException {
453         log.trace("Stopping channel: " + channel);
454         channel.stop();
455     }
456 
457     public synchronized void cleanUp() {
458         // we could be called here from 2 different code paths
459         // based on if we get a transport failure or we do a clean shutdown
460         // so lets only run this stuff once
461         if (!cleanedUp) {
462             cleanedUp = true;
463             try {
464                 try {
465                     for (Iterator i = consumers.iterator();i.hasNext();) {
466                         ConsumerInfo info = (ConsumerInfo) i.next();
467                         info.setStarted(false);
468                         this.brokerConnector.deregisterMessageConsumer(this, info);
469                     }
470                     for (Iterator i = producers.iterator();i.hasNext();) {
471                         ProducerInfo info = (ProducerInfo) i.next();
472                         info.setStarted(false);
473                         this.brokerConnector.deregisterMessageProducer(this, info);
474                     }
475                     for (Iterator i = sessions.iterator();i.hasNext();) {
476                         SessionInfo info = (SessionInfo) i.next();
477                         info.setStarted(false);
478                         this.brokerConnector.deregisterSession(this, info);
479                     }
480                     for (Iterator i = transactions.iterator();i.hasNext();) {
481                         this.brokerConnector.rollbackTransaction(this, i.next().toString());
482                     }
483                     for (Iterator i = xatransactions.iterator();i.hasNext();) {
484                         try {
485                             this.brokerConnector.rollbackTransaction(this, (ActiveMQXid) i.next());
486                         }
487                         catch (XAException e) {
488                             log.warn("Transaction rollback failed:", e);
489                         }
490                     }
491                 }
492                 finally {
493                     // whatever happens, lets make sure we unregister & clean things down
494                     if (log.isDebugEnabled()) {
495                         log.debug(this + " has stopped");
496                     }
497                     this.consumers.clear();
498                     this.producers.clear();
499                     this.transactions.clear();
500                     this.xatransactions.clear();
501                     this.sessions.clear();
502                     this.brokerConnector.deregisterClient(this, connectionInfo);
503                 }
504             }
505             catch (JMSException e) {
506                 log.warn("failed to de-register Broker client: " + e, e);
507             }
508         }
509         else {
510             log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
511         }
512     }
513 
514     // Implementation methods
515     //-------------------------------------------------------------------------
516     protected void send(Packet packet) {
517         if (!closed.get()) {
518             try {
519                 if (brokerConnection) {
520                     String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName();
521                     packet.addBrokerVisited(brokerName);
522                     if (packet.hasVisited(remoteBrokerName)) {
523                         if (log.isDebugEnabled()) {
524                             log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: "
525                                     + packet);
526                         }
527                         return;
528                     }
529                 }
530                 this.channel.asyncSend(packet);
531             }
532             catch (JMSException e) {
533                 log.warn(this + " caught exception ", e);
534                 close();
535             }
536         }
537     }
538 
539     protected void close() {
540         if (closed.commit(false, true)) {
541             this.channel.stop();
542             log.debug(this + " has closed");
543         }
544     }
545 
546     /***
547      * Send message to Broker
548      * 
549      * @param message
550      * @throws JMSException
551      */
552     private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
553         message = message.shallowCopy();
554         if (message.isPartOfTransaction()) {
555             this.brokerConnector.sendTransactedMessage(this, message.getTransactionId(), message);
556         }
557         else {
558             this.brokerConnector.sendMessage(this, message);
559         }
560     }
561 
562     /***
563      * Send Message acknowledge to the Broker
564      * 
565      * @param ack
566      * @throws JMSException
567      */
568     private void consumeMessageAck(MessageAck ack) throws JMSException {
569         if (ack.isPartOfTransaction()) {
570             this.brokerConnector.acknowledgeTransactedMessage(this, ack.getTransactionId(), ack);
571         }
572         else {
573             this.brokerConnector.acknowledgeMessage(this, ack);
574         }
575     }
576 
577     /***
578      * Handle transaction start/commit/rollback
579      * 
580      * @param info
581      * @throws JMSException
582      */
583     private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
584         if (info.getType() == TransactionInfo.START) {
585             transactions.add(info.getTransactionId());
586             this.brokerConnector.startTransaction(this, info.getTransactionId());
587         }
588         else {
589             if (info.getType() == TransactionInfo.ROLLBACK) {
590                 this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
591             }
592             else if (info.getType() == TransactionInfo.COMMIT) {
593                 this.brokerConnector.commitTransaction(this, info.getTransactionId());
594             }
595             transactions.remove(info.getTransactionId());
596         }
597     }
598 
599     /***
600      * Handle XA transaction start/prepare/commit/rollback
601      * 
602      * @param info
603      * @throws JMSException
604      * @throws XAException
605      */
606     private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
607         if (info.getType() == XATransactionInfo.START) {
608             transactions.add(info.getXid());
609             this.brokerConnector.startTransaction(this, info.getXid());
610         }
611         else if (info.getType() == XATransactionInfo.XA_RECOVER) {
612             ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
613             // We will be sending our own receipt..
614             info.setReceiptRequired(false);
615             // Send the receipt..
616             ResponseReceipt receipt = new ResponseReceipt();
617             receipt.setId(this.packetIdGenerator.generateId());
618             receipt.setCorrelationId(info.getId());
619             receipt.setResult(rc);
620             send(receipt);
621         }
622         else if (info.getType() == XATransactionInfo.GET_RM_ID) {
623             String rc = this.brokerConnector.getResourceManagerId(this);
624             // We will be sending our own receipt..
625             info.setReceiptRequired(false);
626             // Send the receipt..
627             ResponseReceipt receipt = new ResponseReceipt();
628             receipt.setId(this.packetIdGenerator.generateId());
629             receipt.setCorrelationId(info.getId());
630             receipt.setResult(rc);
631             send(receipt);
632         }
633         else if (info.getType() == XATransactionInfo.END) {
634             // we don't do anything..
635         }
636         else {
637             if (info.getType() == XATransactionInfo.PRE_COMMIT) {
638                 int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
639                 // We will be sending our own receipt..
640                 info.setReceiptRequired(false);
641                 // Send the receipt..
642                 IntResponseReceipt receipt = new IntResponseReceipt();
643                 receipt.setId(this.packetIdGenerator.generateId());
644                 receipt.setCorrelationId(info.getId());
645                 receipt.setResult(rc);
646                 send(receipt);
647             }
648             else if (info.getType() == XATransactionInfo.ROLLBACK) {
649                 this.brokerConnector.rollbackTransaction(this, info.getXid());
650             }
651             else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
652                 this.brokerConnector.commitTransaction(this, info.getXid(), true);
653             }
654             else if (info.getType() == XATransactionInfo.COMMIT) {
655                 this.brokerConnector.commitTransaction(this, info.getXid(), false);
656             }
657             else {
658                 throw new JMSException("Packet type: " + info.getType() + " not recognized.");
659             }
660             transactions.remove(info.getXid());
661         }
662     }
663 
664     /***
665      * register/deregister MessageProducer in the Broker
666      * 
667      * @param info
668      * @throws JMSException
669      */
670     private void consumeProducerInfo(ProducerInfo info) throws JMSException {
671         if (info.isStarted()) {
672             producers.add(info);
673             this.brokerConnector.registerMessageProducer(this, info);
674         }
675         else {
676             producers.remove(info);
677             this.brokerConnector.deregisterMessageProducer(this, info);
678         }
679     }
680 
681     /***
682      * register/deregister Session in a Broker
683      * 
684      * @param info
685      * @throws JMSException
686      */
687     private void consumeSessionInfo(SessionInfo info) throws JMSException {
688         if (info.isStarted()) {
689             sessions.add(info);
690             this.brokerConnector.registerSession(this, info);
691         }
692         else {
693             sessions.remove(info);
694             this.brokerConnector.deregisterSession(this, info);
695         }
696     }
697 
698     /***
699      * Update capacity for the peer
700      * 
701      * @param info
702      */
703     private void consumeCapacityInfo(CapacityInfo info) {
704         this.capacity = info.getCapacity();
705     }
706 
707     private void updateCapacityInfo(String correlationId) {
708         CapacityInfo info = new CapacityInfo();
709         info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
710         info.setCorrelationId(correlationId);
711         info.setCapacity(this.brokerConnector.getBrokerCapacity());
712         info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
713         send(info);
714     }
715 
716     private long getFlowControlTimeout(int capacity) {
717         long result = -1;
718         if (capacity <= 0) {
719             result = 10000;
720         }
721         else if (capacity <= 10) {
722             result = 1000;
723         }
724         else if (capacity <= 20) {
725             result = 10;
726         }
727         return result;
728     }
729 
730     private void consumeBrokerInfo(BrokerInfo info) {
731         brokerConnection = true;
732         remoteBrokerName = info.getBrokerName();
733         String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName();
734         if (clusterName.equals(info.getClusterName())){
735             clusteredConnection = true;
736         }
737     }
738 
739     private void sendReceipt(Packet packet) {
740         sendReceipt(packet, null, false);
741     }
742 
743     private void sendReceipt(Packet packet, Throwable requestEx, boolean failed) {
744         if (packet != null && packet.isReceiptRequired()) {
745             Receipt receipt = new Receipt();
746             receipt.setId(this.packetIdGenerator.generateId());
747             receipt.setCorrelationId(packet.getId());
748             receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName());
749             receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName());
750             receipt.setException(requestEx);
751             receipt.setFailed(failed);
752             send(receipt);
753         }
754     }
755 }