001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import org.apache.activemq.filter.BooleanExpression;
020import org.apache.activemq.filter.MessageEvaluationContext;
021import org.apache.activemq.util.JMSExceptionSupport;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import javax.jms.JMSException;
026import java.io.IOException;
027import java.util.Arrays;
028
029/**
030 * @openwire:marshaller code="91"
031 *
032 */
033public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
034
035    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
036    static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
037
038    protected BrokerId networkBrokerId;
039    protected int messageTTL;
040    protected int consumerTTL;
041    transient ConsumerInfo consumerInfo;
042
043    public NetworkBridgeFilter() {
044    }
045
046    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
047        this.networkBrokerId = networkBrokerId;
048        this.messageTTL = messageTTL;
049        this.consumerTTL = consumerTTL;
050        this.consumerInfo = consumerInfo;
051    }
052
053    public byte getDataStructureType() {
054        return DATA_STRUCTURE_TYPE;
055    }
056
057    public boolean isMarshallAware() {
058        return false;
059    }
060
061    public boolean matches(MessageEvaluationContext mec) throws JMSException {
062        try {
063            // for Queues - the message can be acknowledged and dropped whilst
064            // still
065            // in the dispatch loop
066            // so need to get the reference to it
067            Message message = mec.getMessage();
068            return message != null && matchesForwardingFilter(message, mec);
069        } catch (IOException e) {
070            throw JMSExceptionSupport.create(e);
071        }
072    }
073
074    public Object evaluate(MessageEvaluationContext message) throws JMSException {
075        return matches(message) ? Boolean.TRUE : Boolean.FALSE;
076    }
077
078    protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
079
080        if (contains(message.getBrokerPath(), networkBrokerId)) {
081            if (LOG.isTraceEnabled()) {
082                LOG.trace("Message all ready routed once through target broker ("
083                        + networkBrokerId + "), path: "
084                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
085            }
086            return false;
087        }
088
089        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
090
091        if (messageTTL > -1 && hops >= messageTTL) {
092            if (LOG.isTraceEnabled()) {
093                LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message);
094            }
095            return false;
096        }
097
098        if (message.isAdvisory()) {
099            if (consumerInfo != null && consumerInfo.isNetworkSubscription()) {
100                // they will be interpreted by the bridge leading to dup commands
101                if (LOG.isTraceEnabled()) {
102                    LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
103                }
104                return false;
105            } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
106                ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
107                hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
108                if (consumerTTL > -1 && hops >= consumerTTL) {
109                    if (LOG.isTraceEnabled()) {
110                        LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message);
111                    }
112                    return false;
113                }
114
115                if (contains(info.getBrokerPath(), networkBrokerId)) {
116                    LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
117                            + networkBrokerId + "), path: "
118                            + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
119                    return false;
120                }
121            }
122        }
123        return true;
124    }
125
126    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
127        if (brokerPath != null && brokerId != null) {
128            for (int i = 0; i < brokerPath.length; i++) {
129                if (brokerId.equals(brokerPath[i])) {
130                    return true;
131                }
132            }
133        }
134        return false;
135    }
136
137    // keep for backward compat with older
138    // wire formats
139    public int getNetworkTTL() {
140        return messageTTL;
141    }
142
143    public void setNetworkTTL(int networkTTL) {
144        messageTTL = networkTTL;
145        consumerTTL = networkTTL;
146    }
147
148    /**
149     * @openwire:property version=1 cache=true
150     */
151    public BrokerId getNetworkBrokerId() {
152        return networkBrokerId;
153    }
154
155    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
156        this.networkBrokerId = remoteBrokerPath;
157    }
158
159    public void setMessageTTL(int messageTTL) {
160        this.messageTTL = messageTTL;
161    }
162
163    /**
164     * @openwire:property version=10
165     */
166    public int getMessageTTL() {
167        return this.messageTTL;
168    }
169
170    public void setConsumerTTL(int consumerTTL) {
171        this.consumerTTL = consumerTTL;
172    }
173
174    /**
175     * @openwire:property version=10
176     */
177    public int getConsumerTTL() {
178        return this.consumerTTL;
179    }
180}