/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import java.io.IOException;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.Destination;
import org.activemq.broker.region.DestinationStatistics;
import org.activemq.broker.region.DurableTopicSubscription;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
import org.activemq.broker.region.policy.DispatchPolicy;
import org.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageId;
import org.activemq.command.SubscriptionInfo;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.memory.UsageManager;
import org.activemq.store.MessageRecoveryListener;
import org.activemq.store.MessageStore;
import org.activemq.store.TopicMessageStore;
import org.activemq.thread.TaskRunnerFactory;
import org.activemq.thread.Valve;
import org.activemq.transaction.Synchronization;
import org.activemq.util.SubscriptionKey;

public class Topic
implements Destination {
    protected final ActiveMQDestination destination;
    protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
    protected final Valve dispatchValve = new Valve(true);
    protected final TopicMessageStore store;
    protected final UsageManager usageManager;
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();

    public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) {
        this.destination = destination;
        this.store = store;
        this.usageManager = memoryManager;
        this.destinationStatistics.setParent(parentStats);
    }

    public boolean lock(MessageReference node, Subscription sub) {
        return true;
    }

    public void addSubscription(ConnectionContext context, Subscription sub) throws Throwable {
        this.destinationStatistics.getConsumers().increment();
        sub.add(context, this);
        if (sub.getConsumerInfo().isDurable()) {
            this.recover((DurableTopicSubscription)sub, true);
        } else {
            if (sub.getConsumerInfo().isRetroactive()) {
                this.subscriptionRecoveryPolicy.recover(context, sub);
            }
            this.consumers.add((Object)sub);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recover(final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
        this.dispatchValve.turnOff();
        try {
            if (initialActivation) {
                this.consumers.add((Object)sub);
            }
            if (this.store != null) {
                String s1;
                String clientId = sub.getClientId();
                String subscriptionName = sub.getSubscriptionName();
                String selector = sub.getConsumerInfo().getSelector();
                SubscriptionInfo info = this.store.lookupSubscription(clientId, subscriptionName);
                if (info != null && ((s1 = info.getSelector()) == null ^ selector == null || s1 != null && !s1.equals(selector))) {
                    this.store.deleteSubscription(clientId, subscriptionName);
                    info = null;
                }
                if (info == null) {
                    this.store.addSubsciption(clientId, subscriptionName, selector, sub.getConsumerInfo().isRetroactive());
                }
                if (sub.isRecovered()) {
                    final MessageEvaluationContext msgContext = new MessageEvaluationContext();
                    msgContext.setDestination(this.destination);
                    this.store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener(){

                        public void recoverMessage(Message message) throws Throwable {
                            message.setRegionDestination(Topic.this);
                            try {
                                msgContext.setMessageReference(message);
                                if (sub.matches(message, msgContext)) {
                                    sub.add(message);
                                }
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }
        finally {
            this.dispatchValve.turnOn();
        }
    }

    public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
        this.destinationStatistics.getConsumers().decrement();
        this.consumers.remove((Object)sub);
        sub.remove(context, this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void send(final ConnectionContext context, final Message message) throws Throwable {
        if (context.isProducerFlowControl()) {
            this.usageManager.waitForSpace();
        }
        message.setRegionDestination(this);
        if (this.store != null && message.isPersistent()) {
            this.store.addMessage(context, message);
        }
        message.incrementReferenceCount();
        try {
            if (context.isInTransaction()) {
                context.getTransaction().addSynchronization(new Synchronization(){

                    public void afterCommit() throws Throwable {
                        Topic.this.dispatch(context, message);
                    }
                });
            } else {
                this.dispatch(context, message);
            }
        }
        finally {
            message.decrementReferenceCount();
        }
    }

    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
        if (this.store != null) {
            this.store.deleteSubscription(key.clientId, key.subscriptionName);
        }
    }

    public String toString() {
        return "Topic: destination=" + this.destination.getPhysicalName() + ", subscriptions=" + this.consumers.size();
    }

    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
        if (this.store != null && node.isPersistent()) {
            DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
            this.store.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), ack.getLastMessageId());
        }
    }

    public void dispose(ConnectionContext context) throws IOException {
        if (this.store != null) {
            this.store.removeAllMessages(context);
        }
    }

    public void gc() {
    }

    public Message loadMessage(MessageId messageId) throws IOException {
        return this.store.getMessage(messageId);
    }

    public void start() throws Exception {
        this.subscriptionRecoveryPolicy.start();
    }

    public void stop() throws Exception {
        this.subscriptionRecoveryPolicy.stop();
    }

    public UsageManager getUsageManager() {
        return this.usageManager;
    }

    public DestinationStatistics getDestinationStatistics() {
        return this.destinationStatistics;
    }

    public ActiveMQDestination getActiveMQDestination() {
        return this.destination;
    }

    public DispatchPolicy getDispatchPolicy() {
        return this.dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return this.subscriptionRecoveryPolicy;
    }

    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void dispatch(ConnectionContext context, Message message) throws Throwable {
        this.destinationStatistics.getEnqueues().increment();
        this.dispatchValve.increment();
        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
        try {
            this.subscriptionRecoveryPolicy.add(context, message);
            if (this.consumers.isEmpty()) {
                return;
            }
            msgContext.setDestination(this.destination);
            msgContext.setMessageReference(message);
            this.dispatchPolicy.dispatch(context, message, msgContext, this.consumers);
        }
        finally {
            msgContext.clear();
            this.dispatchValve.decrement();
        }
    }

    public MessageStore getMessageStore() {
        return this.store;
    }
}

