/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.messaging.core.impl;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import org.jboss.jms.server.MessagingTimeoutFactory;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Channel;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
import org.jboss.messaging.core.contract.Distributor;
import org.jboss.messaging.core.contract.Filter;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.impl.SimpleDelivery;
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionException;
import org.jboss.messaging.core.impl.tx.TxCallback;
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
import org.jboss.util.timeout.Timeout;
import org.jboss.util.timeout.TimeoutTarget;

public abstract class ChannelSupport
implements Channel {
    // Logger shared by all channel instances.
    private static final Logger log = Logger.getLogger(ChannelSupport.class);
    // Trace flag sampled once per instance at creation; later logger level changes are not observed.
    private boolean trace = log.isTraceEnabled();
    // Identifier handed to the persistence manager on every reference operation.
    protected long channelID;
    // Delivery target. close() sets it to null and checkClosed() treats null as "closed".
    // It is never assigned a non-null value in this class - presumably subclasses do that.
    protected Distributor distributor;
    // Set to true by deliver(); afterwards tracks whether the last distributor.handle() call returned a delivery.
    protected boolean receiversReady;
    // In-memory references waiting to be delivered, kept by priority.
    protected PriorityLinkedList messageRefs;
    // Whether reliable messages are written to / removed from the persistence manager.
    protected boolean recoverable;
    // Persistence manager used for reliable references.
    protected PersistenceManager pm;
    // Monitor guarding messageRefs, maxSize and the distributor during delivery.
    protected Object lock;
    // When false, handle() rejects references by returning null.
    protected volatile boolean active;
    // Number of deliveries handed to receivers and not yet acknowledged or cancelled.
    protected SynchronizedInt deliveringCount;
    // Timeout handles of references scheduled for later delivery; the set is also used as its own monitor.
    protected Set scheduledDeliveries;
    // Maximum number of references the channel accepts; -1 disables the limit.
    protected int maxSize;
    // Count of references successfully accepted by handleInternal().
    protected SynchronizedInt messagesAdded;

    /**
     * Initialises an empty channel.
     *
     * @param channelID   identifier used for all persistence manager calls
     * @param pm          persistence manager for reliable references
     * @param recoverable whether reliable references are persisted
     * @param maxSize     maximum number of references held, or -1 for no limit
     */
    protected ChannelSupport(long channelID, PersistenceManager pm, boolean recoverable, int maxSize) {
        if (this.trace) {
            // Describe the channel by its actual recoverable flag; having a
            // persistence manager does not by itself make the channel recoverable.
            log.trace("creating " + (recoverable ? "recoverable " : "non-recoverable ") + "channel[" + channelID + "]");
        }
        this.pm = pm;
        this.channelID = channelID;
        this.recoverable = recoverable;
        // presumably 10 is the number of priority levels - TODO confirm against BasicPriorityLinkedList
        this.messageRefs = new BasicPriorityLinkedList(10);
        this.lock = new Object();
        this.deliveringCount = new SynchronizedInt(0);
        this.scheduledDeliveries = new HashSet();
        this.maxSize = maxSize;
        this.messagesAdded = new SynchronizedInt(0);
    }

    /**
     * Accepts a reference into this channel, persisting it when applicable.
     *
     * @return the resulting delivery, or {@code null} when the channel is
     *         inactive or the reference could not be accepted
     * @throws IllegalStateException if the channel is active but already closed
     */
    public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx) {
        if (isActive()) {
            checkClosed();
            return handleInternal(sender, ref, tx, true);
        }
        if (trace) {
            log.trace(this + " is not active, returning null delivery for " + ref);
        }
        return null;
    }

    /**
     * Acknowledges a delivery, with persistence enabled, either immediately
     * ({@code tx == null}) or as part of the given transaction.
     */
    public void acknowledge(Delivery d, Transaction tx) throws Throwable {
        if (trace) {
            if (tx == null) {
                log.trace("acknowledging " + d + " non-transactionally");
            } else {
                log.trace("acknowledging " + d + " transactionally in " + tx);
            }
        }
        acknowledgeInternal(d, tx, true);
    }

    /**
     * Cancels a delivery: updates the persisted delivery count for reliable
     * messages, releases the delivering slot unless the delivery was recovered,
     * and then either re-schedules the reference or puts it back in memory.
     */
    public void cancel(Delivery del) throws Throwable {
        final MessageReference reference = del.getReference();
        final boolean reliable = reference.getMessage().isReliable();
        if (reliable) {
            pm.updateDeliveryCount(channelID, reference);
        }
        final boolean recovered = del.isRecovered();
        if (!recovered) {
            deliveringCount.decrement();
        }
        final boolean scheduled = checkAndSchedule(reference);
        if (!scheduled) {
            cancelInternal(reference);
        }
    }

    /** Returns the identifier this channel was created with. */
    public long getChannelID() {
        return channelID;
    }

    /** Returns the recoverable flag this channel was created with. */
    public boolean isRecoverable() {
        return recoverable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the messages of all in-memory references accepted by the filter.
     *
     * @param filter selection filter, or {@code null} to accept everything
     * @return a new list of {@code Message} objects, in the order reported by
     *         {@link #undelivered(Filter)}
     */
    public List browse(Filter filter) {
        if (this.trace) {
            log.trace(this + " browse" + (filter == null ? "" : ", filter = " + filter));
        }
        synchronized (this.lock) {
            List references = this.undelivered(filter);
            ArrayList<Message> messages = new ArrayList<Message>(references.size());
            // undelivered() returns a raw List, so its elements are typed as
            // Object and have to be cast explicitly.
            for (Object candidate : references) {
                messages.add(((MessageReference) candidate).getMessage());
            }
            return messages;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Prompts delivery of in-memory references when the distributor has at
     * least one receiver.
     *
     * @throws IllegalStateException if the channel is closed
     */
    public void deliver() {
        checkClosed();
        synchronized (lock) {
            if (distributor == null || distributor.getNumberOfReceivers() <= 0) {
                return;
            }
            setReceiversReady(true);
            deliverInternal();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the channel: clears and drops the distributor (if still present)
     * and cancels every scheduled delivery, all while holding the channel lock.
     */
    public void close() {
        synchronized (lock) {
            boolean open = distributor != null;
            if (open) {
                distributor.clear();
                distributor = null;
            }
            clearAllScheduledDeliveries();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes and acknowledges every in-memory reference, then cancels all
     * scheduled deliveries.
     *
     * @throws IllegalStateException if any delivery is still in progress
     * @throws Throwable             if acknowledging a reference fails
     */
    public void removeAllReferences() throws Throwable {
        synchronized (this.lock) {
            MessageReference ref;
            if (this.deliveringCount.get() > 0) {
                throw new IllegalStateException("Cannot remove references while deliveries are in progress, there are " + this.deliveringCount.get());
            }
            // Guard trace output like the rest of the class so no message
            // strings are built per reference while the lock is held.
            if (this.trace) {
                log.trace(this + " removing all references, there are " + this.messageRefs.size());
            }
            while ((ref = this.removeFirstInMemory()) != null) {
                if (this.trace) {
                    log.trace("Removing ref " + ref);
                }
                SimpleDelivery del = new SimpleDelivery(this, ref);
                del.acknowledge(null);
            }
            // presumably each acknowledge above decrements the counter for a
            // delivery that was never counted - reset it explicitly. TODO confirm.
            this.deliveringCount.set(0);
            if (this.trace) {
                log.trace(this + " done removing all references, there are " + this.messageRefs.size());
            }
        }
        this.clearAllScheduledDeliveries();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Collects the in-memory references whose message is accepted by the filter.
     *
     * @param filter selection filter, or {@code null} to accept everything
     * @return a new list of {@code MessageReference} objects
     */
    public List undelivered(Filter filter) {
        ArrayList<MessageReference> undelivered = new ArrayList<MessageReference>();
        synchronized (this.lock) {
            // messageRefs is declared as a raw type, so the elements of
            // getAll() must be cast explicitly.
            for (Object element : this.messageRefs.getAll()) {
                MessageReference r = (MessageReference) element;
                if (filter == null || filter.accept(r.getMessage())) {
                    undelivered.add(r);
                } else if (this.trace) {
                    log.trace(this + ": " + r + " NOT accepted by filter so won't add to list");
                }
            }
        }
        if (this.trace) {
            log.trace(this + ": undelivered() returns a list of " + undelivered.size() + " undelivered memory messages");
        }
        return undelivered;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the total number of references owned by the channel: those in
     * memory, those being delivered and those scheduled for later delivery.
     */
    public int getMessageCount() {
        synchronized (lock) {
            if (trace) {
                log.trace("Getting message count mr: " + messageRefs.size() + " dc " + getDeliveringCount() + " sc " + getScheduledCount());
            }
            int inMemory = messageRefs.size();
            int delivering = getDeliveringCount();
            int scheduled = getScheduledCount();
            return inMemory + delivering + scheduled;
        }
    }

    /** Returns the current value of the delivering counter. */
    public int getDeliveringCount() {
        return deliveringCount.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /** Returns the number of pending scheduled deliveries, read under the set's own monitor. */
    public int getScheduledCount() {
        synchronized (scheduledDeliveries) {
            return scheduledDeliveries.size();
        }
    }

    /** Marks the channel active so that {@code handle()} accepts references. */
    public void activate() {
        active = true;
    }

    /** Marks the channel inactive; {@code handle()} then returns {@code null}. */
    public void deactivate() {
        active = false;
    }

    /** Returns the current value of the volatile active flag. */
    public boolean isActive() {
        return active;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /** Returns the configured maximum size (-1 means no limit), read under the channel lock. */
    public int getMaxSize() {
        synchronized (lock) {
            return maxSize;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Changes the maximum size. The change is refused (with a warning) when a
     * limit other than -1 is smaller than the current message count.
     *
     * @param newSize new limit, or -1 to remove the limit
     */
    public void setMaxSize(int newSize) {
        synchronized (lock) {
            int count = getMessageCount();
            boolean unlimited = newSize == -1;
            if (unlimited || count <= newSize) {
                maxSize = newSize;
                return;
            }
            log.warn("Cannot set maxSize to " + newSize + " since there are already " + count + " refs");
        }
    }

    /** Returns how many references have been accepted by this channel so far. */
    public int getMessagesAdded() {
        return messagesAdded.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /** Returns the number of references currently held in memory, read under the channel lock. */
    public int memoryRefCount() {
        synchronized (lock) {
            return messageRefs.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Cancels every pending scheduled delivery and empties the set of
     * scheduled timeouts, holding the set's monitor throughout.
     */
    protected void clearAllScheduledDeliveries() {
        synchronized (this.scheduledDeliveries) {
            // Iterate over a copy; presumably this protects against cancel()
            // touching the live set - TODO confirm against Timeout.
            HashSet snapshot = new HashSet(this.scheduledDeliveries);
            // The set is raw, so its elements are Object and need a cast.
            for (Object scheduled : snapshot) {
                ((Timeout) scheduled).cancel();
            }
            this.scheduledDeliveries.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Puts a cancelled reference back at the head of the in-memory list for
     * its message priority.
     */
    protected void cancelInternal(MessageReference ref) throws Exception {
        final boolean tracing = trace;
        if (tracing) {
            log.trace(this + " cancelling " + ref + " in memory");
        }
        synchronized (lock) {
            messageRefs.addFirst(ref, ref.getMessage().getPriority());
        }
        if (tracing) {
            log.trace(this + " added " + ref + " back into state");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Pushes in-memory references to the distributor until it stops returning
     * deliveries or no references remain.
     *
     * The loop starts by peeking the head of the list. As soon as a delivery is
     * rejected by a selector it switches to an iterator so the rejected
     * reference is skipped and stays in memory. Accepted references are removed
     * and counted as delivering. Any failure is logged, not propagated.
     */
    protected void deliverInternal() {
        if (trace) {
            log.trace(this + " was prompted delivery");
        }
        try {
            if (!getReceiversReady()) {
                if (trace) {
                    log.trace(this + " receivers not ready so not delivering");
                }
                return;
            }
            // null cursor means "still working on the head of the list"
            ListIterator cursor = null;
            for (MessageReference current = nextReference(cursor); current != null; current = nextReference(cursor)) {
                if (trace) {
                    log.trace(this + " pushing " + current);
                }
                Delivery delivery = distributor.handle(this, current, null);
                setReceiversReady(delivery != null);
                if (delivery == null) {
                    if (trace) {
                        log.trace(this + " got no delivery for " + current + " so no receiver got the message. Stopping delivery.");
                    }
                    return;
                }
                if (!delivery.isSelectorAccepted()) {
                    if (cursor == null) {
                        // First rejection: start iterating and step over the
                        // head element that was just rejected.
                        cursor = messageRefs.iterator();
                        cursor.next();
                    }
                    continue;
                }
                if (trace) {
                    log.trace(this + ": " + delivery + " returned for message " + current);
                }
                synchronized (lock) {
                    if (cursor != null) {
                        if (trace) {
                            log.trace(this + " removed current message from iterator");
                        }
                        cursor.remove();
                    } else {
                        if (trace) {
                            log.trace(this + " removing first ref in memory");
                        }
                        removeFirstInMemory();
                    }
                }
                deliveringCount.increment();
            }
            if (trace) {
                log.trace(this + " no more refs to deliver ");
            }
        }
        catch (Throwable t) {
            log.error(this + " Failed to deliver", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Hands a single (previously scheduled) reference to the distributor.
     *
     * @return {@code true} only when a delivery was returned and accepted by
     *         the selector; {@code false} otherwise, including on any failure
     *         (which is logged)
     */
    protected boolean deliverScheduled(MessageReference ref) {
        try {
            synchronized (lock) {
                if (trace) {
                    log.trace(this + " pushing " + ref);
                }
                Delivery delivery = distributor.handle(this, ref, null);
                setReceiversReady(delivery != null);
                if (delivery == null) {
                    if (trace) {
                        log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete");
                    }
                    return false;
                }
                if (!delivery.isSelectorAccepted()) {
                    return false;
                }
                if (trace) {
                    log.trace(this + ": " + delivery + " returned for message:" + ref);
                }
                deliveringCount.increment();
                return true;
            }
        }
        catch (Throwable t) {
            log.error(this + " Failed to deliver", t);
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Accepts a reference into the channel.
     *
     * Without a transaction the reference is optionally persisted, then either
     * scheduled or added to memory followed by a delivery attempt. With a
     * transaction it is registered on the transaction's in-memory callback and
     * optionally persisted within that transaction.
     *
     * @param sender  observer passed in by the caller (not used in this method)
     * @param ref     reference to accept; {@code null} is rejected
     * @param tx      transaction, or {@code null} for non-transactional handling
     * @param persist whether reliable messages may be written to the persistence manager
     * @return a delivery for the copied reference, or {@code null} when the
     *         reference is null, the channel is full, or handling failed
     */
    protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref, Transaction tx, boolean persist) {
        if (ref == null) {
            return null;
        }
        if (this.trace) {
            log.trace(this + " handles " + ref + (tx == null ? " non-transactionally" : " in transaction: " + tx));
        }
        // Read the limit once: if setMaxSize(-1) ran between two separate reads,
        // the comparison "count >= -1" would always hold and the message would
        // be dropped although the limit was just removed.
        int limit = this.maxSize;
        if (limit != -1 && this.getMessageCount() >= limit) {
            log.warn(this + " has reached maximum size, " + ref + " will be dropped");
            return null;
        }
        // Work on a private copy of the reference from here on.
        ref = ref.copy();
        try {
            if (tx == null) {
                if (persist && ref.getMessage().isReliable() && this.recoverable) {
                    if (this.trace) {
                        log.trace(this + " adding " + ref + " to database non-transactionally");
                    }
                    this.pm.addReference(this.channelID, ref, null);
                }
                if (!this.checkAndSchedule(ref)) {
                    synchronized (this.lock) {
                        this.addReferenceInMemory(ref);
                        this.deliverInternal();
                    }
                }
            } else {
                // tx is known to be non-null in this branch, so the messages
                // are written without the dead "non-transactional" alternative.
                if (this.trace) {
                    log.trace(this + " adding " + ref + " to state in transaction: " + tx);
                }
                this.getCallback(tx).addRef(ref);
                if (this.trace) {
                    log.trace(this + " added transactionally " + ref + " in memory");
                }
                if (persist && ref.getMessage().isReliable() && this.recoverable) {
                    if (this.trace) {
                        log.trace(this + " adding " + ref + " in transaction: " + tx);
                    }
                    this.pm.addReference(this.channelID, ref, tx);
                }
            }
            this.messagesAdded.increment();
        }
        catch (Throwable t) {
            log.error("Failed to handle message", t);
            return null;
        }
        return new SimpleDelivery(this, ref, true, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules the reference for later delivery when its scheduled delivery
     * time lies in the future.
     *
     * @return {@code true} if a timeout was registered, {@code false} if the
     *         reference is due now and must be handled by the caller
     */
    protected boolean checkAndSchedule(MessageReference ref) {
        if (ref.getScheduledDeliveryTime() <= System.currentTimeMillis()) {
            return false;
        }
        if (trace) {
            log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime());
        }
        synchronized (scheduledDeliveries) {
            Timeout handle = MessagingTimeoutFactory.instance.getFactory().schedule(ref.getScheduledDeliveryTime(), new DeliverRefTimeoutTarget(ref));
            scheduledDeliveries.add(handle);
        }
        return true;
    }

    /**
     * Acknowledges a delivery.
     *
     * Non-transactionally, the reference is removed from the persistence
     * manager (when {@code persist}, recoverable and reliable) and the
     * delivering counter is decremented unless the delivery was recovered.
     * Transactionally, the delivery is registered on the transaction's callback
     * and the persistent removal is enlisted in the transaction.
     */
    protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist) throws Exception {
        if (tx != null) {
            getCallback(tx).addDelivery(d);
            if (trace) {
                log.trace(this + " added " + d + " to memory on transaction " + tx);
            }
            // NOTE(review): unlike the non-transactional path, 'persist' is not
            // consulted here - confirm whether that is intentional.
            if (recoverable && d.getReference().getMessage().isReliable()) {
                pm.removeReference(channelID, d.getReference(), tx);
            }
            return;
        }
        if (persist && recoverable && d.getReference().getMessage().isReliable()) {
            pm.removeReference(channelID, d.getReference(), null);
        }
        if (!d.isRecovered()) {
            deliveringCount.decrement();
        }
    }

    /**
     * Returns the in-memory callback this channel registered on the
     * transaction, creating and registering one on first use.
     */
    protected InMemoryCallback getCallback(Transaction tx) {
        InMemoryCallback existing = (InMemoryCallback)tx.getCallback(this);
        if (existing != null) {
            return existing;
        }
        InMemoryCallback created = new InMemoryCallback();
        tx.addCallback(created, this);
        return created;
    }

    /**
     * Removes and returns the first in-memory reference. No lock is taken here;
     * the callers in this class hold the channel lock.
     */
    protected MessageReference removeFirstInMemory() throws Exception {
        return (MessageReference)messageRefs.removeFirst();
    }

    /**
     * Appends the reference to the in-memory list at the tail of its message
     * priority. No lock is taken here; the callers in this class hold the
     * channel lock.
     */
    protected void addReferenceInMemory(MessageReference ref) throws Exception {
        messageRefs.addLast(ref, ref.getMessage().getPriority());
        if (!trace) {
            return;
        }
        log.trace(this + " added " + ref + " non-transactionally in memory");
    }

    /** Returns whether receivers are currently considered ready to take deliveries. */
    protected boolean getReceiversReady() {
        return receiversReady;
    }

    /** Records whether receivers are considered ready to take deliveries. */
    protected void setReceiversReady(boolean receiversReady) {
        // 'this.' is required: the parameter shadows the field.
        this.receiversReady = receiversReady;
    }

    /**
     * Returns the next delivery candidate: the head of the in-memory list
     * (without removing it) when no iterator is in use, otherwise the
     * iterator's next element, or {@code null} when the iterator is exhausted.
     */
    private MessageReference nextReference(ListIterator iter) throws Throwable {
        if (iter == null) {
            return (MessageReference)messageRefs.peekFirst();
        }
        if (iter.hasNext()) {
            return (MessageReference)iter.next();
        }
        return null;
    }

    /**
     * Extension hook. This base implementation does nothing and is not invoked
     * anywhere in this class.
     */
    protected void processMessageBeforeStorage(MessageReference reference) {
        // Intentionally empty.
    }

    /**
     * Verifies that the channel still has a distributor.
     *
     * @throws IllegalStateException if the distributor is {@code null}
     */
    protected void checkClosed() {
        boolean closed = distributor == null;
        if (closed) {
            throw new IllegalStateException(this + " closed");
        }
    }

    /**
     * Timeout target that delivers one scheduled reference when its time
     * arrives, or puts it back in memory when it cannot be delivered.
     */
    private class DeliverRefTimeoutTarget
    implements TimeoutTarget {
        private final MessageReference ref;

        public DeliverRefTimeoutTarget(MessageReference ref) {
            this.ref = ref;
        }

        /**
         * Unregisters the timeout, clears the reference's scheduled delivery
         * time and tries to deliver it.
         *
         * @throws IllegalStateException if the timeout is no longer registered
         */
        public void timedOut(Timeout timeout) {
            if (ChannelSupport.this.trace) {
                log.trace("Scheduled delivery timeout " + this.ref);
            }
            synchronized (ChannelSupport.this.scheduledDeliveries) {
                boolean removed = ChannelSupport.this.scheduledDeliveries.remove(timeout);
                if (!removed) {
                    throw new IllegalStateException("Failed to remove timeout " + timeout);
                }
            }
            // Zero the schedule time so the reference is treated as due from now on.
            this.ref.setScheduledDeliveryTime(0L);
            boolean delivered = false;
            // close() sets the distributor to null under the channel lock, so
            // check it under the same lock (as deliver() does) instead of
            // dereferencing it blindly from the timer thread.
            synchronized (ChannelSupport.this.lock) {
                if (ChannelSupport.this.distributor != null && ChannelSupport.this.distributor.getNumberOfReceivers() > 0) {
                    delivered = ChannelSupport.this.deliverScheduled(this.ref);
                }
            }
            if (!delivered) {
                try {
                    ChannelSupport.this.cancelInternal(this.ref);
                }
                catch (Exception e) {
                    log.error("Failed to cancel", e);
                }
            } else if (ChannelSupport.this.trace) {
                log.trace("Delivered scheduled delivery at " + System.currentTimeMillis() + " for " + this.ref);
            }
        }
    }

    /**
     * Per-transaction callback that collects the references added and the
     * deliveries acknowledged within a transaction and applies them to the
     * channel's in-memory state after commit.
     */
    private class InMemoryCallback
    implements TxCallback {
        // Parameterized so the for-each loops in afterCommit() can use typed
        // loop variables; with raw lists the elements are Object.
        private List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
        private List<Delivery> deliveriesToRemove = new ArrayList<Delivery>();

        private InMemoryCallback() {
        }

        /** Registers a reference to be added to the channel on commit. */
        private void addRef(MessageReference ref) {
            this.refsToAdd.add(ref);
        }

        /** Registers a delivery whose delivering slot is released on commit. */
        private void addDelivery(Delivery del) {
            this.deliveriesToRemove.add(del);
        }

        public void beforePrepare() {
        }

        public void beforeCommit(boolean onePhase) {
        }

        public void beforeRollback(boolean onePhase) {
        }

        public void afterPrepare() {
        }

        /**
         * Applies the transaction's effects: each added reference is scheduled
         * or put in memory, each acknowledged non-recovered delivery decrements
         * the delivering counter, and delivery is prompted if anything was
         * added to memory.
         *
         * @throws Exception wrapping any failure, after logging it
         */
        public void afterCommit(boolean onePhase) throws Exception {
            try {
                boolean promptDelivery = false;
                for (MessageReference ref : this.refsToAdd) {
                    if (ChannelSupport.this.checkAndSchedule(ref)) {
                        if (ChannelSupport.this.trace) {
                            log.trace(this + ": scheduled " + ref);
                        }
                        continue;
                    }
                    if (ChannelSupport.this.trace) {
                        log.trace(this + ": adding " + ref + " to memory");
                    }
                    try {
                        synchronized (ChannelSupport.this.lock) {
                            ChannelSupport.this.addReferenceInMemory(ref);
                        }
                    }
                    catch (Throwable t) {
                        throw new TransactionException("Failed to add reference", t);
                    }
                    promptDelivery = true;
                }
                for (Delivery del : this.deliveriesToRemove) {
                    if (ChannelSupport.this.trace) {
                        log.trace(this + " removing " + del + " after commit");
                    }
                    if (del.isRecovered()) {
                        continue;
                    }
                    ChannelSupport.this.deliveringCount.decrement();
                }
                if (promptDelivery) {
                    synchronized (ChannelSupport.this.lock) {
                        ChannelSupport.this.deliverInternal();
                    }
                }
            }
            catch (Throwable t) {
                log.error("failed to commit", t);
                throw new Exception("Failed to commit", t);
            }
        }

        public void afterRollback(boolean onePhase) throws Exception {
        }

        public String toString() {
            return ChannelSupport.this + ".InMemoryCallback[" + Integer.toHexString(this.hashCode()) + "]";
        }
    }
}

