/*
 * Decompiled with CFR 0.152.
 */
package org.granite.client.messaging;

import java.util.concurrent.ConcurrentHashMap;
import org.granite.client.messaging.AbstractTopicAgent;
import org.granite.client.messaging.ResponseListener;
import org.granite.client.messaging.ResultIssuesResponseListener;
import org.granite.client.messaging.TopicMessageListener;
import org.granite.client.messaging.channel.MessagingChannel;
import org.granite.client.messaging.channel.ResponseMessageFuture;
import org.granite.client.messaging.events.IssueEvent;
import org.granite.client.messaging.events.ResultEvent;
import org.granite.client.messaging.events.TopicMessageEvent;
import org.granite.client.messaging.messages.push.TopicMessage;
import org.granite.client.messaging.messages.requests.SubscribeMessage;
import org.granite.client.messaging.messages.requests.UnsubscribeMessage;
import org.granite.logging.Logger;

/**
 * Topic agent that subscribes to a destination/topic through a
 * {@link MessagingChannel} and forwards incoming {@link TopicMessage}s to the
 * registered {@link TopicMessageListener}s.
 *
 * <p>The consumer is considered subscribed while it holds a non-null
 * subscription id. The id is set from the result of a subscribe request and
 * cleared on a successful unsubscribe or on {@link #onDisconnect()}.
 */
public class Consumer
extends AbstractTopicAgent {
    private static final Logger log = Logger.getLogger(Consumer.class);

    /** Registered message listeners; the map is used as a concurrent set (values are always {@code Boolean.TRUE}). */
    private final ConcurrentHashMap<TopicMessageListener, Boolean> listeners = new ConcurrentHashMap<TopicMessageListener, Boolean>();

    // NOTE(review): written from response-listener callbacks and read by
    // isSubscribed()/getSubscriptionId(); if those run on different threads this
    // field may need to be volatile -- confirm against the channel's threading model.
    private String subscriptionId = null;
    private String selector = null;

    /**
     * Creates a consumer bound to the given channel, destination and topic.
     *
     * @param channel     channel used to send subscribe/unsubscribe requests
     * @param destination destination passed to the subscribe/unsubscribe messages
     * @param topic       topic passed to the subscribe/unsubscribe messages
     */
    public Consumer(MessagingChannel channel, String destination, String topic) {
        super(channel, destination, topic);
    }

    /**
     * @return the selector that will be sent with the next subscribe request, or {@code null} if none is set
     */
    public String getSelector() {
        return this.selector;
    }

    /**
     * Sets the selector sent with subsequent subscribe requests. It is only read
     * when {@link #subscribe(ResponseListener...)} builds its message.
     *
     * @param selector the selector expression, may be {@code null}
     */
    public void setSelector(String selector) {
        this.selector = selector;
    }

    /**
     * @return {@code true} if this consumer currently holds a subscription id
     */
    public boolean isSubscribed() {
        return this.subscriptionId != null;
    }

    /**
     * @return the current subscription id, or {@code null} when not subscribed
     */
    public String getSubscriptionId() {
        return this.subscriptionId;
    }

    /**
     * Sends a subscribe request for this consumer's destination, topic and selector.
     *
     * <p>An internal listener is appended after the caller-supplied ones. On
     * result it stores the returned subscription id and registers this consumer
     * with the channel; on issue it logs an error.
     *
     * @param listeners optional caller listeners notified of the response; may be {@code null} or empty
     * @return the future returned by the channel for the sent message
     */
    public ResponseMessageFuture subscribe(ResponseListener ... listeners) {
        SubscribeMessage subscribeMessage = new SubscribeMessage(this.destination, this.topic, this.selector);
        subscribeMessage.getHeaders().putAll(this.defaultHeaders);
        ResultIssuesResponseListener listener = new ResultIssuesResponseListener(){

            @Override
            public void onResult(ResultEvent event) {
                Consumer.this.subscriptionId = (String)event.getResult();
                Consumer.this.channel.addConsumer(Consumer.this);
            }

            @Override
            public void onIssue(IssueEvent event) {
                log.error("Subscription failed %s: %s", Consumer.this, event);
            }
        };
        return this.channel.send(subscribeMessage, Consumer.appendListener(listeners, listener));
    }

    /**
     * Sends an unsubscribe request carrying the current subscription id.
     *
     * <p>An internal listener is appended after the caller-supplied ones. On
     * result it removes this consumer from the channel and clears the
     * subscription id; on issue it logs an error and leaves the state untouched.
     *
     * <p>No check is made that the consumer is currently subscribed: the message
     * is built with whatever subscription id is held, including {@code null}.
     *
     * @param listeners optional caller listeners notified of the response; may be {@code null} or empty
     * @return the future returned by the channel for the sent message
     */
    public ResponseMessageFuture unsubscribe(ResponseListener ... listeners) {
        UnsubscribeMessage unsubscribeMessage = new UnsubscribeMessage(this.destination, this.topic, this.subscriptionId);
        unsubscribeMessage.getHeaders().putAll(this.defaultHeaders);
        ResultIssuesResponseListener listener = new ResultIssuesResponseListener(){

            @Override
            public void onResult(ResultEvent event) {
                Consumer.this.channel.removeConsumer(Consumer.this);
                Consumer.this.subscriptionId = null;
            }

            @Override
            public void onIssue(IssueEvent event) {
                log.error("Unsubscription failed %s: %s", Consumer.this, event);
            }
        };
        return this.channel.send(unsubscribeMessage, Consumer.appendListener(listeners, listener));
    }

    /**
     * Registers a listener for incoming topic messages. Registering the same
     * listener twice has no additional effect.
     *
     * @param listener the listener to add; must not be {@code null} (the backing {@code ConcurrentHashMap} rejects null keys)
     */
    public void addMessageListener(TopicMessageListener listener) {
        this.listeners.putIfAbsent(listener, Boolean.TRUE);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener the listener to remove; must not be {@code null}
     * @return {@code true} if the listener was registered and has been removed
     */
    public boolean removeMessageListener(TopicMessageListener listener) {
        return this.listeners.remove(listener) != null;
    }

    /**
     * Clears the subscription id so that {@link #isSubscribed()} returns {@code false}.
     * Registered message listeners are kept.
     */
    public void onDisconnect() {
        this.subscriptionId = null;
    }

    /**
     * Dispatches a received message to every registered listener, each with its
     * own {@link TopicMessageEvent}. An exception thrown by one listener is
     * logged and does not prevent the remaining listeners from being notified.
     *
     * @param message the message received for this consumer
     */
    public void onMessage(TopicMessage message) {
        for (TopicMessageListener listener : this.listeners.keySet()) {
            try {
                listener.onMessage(new TopicMessageEvent(this, message));
            }
            catch (Exception e) {
                // The placeholder is required for the failing listener to appear in the log;
                // without it the extra argument is silently dropped by the formatter.
                log.error(e, "Consumer listener threw an exception: %s", listener);
            }
        }
    }

    @Override
    public String toString() {
        return this.getClass().getName() + " {subscriptionId=" + this.subscriptionId + ", destination=" + this.destination + ", topic=" + this.topic + ", selector=" + this.selector + "}";
    }

    /**
     * Returns a new array holding the caller-supplied listeners followed by {@code extra}.
     *
     * @param listeners caller listeners, may be {@code null} or empty
     * @param extra     listener to place last
     * @return a fresh array; the input array is never modified
     */
    private static ResponseListener[] appendListener(ResponseListener[] listeners, ResponseListener extra) {
        if (listeners == null || listeners.length == 0) {
            return new ResponseListener[]{extra};
        }
        ResponseListener[] combined = new ResponseListener[listeners.length + 1];
        System.arraycopy(listeners, 0, combined, 0, listeners.length);
        combined[listeners.length] = extra;
        return combined;
    }
}

