/*
 * Decompiled with CFR 0.152.
 */
package org.granite.client.messaging.channel.amf;

import flex.messaging.messages.AcknowledgeMessage;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.granite.client.messaging.Consumer;
import org.granite.client.messaging.ResponseListener;
import org.granite.client.messaging.channel.AsyncToken;
import org.granite.client.messaging.channel.MessagingChannel;
import org.granite.client.messaging.channel.ResponseMessageFuture;
import org.granite.client.messaging.channel.amf.AbstractAMFChannel;
import org.granite.client.messaging.codec.MessagingCodec;
import org.granite.client.messaging.messages.RequestMessage;
import org.granite.client.messaging.messages.ResponseMessage;
import org.granite.client.messaging.messages.requests.DisconnectMessage;
import org.granite.client.messaging.messages.responses.AbstractResponseMessage;
import org.granite.client.messaging.messages.responses.ResultMessage;
import org.granite.client.messaging.transport.DefaultTransportMessage;
import org.granite.client.messaging.transport.Transport;
import org.granite.client.messaging.transport.TransportMessage;
import org.granite.logging.Logger;
import org.granite.util.UUIDUtil;

public class AbstractAMFMessagingChannel
extends AbstractAMFChannel
implements MessagingChannel {
    private static final Logger log = Logger.getLogger(AbstractAMFMessagingChannel.class);
    protected final MessagingCodec<Message[]> codec;
    protected String sessionId = null;
    protected final ConcurrentMap<String, Consumer> consumersMap = new ConcurrentHashMap<String, Consumer>();
    protected final AtomicReference<String> connectMessageId = new AtomicReference<Object>(null);
    protected final AtomicReference<ReconnectTimerTask> reconnectTimerTask = new AtomicReference();
    protected volatile long reconnectIntervalMillis = TimeUnit.SECONDS.toMillis(30L);
    protected volatile long reconnectMaxAttempts = 60L;
    protected volatile long reconnectAttempts = 0L;

    protected AbstractAMFMessagingChannel(MessagingCodec<Message[]> codec, Transport transport, String id, URI uri) {
        super(transport, id, uri, 1);
        this.codec = codec;
    }

    @Override
    public void setSessionId(String sessionId) {
        if (sessionId == null && this.sessionId != null || sessionId != null && !sessionId.equals(this.sessionId)) {
            this.sessionId = sessionId;
            log.info("Messaging channel sessionId %s", sessionId);
        }
    }

    protected boolean connect() {
        this.cancelReconnectTimerTask();
        if (this.consumersMap.isEmpty()) {
            return false;
        }
        String id = UUIDUtil.randomUUID();
        if (!this.connectMessageId.compareAndSet(null, id)) {
            return false;
        }
        log.debug("Connecting channel with clientId %s", this.clientId);
        CommandMessage connectMessage = new CommandMessage();
        connectMessage.setOperation(20);
        connectMessage.setMessageId(id);
        connectMessage.setTimestamp(System.currentTimeMillis());
        connectMessage.setClientId(this.clientId);
        try {
            this.transport.send(this, new DefaultTransportMessage<Message[]>(id, true, this.clientId, this.sessionId, new Message[]{connectMessage}, this.codec));
            return true;
        }
        catch (Exception e) {
            this.connectMessageId.set(null);
            this.scheduleReconnectTimerTask();
            return false;
        }
    }

    @Override
    public void addConsumer(Consumer consumer) {
        this.consumersMap.putIfAbsent(consumer.getSubscriptionId(), consumer);
        this.connect();
    }

    @Override
    public boolean removeConsumer(Consumer consumer) {
        return this.consumersMap.remove(consumer.getSubscriptionId()) != null;
    }

    @Override
    public synchronized ResponseMessageFuture disconnect(ResponseListener ... listeners) {
        this.cancelReconnectTimerTask();
        this.connectMessageId.set(null);
        this.reconnectAttempts = 0L;
        for (Consumer consumer : this.consumersMap.values()) {
            consumer.onDisconnect();
        }
        this.consumersMap.clear();
        return this.send(new DisconnectMessage(this.clientId), listeners);
    }

    @Override
    protected TransportMessage createTransportMessage(AsyncToken token) throws UnsupportedEncodingException {
        Message[] messages = this.convertToAmf(token.getRequest());
        return new DefaultTransportMessage<Message[]>(token.getId(), false, this.clientId, this.sessionId, messages, this.codec);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected ResponseMessage decodeResponse(InputStream is) throws IOException {
        boolean reconnect = true;
        try {
            if (is.available() > 0) {
                Message[] messages = this.codec.decode(is);
                if (messages.length > 0 && messages[0] instanceof AcknowledgeMessage) {
                    RequestMessage request;
                    reconnect = false;
                    AbstractResponseMessage response = this.convertFromAmf((AcknowledgeMessage)messages[0]);
                    if (response instanceof ResultMessage && (request = this.getRequest(response.getCorrelationId())) != null) {
                        ResultMessage result = (ResultMessage)response;
                        switch (request.getType()) {
                            case PING: {
                                Object reconnectMaxAttempts;
                                if (!(messages[0].getBody() instanceof Map)) break;
                                Map advices = (Map)messages[0].getBody();
                                Object reconnectIntervalMillis = advices.get("reconnect-interval-ms");
                                if (reconnectIntervalMillis instanceof Number) {
                                    this.reconnectIntervalMillis = ((Number)reconnectIntervalMillis).longValue();
                                }
                                if (!((reconnectMaxAttempts = advices.get("reconnect-max-attempts")) instanceof Number)) break;
                                this.reconnectMaxAttempts = ((Number)reconnectMaxAttempts).longValue();
                                break;
                            }
                            case SUBSCRIBE: {
                                result.setResult(messages[0].getHeader("DSDstClientId"));
                                break;
                            }
                        }
                    }
                    AbstractResponseMessage current = response;
                    for (int i = 1; i < messages.length; ++i) {
                        if (!(messages[i] instanceof AcknowledgeMessage)) {
                            throw new RuntimeException("Message should be an AcknowledgeMessage: " + messages[i]);
                        }
                        AbstractResponseMessage next = this.convertFromAmf((AcknowledgeMessage)messages[i]);
                        current.setNext(next);
                        current = next;
                    }
                    AbstractResponseMessage i = response;
                    return i;
                }
                for (Message message : messages) {
                    if (!(message instanceof AsyncMessage)) {
                        throw new RuntimeException("Message should be an AsyncMessage: " + message);
                    }
                    String subscriptionId = (String)message.getHeader("DSDstClientId");
                    Consumer consumer = (Consumer)this.consumersMap.get(subscriptionId);
                    if (consumer != null) {
                        consumer.onMessage(this.convertFromAmf((AsyncMessage)message));
                        continue;
                    }
                    log.warn("No consumer for subscriptionId: %s", subscriptionId);
                }
            }
        }
        finally {
            if (reconnect) {
                this.connectMessageId.set(null);
                this.connect();
            }
        }
        return null;
    }

    @Override
    public void onError(TransportMessage message, Exception e) {
        super.onError(message, e);
        if (message != null && this.connectMessageId.compareAndSet(message.getId(), null)) {
            this.scheduleReconnectTimerTask();
        }
    }

    protected void cancelReconnectTimerTask() {
        ReconnectTimerTask task = this.reconnectTimerTask.getAndSet(null);
        if (task != null && task.cancel()) {
            this.reconnectAttempts = 0L;
        }
    }

    protected void scheduleReconnectTimerTask() {
        ReconnectTimerTask task = new ReconnectTimerTask();
        ReconnectTimerTask previousTask = this.reconnectTimerTask.getAndSet(task);
        if (previousTask != null) {
            previousTask.cancel();
        }
        if (this.reconnectAttempts < this.reconnectMaxAttempts) {
            ++this.reconnectAttempts;
            this.schedule(task, this.reconnectIntervalMillis);
        }
    }

    class ReconnectTimerTask
    extends TimerTask {
        ReconnectTimerTask() {
        }

        @Override
        public void run() {
            AbstractAMFMessagingChannel.this.connect();
        }
    }
}

