001/**
002 *   GRANITE DATA SERVICES
003 *   Copyright (C) 2006-2013 GRANITE DATA SERVICES S.A.S.
004 *
005 *   This file is part of the Granite Data Services Platform.
006 *
007 *                               ***
008 *
009 *   Community License: GPL 3.0
010 *
011 *   This file is free software: you can redistribute it and/or modify
012 *   it under the terms of the GNU General Public License as published
013 *   by the Free Software Foundation, either version 3 of the License,
014 *   or (at your option) any later version.
015 *
016 *   This file is distributed in the hope that it will be useful, but
017 *   WITHOUT ANY WARRANTY; without even the implied warranty of
018 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
019 *   GNU General Public License for more details.
020 *
021 *   You should have received a copy of the GNU General Public License
022 *   along with this program. If not, see <http://www.gnu.org/licenses/>.
023 *
024 *                               ***
025 *
026 *   Available Commercial License: GraniteDS SLA 1.0
027 *
028 *   This is the appropriate option if you are creating proprietary
029 *   applications and you are not prepared to distribute and share the
030 *   source code of your application under the GPL v3 license.
031 *
032 *   Please visit http://www.granitedataservices.com/license for more
033 *   details.
034 */
035package org.granite.client.tide.data;
036
037import java.util.ArrayList;
038import java.util.List;
039
040import javax.annotation.PostConstruct;
041import javax.annotation.PreDestroy;
042
043import org.granite.client.messaging.Consumer;
044import org.granite.client.messaging.ResponseListener;
045import org.granite.client.messaging.ResultFaultIssuesResponseListener;
046import org.granite.client.messaging.TopicMessageListener;
047import org.granite.client.messaging.events.FaultEvent;
048import org.granite.client.messaging.events.IssueEvent;
049import org.granite.client.messaging.events.ResultEvent;
050import org.granite.client.messaging.events.TopicMessageEvent;
051import org.granite.client.messaging.messages.push.TopicMessage;
052import org.granite.client.tide.Context;
053import org.granite.client.tide.ContextAware;
054import org.granite.client.tide.NameAware;
055import org.granite.client.tide.data.EntityManager.UpdateKind;
056import org.granite.client.tide.data.spi.MergeContext;
057import org.granite.client.tide.server.ServerSession;
058import org.granite.logging.Logger;
059
060/**
061 * @author William DRAI
062 */
063public class DataObserver implements ContextAware, NameAware {
064    
065    private static Logger log = Logger.getLogger(DataObserver.class);
066    
067    public static final String DATA_OBSERVER_TOPIC_NAME = "tideDataTopic";
068
069    private Context context;
070    private ServerSession serverSession = null;
071    private EntityManager entityManager = null;
072    private String channelType = null;
073    private String destination = null;
074    
075        private Consumer consumer = null;
076
077        
078    protected DataObserver() {
079        // CDI proxying...
080    }
081    
082        public DataObserver(ServerSession serverSession) {
083                this.serverSession = serverSession;
084        if (serverSession.getContext() != null)
085                    this.entityManager = serverSession.getContext().getEntityManager();
086        }
087
088    public DataObserver(String channelType, ServerSession serverSession) {
089        this.channelType = channelType;
090        this.serverSession = serverSession;
091        if (serverSession.getContext() != null)
092            this.entityManager = serverSession.getContext().getEntityManager();
093    }
094
095        public DataObserver(ServerSession serverSession, EntityManager entityManager) {
096                this.serverSession = serverSession;
097                this.entityManager = entityManager;
098        }
099        
100        public DataObserver(String destination, ServerSession serverSession, EntityManager entityManager) {
101                this.destination = destination;
102                this.serverSession = serverSession;
103                this.entityManager = entityManager;
104        }
105
106    public DataObserver(String destination, String channelType, ServerSession serverSession, EntityManager entityManager) {
107        this.destination = destination;
108        this.channelType = channelType;
109        this.serverSession = serverSession;
110        this.entityManager = entityManager;
111    }
112
113        public void setContext(Context context) {
114        this.context = context;
115        if (this.entityManager == null)
116            this.entityManager = context.getEntityManager();
117        }
118        
119        public void setName(String name) {
120                if (this.destination == null)
121                        this.destination = name;
122        }
123        
124        public void start() {
125        consumer = serverSession.getConsumer(destination, DATA_OBSERVER_TOPIC_NAME, channelType);
126        }       
127        
128        public void stop() {
129                if (consumer.isSubscribed())
130                        unsubscribe();
131        }
132        
133        
134        /**
135         *      Subscribe the data topic
136         */
137        public void subscribe() {
138                consumer.addMessageListener(messageListener);
139            consumer.subscribe(subscriptionListener);
140            serverSession.checkWaitForLogout();
141        }
142        
143        public void unsubscribe() {
144                if (consumer.isSubscribed()) {
145                        consumer.removeMessageListener(messageListener);
146                        consumer.unsubscribe(unsubscriptionListener);
147                    serverSession.checkWaitForLogout();
148                }
149        }
150        
151        private ResponseListener subscriptionListener = new SubscriptionListenerImpl(); 
152        private ResponseListener unsubscriptionListener = new UnsubscriptionListenerImpl(); 
153        
154        private class SubscriptionListenerImpl extends ResultFaultIssuesResponseListener {
155                @Override
156                public void onResult(ResultEvent event) {
157                        log.info("Destination %s subscribed sid: %s", destination, consumer.getSubscriptionId());
158                        
159                        serverSession.tryLogout();
160                }
161
162                @Override
163                public void onFault(FaultEvent event) {
164                        log.error("Destination %s could not be subscribed: %s", destination, event.getCode());
165                        
166                        serverSession.tryLogout();
167                }
168
169                @Override
170                public void onIssue(IssueEvent event) {
171                        log.error("Destination %s could not be subscribed: %s", destination, event.getType());
172                        
173                        serverSession.tryLogout();
174                }
175        }
176        
177        private class UnsubscriptionListenerImpl extends ResultFaultIssuesResponseListener {
178                @Override
179                public void onResult(ResultEvent event) {
180                        log.info("Destination %s unsubscribed", destination);
181                        
182                        serverSession.tryLogout();
183                }
184
185                @Override
186                public void onFault(FaultEvent event) {
187                        log.error("Destination %s could not be unsubscribed: %s", destination, event.getCode());
188                        
189                        serverSession.tryLogout();
190                }
191
192                @Override
193                public void onIssue(IssueEvent event) {
194                        log.error("Destination %s could not be unsubscribed: %s", destination, event.getType());
195                        
196                        serverSession.tryLogout();
197                }
198        }
199
200        
201        private TopicMessageListener messageListener = new TopicMessageListenerImpl();
202        
203    public class TopicMessageListenerImpl implements TopicMessageListener {
204        /**
205         *      Message handler that merges data from the JMS topic in the current context.<br/>
206         *  Could be overriden to provide custom behaviour.
207         *
208         *  @param event message event from the Consumer
209         */
210        @Override
211                public void onMessage(TopicMessageEvent event) {
212                log.debug("Destination %s message event received %s", destination, event.toString());
213                
214                final TopicMessage message = event.getMessage();
215                
216                context.callLater(new Runnable() {
217                                @Override
218                                public void run() {
219                                try {
220                                        String receivedSessionId = (String)message.getHeader("GDSSessionID");
221                                        if (receivedSessionId != null && receivedSessionId.equals(serverSession.getSessionId()))
222                                                receivedSessionId = null;
223                                        
224                                        MergeContext mergeContext = entityManager.initMerge();
225                                        
226                                        Object[] updates = (Object[])message.getData();
227                                        List<EntityManager.Update> upds = new ArrayList<EntityManager.Update>();
228                                        for (Object update : updates)
229                                                upds.add(new EntityManager.Update(UpdateKind.forName(((Object[])update)[0].toString().toUpperCase()), ((Object[])update)[1]));
230                                        
231                                        entityManager.handleUpdates(mergeContext, receivedSessionId, upds);
232                                        entityManager.raiseUpdateEvents(context, upds);
233                                }
234                                catch (Exception e) {
235                                        log.error(e, "Error during received message processing");
236                                }
237                                finally {
238                                        MergeContext.destroy(entityManager);
239                                }
240                                }
241                });
242                }
243    }
244}