1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activecluster.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activecluster.Node;
23
24 import javax.jms.Message;
25 import javax.jms.MessageListener;
26 import javax.jms.ObjectMessage;
27
28
29 /***
30 * A JMS MessageListener which processes inbound messages and
31 * applies them to a StateService
32 *
33 * @version $Revision: 1.3 $
34 */
35 public class StateConsumer implements MessageListener {
36
37 private final static Log log = LogFactory.getLog(StateConsumer.class);
38
39 private StateService stateService;
40
41 public StateConsumer(StateService stateService) {
42 if (stateService == null) {
43 throw new IllegalArgumentException("Must specify a valid StateService implementation");
44 }
45 this.stateService = stateService;
46 }
47
48 public void onMessage(Message message) {
49 if (log.isDebugEnabled()) {
50 log.debug("Received cluster data message!: " + message);
51 }
52
53 if (message instanceof ObjectMessage) {
54 ObjectMessage objectMessage = (ObjectMessage) message;
55 try {
56 Node node = (Node) objectMessage.getObject();
57 String type = objectMessage.getJMSType();
58 if (type != null && type.equals("shutdown")) {
59 stateService.shutdown(node);
60 }
61 else {
62 stateService.keepAlive(node);
63 }
64 }
65 catch (Exception e) {
66 log.error("Could not extract node from message: " + e + ". Message: " + message, e);
67 }
68 }
69 else {
70 log.warn("Ignoring message: " + message);
71 }
72 }
73 }