/**
 *   GRANITE DATA SERVICES
 *   Copyright (C) 2006-2013 GRANITE DATA SERVICES S.A.S.
 *
 *   This file is part of the Granite Data Services Platform.
 *
 *                               ***
 *
 *   Community License: GPL 3.0
 *
 *   This file is free software: you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published
 *   by the Free Software Foundation, either version 3 of the License,
 *   or (at your option) any later version.
 *
 *   This file is distributed in the hope that it will be useful, but
 *   WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 *                               ***
 *
 *   Available Commercial License: GraniteDS SLA 1.0
 *
 *   This is the appropriate option if you are creating proprietary
 *   applications and you are not prepared to distribute and share the
 *   source code of your application under the GPL v3 license.
 *
 *   Please visit http://www.granitedataservices.com/license for more
 *   details.
 */
package org.granite.client.tide.data;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.granite.client.messaging.Consumer;
import org.granite.client.messaging.ResponseListener;
import org.granite.client.messaging.ResultFaultIssuesResponseListener;
import org.granite.client.messaging.TopicMessageListener;
import org.granite.client.messaging.events.FaultEvent;
import org.granite.client.messaging.events.IssueEvent;
import org.granite.client.messaging.events.ResultEvent;
import org.granite.client.messaging.events.TopicMessageEvent;
import org.granite.client.messaging.messages.push.TopicMessage;
import org.granite.client.tide.Context;
import org.granite.client.tide.ContextAware;
import org.granite.client.tide.NameAware;
import org.granite.client.tide.data.EntityManager.UpdateKind;
import org.granite.client.tide.data.spi.MergeContext;
import org.granite.client.tide.server.ServerSession;
import org.granite.logging.Logger;

/**
 * Listens to a data topic through a {@link Consumer} obtained from a {@link ServerSession}
 * and forwards the updates carried by each received message to an {@link EntityManager}.
 *
 * <p>Lifecycle: {@link #start()} obtains the consumer, {@link #subscribe()} and
 * {@link #unsubscribe()} attach/detach the message listener, and {@link #stop()}
 * unsubscribes if a subscription is active.</p>
 *
 * @author William DRAI
 */
public class DataObserver implements ContextAware, NameAware {

    private static final Logger log = Logger.getLogger(DataObserver.class);

    /** Topic name passed to {@link ServerSession#getConsumer} when the consumer is created. */
    public static final String DATA_OBSERVER_TOPIC_NAME = "tideDataTopic";

    private Context context;
    private ServerSession serverSession = null;
    private EntityManager entityManager = null;
    private String channelType = null;
    private String destination = null;

    /** Created by {@link #start()}; {@code null} until then. */
    private Consumer consumer = null;

    private final ResponseListener subscriptionListener = new SubscriptionListenerImpl();
    private final ResponseListener unsubscriptionListener = new UnsubscriptionListenerImpl();
    private final TopicMessageListener messageListener = new TopicMessageListenerImpl();


    protected DataObserver() {
        // CDI proxying...
    }

    /**
     * Creates an observer whose entity manager is taken from the session's context,
     * when the session has one.
     *
     * @param serverSession server session used to obtain the consumer
     */
    public DataObserver(ServerSession serverSession) {
        this((String) null, serverSession);
    }

    /**
     * Creates an observer for a given channel type whose entity manager is taken from
     * the session's context, when the session has one.
     *
     * @param channelType   channel type passed to the session when the consumer is created
     * @param serverSession server session used to obtain the consumer
     */
    public DataObserver(String channelType, ServerSession serverSession) {
        this(null, channelType, serverSession, entityManagerOf(serverSession));
    }

    /**
     * @param serverSession server session used to obtain the consumer
     * @param entityManager entity manager that receives the updates
     */
    public DataObserver(ServerSession serverSession, EntityManager entityManager) {
        this(null, null, serverSession, entityManager);
    }

    /**
     * @param destination   destination passed to the session when the consumer is created
     * @param serverSession server session used to obtain the consumer
     * @param entityManager entity manager that receives the updates
     */
    public DataObserver(String destination, ServerSession serverSession, EntityManager entityManager) {
        this(destination, null, serverSession, entityManager);
    }

    /**
     * @param destination   destination passed to the session when the consumer is created
     * @param channelType   channel type passed to the session when the consumer is created
     * @param serverSession server session used to obtain the consumer
     * @param entityManager entity manager that receives the updates
     */
    public DataObserver(String destination, String channelType, ServerSession serverSession, EntityManager entityManager) {
        this.destination = destination;
        this.channelType = channelType;
        this.serverSession = serverSession;
        this.entityManager = entityManager;
    }

    /**
     * Returns the entity manager of the session's context, or {@code null} when the
     * session has no context.
     */
    private static EntityManager entityManagerOf(ServerSession serverSession) {
        Context sessionContext = serverSession.getContext();
        return sessionContext != null ? sessionContext.getEntityManager() : null;
    }

    /**
     * Stores the context and, if no entity manager has been set yet, adopts the
     * context's entity manager.
     *
     * @param context context used to schedule message processing
     */
    public void setContext(Context context) {
        this.context = context;
        if (this.entityManager == null) {
            this.entityManager = context.getEntityManager();
        }
    }

    /**
     * Uses the given name as destination, unless a destination was already set.
     *
     * @param name name to use as default destination
     */
    public void setName(String name) {
        if (this.destination == null) {
            this.destination = name;
        }
    }

    /**
     * Obtains the consumer for the configured destination and channel type from the
     * server session. Must be called before {@link #subscribe()}.
     */
    public void start() {
        consumer = serverSession.getConsumer(destination, DATA_OBSERVER_TOPIC_NAME, channelType);
    }

    /**
     * Unsubscribes if the consumer is currently subscribed. Does nothing when
     * {@link #start()} has not been called.
     */
    public void stop() {
        if (consumer != null && consumer.isSubscribed()) {
            unsubscribe();
        }
    }


    /**
     * Subscribes to the data topic.
     *
     * @throws IllegalStateException if {@link #start()} has not been called
     */
    public void subscribe() {
        if (consumer == null) {
            throw new IllegalStateException("DataObserver not started: call start() before subscribe()");
        }
        consumer.addMessageListener(messageListener);
        consumer.subscribe(subscriptionListener);
        serverSession.checkWaitForLogout();
    }

    /**
     * Unsubscribes from the data topic if the consumer is currently subscribed.
     * Does nothing when {@link #start()} has not been called.
     */
    public void unsubscribe() {
        if (consumer == null || !consumer.isSubscribed()) {
            return;
        }
        consumer.removeMessageListener(messageListener);
        consumer.unsubscribe(unsubscriptionListener);
        serverSession.checkWaitForLogout();
    }

    /**
     * Converts the raw payload of a topic message into a list of entity manager updates.
     * Each element is expected to be an {@code Object[]} holding the update kind name at
     * index 0 and the updated object at index 1.
     */
    private static List<EntityManager.Update> toUpdates(Object[] rawUpdates) {
        List<EntityManager.Update> updates = new ArrayList<EntityManager.Update>(rawUpdates.length);
        for (Object rawUpdate : rawUpdates) {
            Object[] kindAndEntity = (Object[]) rawUpdate;
            // Locale.ROOT: the kind name must be upper-cased independently of the default
            // locale (e.g. a Turkish locale would map 'i' to a dotted capital I).
            String kindName = kindAndEntity[0].toString().toUpperCase(Locale.ROOT);
            updates.add(new EntityManager.Update(UpdateKind.forName(kindName), kindAndEntity[1]));
        }
        return updates;
    }

    /**
     * Logs the outcome of a subscription request and then calls
     * {@code serverSession.tryLogout()} in every case.
     */
    private class SubscriptionListenerImpl extends ResultFaultIssuesResponseListener {
        @Override
        public void onResult(ResultEvent event) {
            log.info("Destination %s subscribed sid: %s", destination, consumer.getSubscriptionId());

            serverSession.tryLogout();
        }

        @Override
        public void onFault(FaultEvent event) {
            log.error("Destination %s could not be subscribed: %s", destination, event.getCode());

            serverSession.tryLogout();
        }

        @Override
        public void onIssue(IssueEvent event) {
            log.error("Destination %s could not be subscribed: %s", destination, event.getType());

            serverSession.tryLogout();
        }
    }

    /**
     * Logs the outcome of an unsubscription request and then calls
     * {@code serverSession.tryLogout()} in every case.
     */
    private class UnsubscriptionListenerImpl extends ResultFaultIssuesResponseListener {
        @Override
        public void onResult(ResultEvent event) {
            log.info("Destination %s unsubscribed", destination);

            serverSession.tryLogout();
        }

        @Override
        public void onFault(FaultEvent event) {
            log.error("Destination %s could not be unsubscribed: %s", destination, event.getCode());

            serverSession.tryLogout();
        }

        @Override
        public void onIssue(IssueEvent event) {
            log.error("Destination %s could not be unsubscribed: %s", destination, event.getType());

            serverSession.tryLogout();
        }
    }


    public class TopicMessageListenerImpl implements TopicMessageListener {
        /**
         * Message handler that merges data from the JMS topic in the current context.<br/>
         * Could be overridden to provide custom behaviour.
         *
         * <p>Processing is deferred through {@code context.callLater(...)}; any exception
         * raised while processing is logged, and the merge context is always destroyed.</p>
         *
         * @param event message event from the Consumer
         */
        @Override
        public void onMessage(TopicMessageEvent event) {
            log.debug("Destination %s message event received %s", destination, event.toString());

            final TopicMessage message = event.getMessage();

            context.callLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        // A message carrying this session's own id is passed on with a
                        // null session id.
                        String receivedSessionId = (String) message.getHeader("GDSSessionID");
                        if (receivedSessionId != null && receivedSessionId.equals(serverSession.getSessionId())) {
                            receivedSessionId = null;
                        }

                        MergeContext mergeContext = entityManager.initMerge();

                        List<EntityManager.Update> updates = toUpdates((Object[]) message.getData());

                        entityManager.handleUpdates(mergeContext, receivedSessionId, updates);
                        entityManager.raiseUpdateEvents(context, updates);
                    }
                    catch (Exception e) {
                        log.error(e, "Error during received message processing");
                    }
                    finally {
                        MergeContext.destroy(entityManager);
                    }
                }
            });
        }
    }
}