/*
 * Decompiled with CFR 0.152.
 */
package org.granite.gravity;

import flex.messaging.messages.AcknowledgeMessage;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.ErrorMessage;
import flex.messaging.messages.Message;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.ObjectName;
import org.granite.clustering.DistributedData;
import org.granite.config.GraniteConfig;
import org.granite.config.flex.Destination;
import org.granite.config.flex.ServicesConfig;
import org.granite.context.GraniteContext;
import org.granite.context.SimpleGraniteContext;
import org.granite.gravity.AbstractChannel;
import org.granite.gravity.AsyncChannelRunner;
import org.granite.gravity.AsyncHttpContext;
import org.granite.gravity.Channel;
import org.granite.gravity.ChannelFactory;
import org.granite.gravity.ChannelTimerTask;
import org.granite.gravity.DefaultGravityMBean;
import org.granite.gravity.Gravity;
import org.granite.gravity.GravityConfig;
import org.granite.gravity.GravityPool;
import org.granite.gravity.MessageReceivingException;
import org.granite.gravity.Subscription;
import org.granite.gravity.TimeChannel;
import org.granite.gravity.adapters.AdapterFactory;
import org.granite.gravity.adapters.ServiceAdapter;
import org.granite.gravity.security.GravityDestinationSecurizer;
import org.granite.gravity.security.GravityInvocationContext;
import org.granite.gravity.udp.UdpReceiverFactory;
import org.granite.jmx.MBeanServerLocator;
import org.granite.jmx.OpenMBean;
import org.granite.logging.Logger;
import org.granite.messaging.service.security.SecurityService;
import org.granite.messaging.service.security.SecurityServiceException;
import org.granite.messaging.webapp.ServletGraniteContext;
import org.granite.scan.ServiceLoader;
import org.granite.util.TypeUtil;
import org.granite.util.UUIDUtil;

public class DefaultGravity
implements Gravity,
DefaultGravityMBean {
    /** Shared logger; obtained for the {@link Gravity} interface rather than for this class. */
    private static final Logger log = Logger.getLogger(Gravity.class);
    /** Attribute map passed to every thread context created by {@code initThread}. */
    private final Map<String, Object> applicationMap = new HashMap<String, Object>();
    /** Registered channels keyed by channel id, each wrapped together with its idle-timeout timer task. */
    private final ConcurrentHashMap<String, TimeChannel<?>> channels = new ConcurrentHashMap<String, TimeChannel<?>>();
    /** Gravity settings; replaced wholesale by {@code reconfigure}. */
    private GravityConfig gravityConfig = null;
    /** Services configuration supplied at construction time. */
    private ServicesConfig servicesConfig = null;
    /** Granite settings; replaced wholesale by {@code reconfigure}. */
    private GraniteConfig graniteConfig = null;
    /** Channel used as the sender for server-side publishes; non-null only between start and stop. */
    private Channel serverChannel = null;
    /** Service adapter factory; non-null only between start and stop. */
    private AdapterFactory adapterFactory = null;
    /** Pool that executes channel runners; non-null only between start and stop. */
    private GravityPool gravityPool = null;
    /** UDP receiver factory; set on start only when UDP is enabled and an implementation is found. */
    private UdpReceiverFactory udpReceiverFactory = null;
    /** Timer that runs the per-channel idle-timeout tasks; non-null only between start and stop. */
    private Timer channelsTimer;
    /** Whether {@code start()} has completed without a subsequent {@code stop()}. */
    private boolean started;

    /**
     * Creates a Gravity instance bound to the given configurations. The instance is
     * created in the stopped state.
     *
     * @param gravityConfig  Gravity settings
     * @param servicesConfig services configuration
     * @param graniteConfig  Granite settings
     * @throws NullPointerException if any of the three arguments is {@code null}
     */
    public DefaultGravity(GravityConfig gravityConfig, ServicesConfig servicesConfig, GraniteConfig graniteConfig) {
        boolean allPresent = gravityConfig != null && servicesConfig != null && graniteConfig != null;
        if (!allPresent) {
            throw new NullPointerException("All arguments must be non null.");
        }
        this.graniteConfig = graniteConfig;
        this.servicesConfig = servicesConfig;
        this.gravityConfig = gravityConfig;
    }

    /** Returns the Gravity configuration currently held by this instance. */
    @Override
    public GravityConfig getGravityConfig() {
        return gravityConfig;
    }

    /** Returns the services configuration supplied at construction time. */
    @Override
    public ServicesConfig getServicesConfig() {
        return servicesConfig;
    }

    /** Returns the Granite configuration currently held by this instance. */
    @Override
    public GraniteConfig getGraniteConfig() {
        return graniteConfig;
    }

    /** Returns {@code true} once {@link #start()} has completed and until {@link #stop(boolean)} runs. */
    @Override
    public boolean isStarted() {
        return started;
    }

    /**
     * Looks up the service adapter for a message type and destination by delegating
     * to the adapter factory.
     *
     * <p>The adapter factory only exists between start and stop; calling this method
     * outside that window fails with a {@link NullPointerException}.
     */
    @Override
    public ServiceAdapter getServiceAdapter(String messageType, String destinationId) {
        AdapterFactory factory = this.adapterFactory;
        return factory.getServiceAdapter(messageType, destinationId);
    }

    /**
     * Starts this Gravity instance if it is not already started.
     *
     * <p>Under this instance's monitor it creates the adapter factory, runs
     * {@link #internalStart()}, creates the server channel and, when UDP is enabled
     * in the Gravity configuration, configures and starts the first
     * {@link UdpReceiverFactory} found through the service loader. A warning is
     * logged when UDP is enabled but no factory is found.
     *
     * @throws Exception if any start-up step fails
     */
    @Override
    public void start() throws Exception {
        log.info("Starting Gravity...");
        synchronized (this) {
            if (!started) {
                adapterFactory = new AdapterFactory(this);
                internalStart();
                serverChannel = new ServerChannel((Gravity)this, ServerChannel.class.getName(), null, null);
                if (gravityConfig.isUseUdp()) {
                    Iterator<UdpReceiverFactory> candidates = ServiceLoader.load(UdpReceiverFactory.class).iterator();
                    if (!candidates.hasNext()) {
                        log.warn("UDP receiver factory not found");
                    } else {
                        // The field is assigned before configuration, as in the original sequence.
                        udpReceiverFactory = candidates.next();
                        udpReceiverFactory.setPort(gravityConfig.getUdpPort());
                        udpReceiverFactory.setNio(gravityConfig.isUdpNio());
                        udpReceiverFactory.setConnected(gravityConfig.isUdpConnected());
                        udpReceiverFactory.setSendBufferSize(gravityConfig.getUdpSendBufferSize());
                        udpReceiverFactory.start();
                    }
                }
                started = true;
            }
        }
        log.info("Gravity successfully started.");
    }

    /**
     * Creates the runner pool and the channel timer, then registers this instance as
     * an MBean when the Granite configuration asks for it. A registration failure is
     * logged and does not abort start-up.
     */
    protected void internalStart() {
        gravityPool = new GravityPool(gravityConfig);
        channelsTimer = new Timer();
        if (!graniteConfig.isRegisterMBeans()) {
            return;
        }
        try {
            ObjectName mBeanName = new ObjectName("org.graniteds:type=Gravity,context=" + graniteConfig.getMBeanContextName());
            log.info("Registering MBean: %s", mBeanName);
            OpenMBean openMBean = OpenMBean.createMBean(this);
            MBeanServerLocator.getInstance().register(openMBean, mBeanName, true);
        }
        catch (Exception e) {
            log.error(e, "Could not register Gravity MBean for context: %s", graniteConfig.getMBeanContextName());
        }
    }

    /**
     * Stops and then starts this instance while holding its monitor, so the two
     * steps are not interleaved with another start or stop.
     *
     * @throws Exception if stopping or starting fails
     */
    @Override
    public void restart() throws Exception {
        synchronized (this) {
            stop();
            start();
        }
    }

    /**
     * Replaces the Gravity and Granite configurations and, when the runner pool
     * exists, pushes the new Gravity configuration to it.
     */
    @Override
    public void reconfigure(GravityConfig gravityConfig, GraniteConfig graniteConfig) {
        this.gravityConfig = gravityConfig;
        this.graniteConfig = graniteConfig;
        if (gravityPool == null) {
            return;
        }
        gravityPool.reconfigure(gravityConfig);
    }

    /**
     * Stops this instance immediately; equivalent to {@code stop(true)}.
     *
     * @throws Exception declared by the interface; see {@link #stop(boolean)}
     */
    @Override
    public void stop() throws Exception {
        final boolean now = true;
        stop(now);
    }

    /**
     * Stops this instance and releases what {@link #start()} created.
     *
     * <p>Under this instance's monitor each component is shut down in turn: adapter
     * factory, server channel, channel timer, runner pool, UDP receiver factory.
     * A failure in one step is logged and does not prevent the remaining steps;
     * every field is cleared regardless of whether its shutdown succeeded.
     *
     * @param now when {@code true} the pool is shut down with {@code shutdownNow()},
     *            otherwise with {@code shutdown()}
     * @throws Exception declared by the interface
     */
    @Override
    public void stop(boolean now) throws Exception {
        log.info("Stopping Gravity (now=%s)...", now);
        synchronized (this) {
            if (adapterFactory != null) {
                try {
                    adapterFactory.stopAll();
                }
                catch (Exception e) {
                    log.error(e, "Error while stopping adapter factory");
                }
                adapterFactory = null;
            }

            if (serverChannel != null) {
                try {
                    removeChannel(serverChannel.getId(), false);
                }
                catch (Exception e) {
                    log.error(e, "Error while removing server channel: %s", serverChannel);
                }
                serverChannel = null;
            }

            if (channelsTimer != null) {
                try {
                    channelsTimer.cancel();
                }
                catch (Exception e) {
                    log.error(e, "Error while cancelling channels timer");
                }
                channelsTimer = null;
            }

            if (gravityPool != null) {
                try {
                    if (!now) {
                        gravityPool.shutdown();
                    } else {
                        gravityPool.shutdownNow();
                    }
                }
                catch (Exception e) {
                    log.error(e, "Error while stopping thread pool");
                }
                gravityPool = null;
            }

            if (udpReceiverFactory != null) {
                try {
                    udpReceiverFactory.stop();
                }
                catch (Exception e) {
                    log.error(e, "Error while stopping udp receiver factory");
                }
                udpReceiverFactory = null;
            }

            started = false;
        }
        log.info("Gravity sucessfully stopped.");
    }

    /** Returns the Gravity factory value held by the Gravity configuration. */
    @Override
    public String getGravityFactoryName() {
        return gravityConfig.getGravityFactory();
    }

    /** Returns the configured channel idle timeout, in milliseconds. */
    @Override
    public long getChannelIdleTimeoutMillis() {
        return gravityConfig.getChannelIdleTimeoutMillis();
    }

    /** Stores a new channel idle timeout, in milliseconds, in the Gravity configuration. */
    @Override
    public void setChannelIdleTimeoutMillis(long channelIdleTimeoutMillis) {
        gravityConfig.setChannelIdleTimeoutMillis(channelIdleTimeoutMillis);
    }

    /** Returns the retry-on-error flag from the Gravity configuration. */
    @Override
    public boolean isRetryOnError() {
        return gravityConfig.isRetryOnError();
    }

    /** Stores the retry-on-error flag in the Gravity configuration. */
    @Override
    public void setRetryOnError(boolean retryOnError) {
        gravityConfig.setRetryOnError(retryOnError);
    }

    /** Returns the configured long-polling timeout, in milliseconds. */
    @Override
    public long getLongPollingTimeoutMillis() {
        return gravityConfig.getLongPollingTimeoutMillis();
    }

    /** Stores a new long-polling timeout, in milliseconds, in the Gravity configuration. */
    @Override
    public void setLongPollingTimeoutMillis(long longPollingTimeoutMillis) {
        gravityConfig.setLongPollingTimeoutMillis(longPollingTimeoutMillis);
    }

    /** Returns the configured maximum number of messages queued per channel. */
    @Override
    public int getMaxMessagesQueuedPerChannel() {
        return gravityConfig.getMaxMessagesQueuedPerChannel();
    }

    /** Stores a new per-channel queued-message limit in the Gravity configuration. */
    @Override
    public void setMaxMessagesQueuedPerChannel(int maxMessagesQueuedPerChannel) {
        gravityConfig.setMaxMessagesQueuedPerChannel(maxMessagesQueuedPerChannel);
    }

    /** Returns the configured reconnect interval, in milliseconds. */
    @Override
    public long getReconnectIntervalMillis() {
        return gravityConfig.getReconnectIntervalMillis();
    }

    /** Returns the configured maximum number of reconnect attempts. */
    @Override
    public int getReconnectMaxAttempts() {
        return gravityConfig.getReconnectMaxAttempts();
    }

    /** Returns the core pool size from the live pool when it exists, otherwise the configured value. */
    @Override
    public int getCorePoolSize() {
        return gravityPool != null
            ? gravityPool.getCorePoolSize()
            : gravityConfig.getCorePoolSize();
    }

    /** Stores the core pool size in the configuration and applies it to the live pool when one exists. */
    @Override
    public void setCorePoolSize(int corePoolSize) {
        gravityConfig.setCorePoolSize(corePoolSize);
        if (gravityPool == null) {
            return;
        }
        gravityPool.setCorePoolSize(corePoolSize);
    }

    /** Returns the keep-alive time in milliseconds from the live pool when it exists, otherwise the configured value. */
    @Override
    public long getKeepAliveTimeMillis() {
        return gravityPool != null
            ? gravityPool.getKeepAliveTimeMillis()
            : gravityConfig.getKeepAliveTimeMillis();
    }

    /** Stores the keep-alive time in the configuration and applies it to the live pool when one exists. */
    @Override
    public void setKeepAliveTimeMillis(long keepAliveTimeMillis) {
        gravityConfig.setKeepAliveTimeMillis(keepAliveTimeMillis);
        if (gravityPool == null) {
            return;
        }
        gravityPool.setKeepAliveTimeMillis(keepAliveTimeMillis);
    }

    /** Returns the maximum pool size from the live pool when it exists, otherwise the configured value. */
    @Override
    public int getMaximumPoolSize() {
        return gravityPool != null
            ? gravityPool.getMaximumPoolSize()
            : gravityConfig.getMaximumPoolSize();
    }

    /** Stores the maximum pool size in the configuration and applies it to the live pool when one exists. */
    @Override
    public void setMaximumPoolSize(int maximumPoolSize) {
        gravityConfig.setMaximumPoolSize(maximumPoolSize);
        if (gravityPool == null) {
            return;
        }
        gravityPool.setMaximumPoolSize(maximumPoolSize);
    }

    /** Returns the queue capacity from the live pool when it exists, otherwise the configured value. */
    @Override
    public int getQueueCapacity() {
        return gravityPool != null
            ? gravityPool.getQueueCapacity()
            : gravityConfig.getQueueCapacity();
    }

    /**
     * Returns the remaining queue capacity reported by the live pool. Without a pool
     * the configured total queue capacity is returned.
     */
    @Override
    public int getQueueRemainingCapacity() {
        return gravityPool != null
            ? gravityPool.getQueueRemainingCapacity()
            : gravityConfig.getQueueCapacity();
    }

    /** Returns the queue size reported by the live pool, or {@code 0} when there is no pool. */
    @Override
    public int getQueueSize() {
        return gravityPool != null ? gravityPool.getQueueSize() : 0;
    }

    /** Returns {@code true} when a UDP receiver factory is currently set. */
    @Override
    public boolean hasUdpReceiverFactory() {
        return udpReceiverFactory != null;
    }

    /** Returns the current UDP receiver factory, or {@code null} when none is set. */
    @Override
    public UdpReceiverFactory getUdpReceiverFactory() {
        return udpReceiverFactory;
    }

    /**
     * Returns the channel already known for {@code clientId}, or creates, registers
     * and returns a new channel with a freshly generated id.
     *
     * <p>A new channel is registered with {@code putIfAbsent}. If the generated id is
     * already taken, the channel is destroyed and another id is tried. After the new
     * channel is registered, its id is stored in the distributed data (when
     * available; failures are only logged) and its idle timer is armed through
     * {@link #access(String)}.
     *
     * @param channelFactory factory used to instantiate the channel
     * @param clientId       id of an existing channel to reuse, or {@code null} to force creation
     * @return the existing or newly registered channel, never {@code null}
     * @throws RuntimeException if no free id is found after the retry limit
     */
    protected <C extends Channel> C createChannel(ChannelFactory<C> channelFactory, String clientId) {
        if (clientId != null) {
            C existing = this.getChannel(channelFactory, clientId);
            if (existing != null) {
                return existing;
            }
        }
        String clientType = GraniteContext.getCurrentInstance().getClientType();
        C channel = channelFactory.newChannel(UUIDUtil.randomUUID(), clientType);
        TimeChannel<C> timeChannel = new TimeChannel<C>(channel);
        for (int i = 0; this.channels.putIfAbsent(channel.getId(), timeChannel) != null; ++i) {
            // The id is already registered: this channel was never published, so discard it.
            // Destroying before the limit check also releases the last candidate on failure.
            channel.destroy(false);
            if (i >= 10) {
                throw new RuntimeException("Could not find random new clientId after 10 iterations");
            }
            channel = channelFactory.newChannel(UUIDUtil.randomUUID(), clientType);
            timeChannel = new TimeChannel<C>(channel);
        }
        String channelId = channel.getId();
        try {
            DistributedData gdd = this.graniteConfig.getDistributedDataFactory().getInstance();
            if (gdd != null) {
                log.debug("Saving channel id in distributed data: %s", channelId);
                gdd.addChannelId(channelId, channelFactory.getClass().getName());
            }
        }
        catch (Exception e) {
            // Best effort: the channel stays usable locally even if the distributed data is unavailable.
            log.error(e, "Could not add channel id in distributed data: %s", channelId);
        }
        this.access(channelId);
        return channel;
    }

    /**
     * Returns the channel registered under {@code clientId}, recreating it from the
     * distributed data when it is not registered locally.
     *
     * <p>When the id is unknown locally but present in the distributed data, a
     * channel is instantiated with the factory class recorded there, registered, its
     * saved subscriptions are replayed and its idle timer is armed. If another thread
     * registers the same id first, the locally built channel is destroyed and the
     * registered one is returned. Any failure during recreation is logged and does
     * not propagate.
     *
     * @param channelFactory factory associated with the caller; replaced by the
     *                       recorded factory when the channel is recreated
     * @param clientId       channel id to look up; {@code null} yields {@code null}
     * @return the channel, or {@code null} when it is neither registered nor recreatable
     */
    @Override
    @SuppressWarnings("unchecked") // the map stores TimeChannel<?>; the channel type is trusted to match the caller's factory
    public <C extends Channel> C getChannel(ChannelFactory<C> channelFactory, String clientId) {
        if (clientId == null) {
            return null;
        }
        TimeChannel<C> timeChannel = (TimeChannel<C>)this.channels.get(clientId);
        if (timeChannel == null) {
            try {
                DistributedData gdd = this.graniteConfig.getDistributedDataFactory().getInstance();
                if (gdd != null && gdd.hasChannelId(clientId)) {
                    log.debug("Found channel id in distributed data: %s", clientId);
                    String channelFactoryClassName = gdd.getChannelFactoryClassName(clientId);
                    channelFactory = TypeUtil.newInstance(channelFactoryClassName, ChannelFactory.class);
                    String clientType = GraniteContext.getCurrentInstance().getClientType();
                    C channel = channelFactory.newChannel(clientId, clientType);
                    TimeChannel<C> candidate = new TimeChannel<C>(channel);
                    TimeChannel<?> registered = this.channels.putIfAbsent(clientId, candidate);
                    if (registered == null) {
                        timeChannel = candidate;
                        for (CommandMessage subscription : gdd.getSubscriptions(clientId)) {
                            log.debug("Resubscribing channel: %s - %s", clientId, subscription);
                            this.handleSubscribeMessage(channelFactory, subscription, false);
                        }
                        this.access(clientId);
                    } else {
                        // Lost the race: return the registered channel, not the unregistered copy
                        // built here, and discard the copy the same way createChannel does.
                        channel.destroy(false);
                        timeChannel = (TimeChannel<C>)registered;
                    }
                }
            }
            catch (Exception e) {
                log.error(e, "Could not recreate channel/subscriptions from distributed data: %s", clientId);
            }
        }
        return timeChannel != null ? timeChannel.getChannel() : null;
    }

    /**
     * Unregisters and destroys the channel with the given id.
     *
     * <p>The id is first removed from the distributed data (when available; failures
     * are only logged). If the channel is registered locally, its idle timer task is
     * cancelled and each of its subscriptions is unsubscribed by dispatching the
     * subscription's unsubscribe message with interceptors skipped. Finally the
     * channel is removed from the registry and destroyed.
     *
     * @param channelId id of the channel to remove; {@code null} yields {@code null}
     * @param timeout   passed through to {@code Channel.destroy(boolean)}
     * @return the removed channel, or {@code null} when none was registered under the id
     */
    @Override
    public Channel removeChannel(String channelId, boolean timeout) {
        if (channelId == null) {
            return null;
        }
        try {
            DistributedData gdd = this.graniteConfig.getDistributedDataFactory().getInstance();
            if (gdd != null) {
                log.debug("Removing channel id from distributed data: %s", channelId);
                gdd.removeChannelId(channelId);
            }
        }
        catch (Exception e) {
            log.error(e, "Could not remove channel id from distributed data: %s", channelId);
        }
        TimeChannel<?> timeChannel = this.channels.get(channelId);
        Channel channel = null;
        if (timeChannel != null) {
            try {
                if (timeChannel.getTimerTask() != null) {
                    timeChannel.getTimerTask().cancel();
                }
            }
            catch (Exception e) {
                // Still best effort, but no longer silent: removal proceeds regardless.
                log.debug(e, "Could not cancel timer task for channel: %s", channelId);
            }
            channel = (Channel)timeChannel.getChannel();
            try {
                // The channel stays registered while unsubscribing — presumably because the
                // unsubscribe handling looks it up by client id; it is removed in the finally.
                for (Subscription subscription : channel.getSubscriptions()) {
                    try {
                        Message message = subscription.getUnsubscribeMessage();
                        this.handleMessage(channel.getFactory(), message, true);
                    }
                    catch (Exception e) {
                        log.error(e, "Error while unsubscribing channel: %s from subscription: %s", channel, subscription);
                    }
                }
            }
            finally {
                this.channels.remove(channelId);
                channel.destroy(timeout);
            }
        }
        return channel;
    }

    /**
     * Marks a channel as recently used by re-arming its idle-timeout task.
     *
     * <p>While holding the channel wrapper's monitor, any pending timer task is
     * cancelled and a fresh {@link ChannelTimerTask} is scheduled on the channel
     * timer with the configured idle timeout.
     *
     * @param channelId id of the channel to touch
     * @return {@code true} when the channel is registered and its timer was re-armed,
     *         {@code false} when the id is {@code null} or unknown
     */
    @Override
    public boolean access(String channelId) {
        if (channelId == null) {
            return false;
        }
        TimeChannel<?> timeChannel = this.channels.get(channelId);
        if (timeChannel == null) {
            return false;
        }
        synchronized (timeChannel) {
            TimerTask pending = timeChannel.getTimerTask();
            if (pending != null) {
                log.debug("Canceling TimerTask: %s", pending);
                pending.cancel();
                timeChannel.setTimerTask(null);
            }
            TimerTask rearmed = new ChannelTimerTask(this, channelId);
            timeChannel.setTimerTask(rearmed);
            long timeout = this.gravityConfig.getChannelIdleTimeoutMillis();
            log.debug("Scheduling TimerTask: %s for %s ms.", rearmed, timeout);
            this.channelsTimer.schedule(rearmed, timeout);
        }
        return true;
    }

    /**
     * Submits a channel runner to the pool.
     *
     * @throws NullPointerException if there is no pool; the runner is reset first
     */
    @Override
    public void execute(AsyncChannelRunner runner) {
        if (gravityPool != null) {
            gravityPool.execute(runner);
            return;
        }
        runner.reset();
        throw new NullPointerException("Gravity not started or pool disabled");
    }

    /**
     * Removes a channel runner from the pool.
     *
     * @return the result of the pool's {@code remove} call
     * @throws NullPointerException if there is no pool; the runner is reset first
     */
    @Override
    public boolean cancel(AsyncChannelRunner runner) {
        if (gravityPool != null) {
            return gravityPool.remove(runner);
        }
        runner.reset();
        throw new NullPointerException("Gravity not started or pool disabled");
    }

    /** Handles a message with the message interceptor enabled; see the three-argument overload. */
    public Message handleMessage(ChannelFactory<?> channelFactory, Message message) {
        final boolean skipInterceptor = false;
        return handleMessage(channelFactory, message, skipInterceptor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Message handleMessage(ChannelFactory<?> channelFactory, Message message, boolean skipInterceptor) {
        interceptor = null;
        if (!skipInterceptor) {
            interceptor = GraniteContext.getCurrentInstance().getGraniteConfig().getAmf3MessageInterceptor();
        }
        reply = null;
        publish = false;
        try {
            if (interceptor != null) {
                interceptor.before(message);
            }
            if (!(message instanceof CommandMessage)) ** GOTO lbl51
            command = (CommandMessage)message;
            switch (command.getOperation()) {
                case 8: 
                case 9: {
                    var8_9 = this.handleSecurityMessage(command);
                    if (interceptor != null) {
                        interceptor.after(message, reply);
                    }
                    return var8_9;
                }
                case 5: {
                    var8_10 = this.handlePingMessage(channelFactory, command);
                    if (interceptor != null) {
                        interceptor.after(message, reply);
                    }
                    return var8_10;
                }
                case 20: {
                    var8_11 = this.handleConnectMessage(channelFactory, command);
                    if (interceptor != null) {
                        interceptor.after(message, reply);
                    }
                    return var8_11;
                }
            }
        }
        catch (Throwable var9_16) {
            if (interceptor != null) {
                interceptor.after(message, reply);
            }
            throw var9_16;
        }
        {
            case 21: {
                var8_12 = this.handleDisconnectMessage(channelFactory, command);
                if (interceptor != null) {
                    interceptor.after(message, reply);
                }
                return var8_12;
            }
            case 0: {
                var8_13 = this.handleSubscribeMessage(channelFactory, command);
                if (interceptor != null) {
                    interceptor.after(message, reply);
                }
                return var8_13;
            }
            case 1: {
                var8_14 = this.handleUnsubscribeMessage(channelFactory, command);
                if (interceptor != null) {
                    interceptor.after(message, reply);
                }
                return var8_14;
            }
        }
        throw new UnsupportedOperationException("Unsupported command operation: " + command);
lbl51:
        // 1 sources

        reply = this.handlePublishMessage(channelFactory, (AsyncMessage)message);
        publish = true;
        if (interceptor != null) {
            interceptor.after(message, reply);
        }
        if (reply != null && (context = GraniteContext.getCurrentInstance()).getSessionId() != null) {
            reply.setHeader("org.granite.sessionId", context.getSessionId());
            if (publish && context instanceof ServletGraniteContext && ((ServletGraniteContext)context).getSession(false) != null) {
                serverTime = new Date().getTime();
                ((ServletGraniteContext)context).getSession().setAttribute("org.granite.session.lastAccessedTime", (Object)serverTime);
                reply.setHeader("org.granite.time", serverTime);
                reply.setHeader("org.granite.sessionExp", ((ServletGraniteContext)context).getSession().getMaxInactiveInterval());
            }
        }
        return reply;
    }

    /**
     * Returns the Granite context bound to the current thread, creating a
     * {@link SimpleGraniteContext} for it when none exists yet.
     *
     * @param sessionId  session id used only when a new context is created
     * @param clientType client type used only when a new context is created
     */
    @Override
    public GraniteContext initThread(String sessionId, String clientType) {
        GraniteContext current = GraniteContext.getCurrentInstance();
        if (current != null) {
            return current;
        }
        return SimpleGraniteContext.createThreadInstance(this.graniteConfig, this.servicesConfig, sessionId, this.applicationMap, clientType);
    }

    /** Releases the Granite context of the current thread by delegating to {@code GraniteContext.release()}. */
    @Override
    public void releaseThread() {
        GraniteContext.release();
    }

    /** Publishes a message using the server channel as the sender. */
    @Override
    public Message publishMessage(AsyncMessage message) {
        Channel sender = this.serverChannel;
        return publishMessage(sender, message);
    }

    /**
     * Publishes a message on behalf of the given channel, falling back to the server
     * channel when {@code fromChannel} is {@code null}. A Granite context is set up
     * for the current thread first, using the sender's client type.
     */
    @Override
    public Message publishMessage(Channel fromChannel, AsyncMessage message) {
        Channel sender = fromChannel != null ? fromChannel : this.serverChannel;
        initThread(null, sender.getClientType());
        return handlePublishMessage(null, message, sender);
    }

    /**
     * Handles a client ping: gets or creates the client's channel and acknowledges
     * with the channel id plus reconnect advice taken from the Gravity configuration.
     *
     * @param channelFactory factory used to create the channel when it does not exist
     * @param message        the ping command
     * @return an acknowledgement whose client id is the channel id and whose body is the advice map
     */
    private Message handlePingMessage(ChannelFactory<?> channelFactory, CommandMessage message) {
        // Typed as Channel: the decompiled Object declaration cannot satisfy the getId() call below.
        Channel channel = this.createChannel(channelFactory, (String)message.getClientId());
        AcknowledgeMessage reply = new AcknowledgeMessage(message);
        reply.setClientId(channel.getId());
        HashMap<String, Long> advice = new HashMap<String, Long>();
        advice.put("reconnect-interval-ms", this.gravityConfig.getReconnectIntervalMillis());
        advice.put("reconnect-max-attempts", Long.valueOf(this.gravityConfig.getReconnectMaxAttempts()));
        reply.setBody(advice);
        reply.setDestination(message.getDestination());
        log.debug("handshake.handle: reply=%s", reply);
        return reply;
    }

    /**
     * Handles a login or logout command through the configured security service.
     *
     * <p>The operation is ignored (and still acknowledged) when no security service
     * is configured or when the service does not accept the current context. A
     * failure of the login/logout call is logged and turned into an error reply.
     *
     * @param message the security command
     * @return an error message when the operation failed, otherwise an
     *         acknowledgement whose body is {@code "success"} for security operations
     */
    private Message handleSecurityMessage(CommandMessage message) {
        GraniteConfig config = GraniteContext.getCurrentInstance().getGraniteConfig();
        AcknowledgeMessage failure = null;
        if (config.hasSecurityService()) {
            if (config.getSecurityService().acceptsContext()) {
                SecurityService securityService = config.getSecurityService();
                try {
                    if (!message.isLoginOperation()) {
                        securityService.logout();
                    } else {
                        securityService.login(message.getBody(), (String)message.getHeader("DSCredentialsCharset"));
                    }
                }
                catch (Exception e) {
                    // Expected security failures are logged at debug level, anything else at error level.
                    if (e instanceof SecurityServiceException) {
                        log.debug(e, "Could not process security operation: %s", message);
                    } else {
                        log.error(e, "Could not process security operation: %s", message);
                    }
                    failure = new ErrorMessage(message, e, true);
                }
            } else {
                log.info("Ignored security operation (security service does not handle this kind of granite context)", message);
            }
        } else {
            log.warn("Ignored security operation (no security settings in granite-config.xml): %s", message);
        }
        if (failure != null) {
            return failure;
        }
        AcknowledgeMessage acknowledgement = new AcknowledgeMessage(message, true);
        if (message.isSecurityOperation()) {
            acknowledgement.setBody("success");
        }
        return acknowledgement;
    }

    /**
     * Handles a connect command: returns {@code null} when the client's channel is
     * known, otherwise the unknown-client reply.
     */
    private Message handleConnectMessage(ChannelFactory<?> channelFactory, CommandMessage message) {
        boolean known = this.getChannel(channelFactory, (String)message.getClientId()) != null;
        if (known) {
            return null;
        }
        return this.handleUnknownClientMessage(message);
    }

    /**
     * Handles a disconnect command: removes the client's channel and acknowledges
     * with the channel id, or returns the unknown-client reply when no channel exists.
     *
     * @param channelFactory factory used to resolve the client's channel
     * @param message        the disconnect command
     * @return the acknowledgement, or the unknown-client reply
     */
    private Message handleDisconnectMessage(ChannelFactory<?> channelFactory, CommandMessage message) {
        // Typed as Channel: the decompiled Object declaration cannot satisfy the getId() calls below.
        Channel client = this.getChannel(channelFactory, (String)message.getClientId());
        if (client == null) {
            return this.handleUnknownClientMessage(message);
        }
        this.removeChannel(client.getId(), false);
        AcknowledgeMessage reply = new AcknowledgeMessage(message);
        reply.setDestination(message.getDestination());
        reply.setClientId(client.getId());
        return reply;
    }

    /** Handles a subscribe command and records the subscription in the distributed data. */
    private Message handleSubscribeMessage(ChannelFactory<?> channelFactory, CommandMessage message) {
        final boolean saveMessageInSession = true;
        return handleSubscribeMessage(channelFactory, message, saveMessageInSession);
    }

    /**
     * Handles a subscribe command for a destination.
     *
     * <p>The subscription work is wrapped in a {@link GravityInvocationContext} so it
     * can be run through the destination securizer and, when a security service
     * accepts the context, through that service's authorization. The wrapped work
     * resolves the channel, ensures the message carries a subscription id, mirrors
     * channel and selector information from the distributed data (when available),
     * delegates to the service adapter, and fills in the reply.
     *
     * @param channelFactory       factory used to resolve the client's channel
     * @param message              the subscribe command
     * @param saveMessageInSession whether a successful subscription is stored in the distributed data
     * @return the adapter's reply, the unknown-client or invalid-destination reply,
     *         or an error message when a securizer, authorization or invocation fails
     */
    private Message handleSubscribeMessage(final ChannelFactory<?> channelFactory, final CommandMessage message, final boolean saveMessageInSession) {
        GraniteContext context = GraniteContext.getCurrentInstance();
        final Destination destination = context.getServicesConfig().findDestinationById(message.getMessageRefType(), message.getDestination());
        if (destination == null) {
            return this.getInvalidDestinationError(message);
        }
        GravityInvocationContext invocationContext = new GravityInvocationContext(message, destination){

            @Override
            public Object invoke() throws Exception {
                // Typed as Channel: the decompiled Object declaration cannot satisfy getId()/getFactory().
                Channel channel = DefaultGravity.this.getChannel(channelFactory, (String)message.getClientId());
                if (channel == null) {
                    return DefaultGravity.this.handleUnknownClientMessage(message);
                }
                String subscriptionId = (String)message.getHeader("DSDstClientId");
                if (subscriptionId == null) {
                    // No subscription id supplied: generate one and put it back on the message.
                    subscriptionId = UUIDUtil.randomUUID();
                    message.setHeader("DSDstClientId", subscriptionId);
                }
                DistributedData gdd = DefaultGravity.this.graniteConfig.getDistributedDataFactory().getInstance();
                if (gdd != null) {
                    if (!gdd.hasChannelId(channel.getId())) {
                        gdd.addChannelId(channel.getId(), channel.getFactory().getClass().getName());
                    }
                    if (Boolean.TRUE.toString().equals(destination.getProperties().get("session-selector"))) {
                        String selector = gdd.getDestinationSelector(destination.getId());
                        log.debug("Session selector found: %s", selector);
                        if (selector != null) {
                            message.setHeader("DSSelector", selector);
                        }
                    }
                }
                ServiceAdapter adapter = DefaultGravity.this.adapterFactory.getServiceAdapter(message);
                AsyncMessage reply = (AsyncMessage)adapter.manage(channel, message);
                DefaultGravity.this.postManage(channel);
                if (saveMessageInSession && !(reply instanceof ErrorMessage)) {
                    try {
                        if (gdd != null) {
                            log.debug("Saving new subscription message for channel: %s - %s", channel.getId(), message);
                            gdd.addSubcription(channel.getId(), message);
                        }
                    }
                    catch (Exception e) {
                        // Best effort: the local subscription stands even if it cannot be recorded.
                        log.error(e, "Could not add subscription in distributed data: %s - %s", channel.getId(), subscriptionId);
                    }
                }
                reply.setDestination(message.getDestination());
                reply.setClientId(channel.getId());
                reply.getHeaders().putAll(message.getHeaders());
                if (gdd != null && message.getDestination() != null) {
                    gdd.setDestinationClientId(message.getDestination(), channel.getId());
                    gdd.setDestinationSubscriptionId(message.getDestination(), subscriptionId);
                }
                return reply;
            }
        };
        if (destination.getSecurizer() instanceof GravityDestinationSecurizer) {
            try {
                ((GravityDestinationSecurizer)destination.getSecurizer()).canSubscribe(invocationContext);
            }
            catch (Exception e) {
                return new ErrorMessage((Message)message, e);
            }
        }
        GraniteConfig config = context.getGraniteConfig();
        try {
            if (config.hasSecurityService() && config.getSecurityService().acceptsContext()) {
                return (Message)config.getSecurityService().authorize(invocationContext);
            }
            return (Message)invocationContext.invoke();
        }
        catch (Exception e) {
            return new ErrorMessage(message, e, true);
        }
    }

    /**
     * Handles an unsubscribe command for the channel identified by the message's client id.
     *
     * <p>The command is handed to the service adapter resolved for the message. Unless the
     * adapter answers with an {@link ErrorMessage}, the subscription (identified by the
     * {@code DSDstClientId} header) is also removed from the distributed data, when such
     * data is available. A failure of that removal is logged and does not affect the reply.
     *
     * @param channelFactory factory used to look up the channel
     * @param message the unsubscribe command
     * @return the adapter's reply with destination, client id and headers copied from the
     *         request, or an "unknown client" error when no channel matches the client id
     */
    private Message handleUnsubscribeMessage(ChannelFactory<?> channelFactory, CommandMessage message) {
        // Declared as Channel (not Object) so that getId() resolves on the static type
        // and the channel can be passed on without casts.
        Channel channel = this.getChannel(channelFactory, (String)message.getClientId());
        if (channel == null) {
            return this.handleUnknownClientMessage(message);
        }
        ServiceAdapter adapter = this.adapterFactory.getServiceAdapter(message);
        AcknowledgeMessage reply = (AcknowledgeMessage)adapter.manage(channel, message);
        this.postManage(channel);
        if (!(reply instanceof ErrorMessage)) {
            // Best effort: a failure here is only logged, the reply is still returned.
            try {
                DistributedData gdd = this.graniteConfig.getDistributedDataFactory().getInstance();
                if (gdd != null) {
                    String subscriptionId = (String)message.getHeader("DSDstClientId");
                    log.debug("Removing subscription message from channel info: %s - %s", channel.getId(), subscriptionId);
                    gdd.removeSubcription(channel.getId(), subscriptionId);
                }
            }
            catch (Exception e) {
                log.error(e, "Could not remove subscription from distributed data: %s - %s", channel.getId(), message.getHeader("DSDstClientId"));
            }
        }
        // Echo the request's routing information back on the reply.
        reply.setDestination(message.getDestination());
        reply.setClientId(channel.getId());
        reply.getHeaders().putAll(message.getHeaders());
        return reply;
    }

    /**
     * Hook called by the subscribe and unsubscribe handlers right after the service
     * adapter's {@code manage} call for a channel. This implementation does nothing.
     *
     * @param channel the channel that was passed to the adapter's {@code manage} call
     */
    protected void postManage(Channel channel) {
    }

    /**
     * Publishes a message when the caller has no channel at hand.
     *
     * <p>Delegates to the three-argument overload with a {@code null} channel.
     *
     * @param channelFactory factory forwarded to the three-argument overload
     * @param message the message to publish
     * @return the reply produced by the three-argument overload
     */
    private Message handlePublishMessage(ChannelFactory<?> channelFactory, AsyncMessage message) {
        final Channel noExplicitChannel = null;
        return this.handlePublishMessage(channelFactory, message, noExplicitChannel);
    }

    /**
     * Publishes a message to its destination, applying the destination securizer and the
     * configured security service (when present) before the service adapter is invoked.
     *
     * <p>Before publishing, the message gets a generated id if it has none, a fresh
     * timestamp, and, when {@code channel} is given, that channel's id as client id.
     *
     * @param channelFactory factory used to look up the sending channel when {@code channel} is {@code null}
     * @param message the message to publish
     * @param channel the sending channel, or {@code null} to resolve it from the message's client id
     * @return the adapter's reply, or an {@link ErrorMessage} when the destination is unknown,
     *         the sending channel cannot be found, or a check or the invocation throws
     */
    private Message handlePublishMessage(final ChannelFactory<?> channelFactory, final AsyncMessage message, final Channel channel) {
        final GraniteContext graniteContext = GraniteContext.getCurrentInstance();
        final Destination target = graniteContext.getServicesConfig()
            .findDestinationById(message.getClass().getName(), message.getDestination());
        if (target == null) {
            return this.getInvalidDestinationError(message);
        }

        // Complete the message before it is handed to the security layer and the adapter.
        if (message.getMessageId() == null) {
            message.setMessageId(UUIDUtil.randomUUID());
        }
        message.setTimestamp(System.currentTimeMillis());
        if (channel != null) {
            message.setClientId(channel.getId());
        }

        final GravityInvocationContext publication = new GravityInvocationContext(message, target){

            @Override
            public Object invoke() throws Exception {
                Channel sender = channel;
                if (sender == null) {
                    // No channel supplied: fall back to the one registered for the client id.
                    sender = DefaultGravity.this.getChannel(channelFactory, (String)message.getClientId());
                    if (sender == null) {
                        return DefaultGravity.this.handleUnknownClientMessage(message);
                    }
                }
                final ServiceAdapter serviceAdapter = DefaultGravity.this.adapterFactory.getServiceAdapter(message);
                final AsyncMessage answer = (AsyncMessage)serviceAdapter.invoke(sender, message);
                answer.setDestination(message.getDestination());
                answer.setClientId(sender.getId());
                return answer;
            }
        };

        // Destination-level check: any exception it throws is reported as an error reply.
        if (target.getSecurizer() instanceof GravityDestinationSecurizer) {
            try {
                final GravityDestinationSecurizer securizer = (GravityDestinationSecurizer)target.getSecurizer();
                securizer.canPublish(publication);
            }
            catch (Exception denied) {
                return new ErrorMessage(message, denied, true);
            }
        }

        final GraniteConfig activeConfig = graniteContext.getGraniteConfig();
        try {
            final Object outcome;
            if (activeConfig.hasSecurityService() && activeConfig.getSecurityService().acceptsContext()) {
                outcome = activeConfig.getSecurityService().authorize(publication);
            }
            else {
                outcome = publication.invoke();
            }
            return (Message)outcome;
        }
        catch (Exception failure) {
            return new ErrorMessage(message, failure, true);
        }
    }

    /**
     * Builds the error reply used when no channel is found for a message's client id.
     *
     * @param message the message that could not be matched to a channel
     * @return an {@link ErrorMessage} with fault code {@code Server.Call.UnknownClient}
     */
    private Message handleUnknownClientMessage(Message message) {
        final ErrorMessage unknownClient = new ErrorMessage(message, true);
        unknownClient.setFaultCode("Server.Call.UnknownClient");
        unknownClient.setFaultString("Unknown client");
        return unknownClient;
    }

    /**
     * Builds the error reply used when no destination is configured for a message.
     *
     * <p>The fault string names the message's destination id and its class name; for a
     * {@link CommandMessage} the class name is followed by the message reference type in
     * square brackets.
     *
     * @param message the message whose destination could not be resolved
     * @return an {@link ErrorMessage} with fault code {@code Server.Messaging.InvalidDestination}
     */
    private ErrorMessage getInvalidDestinationError(Message message) {
        final StringBuilder typeLabel = new StringBuilder(message.getClass().getName());
        if (message instanceof CommandMessage) {
            final CommandMessage command = (CommandMessage)message;
            typeLabel.append('[').append(command.getMessageRefType()).append(']');
        }
        final ErrorMessage invalidDestination = new ErrorMessage(message, true);
        invalidDestination.setFaultCode("Server.Messaging.InvalidDestination");
        invalidDestination.setFaultString("No configured destination for id: " + message.getDestination() + " and message type: " + typeLabel.toString());
        return invalidDestination;
    }

    /**
     * Minimal {@link AbstractChannel} implementation: it never has an asynchronous HTTP
     * context, ignores every message it receives, and does nothing on close.
     */
    private static class ServerChannel
    extends AbstractChannel
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Creates the channel by forwarding all arguments to {@link AbstractChannel}.
         *
         * @param gravity the owning Gravity instance
         * @param channelId the id of this channel
         * @param factory the factory this channel belongs to
         * @param clientType the client type forwarded to the superclass
         */
        public ServerChannel(Gravity gravity, String channelId, ChannelFactory<ServerChannel> factory, String clientType) {
            super(gravity, channelId, factory, clientType);
        }

        /** Returns the Gravity instance held in the inherited {@code gravity} field. */
        @Override
        public Gravity getGravity() {
            return gravity;
        }

        /** Does nothing. */
        @Override
        public void close() {
            // no-op
        }

        /** Ignores the message; nothing is queued or delivered. */
        @Override
        public void receive(AsyncMessage message) throws MessageReceivingException {
            // no-op
        }

        /** Always reports that no asynchronous HTTP context is available. */
        @Override
        protected boolean hasAsyncHttpContext() {
            return false;
        }

        /** Always returns {@code null}, matching {@link #hasAsyncHttpContext()}. */
        @Override
        protected AsyncHttpContext acquireAsyncHttpContext() {
            return null;
        }

        /** Does nothing, since no context is ever acquired. */
        @Override
        protected void releaseAsyncHttpContext(AsyncHttpContext context) {
            // no-op
        }
    }
}

