001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import org.apache.activemq.filter.BooleanExpression; 020import org.apache.activemq.filter.MessageEvaluationContext; 021import org.apache.activemq.util.JMSExceptionSupport; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025import javax.jms.JMSException; 026import java.io.IOException; 027import java.util.Arrays; 028 029/** 030 * @openwire:marshaller code="91" 031 * 032 */ 033public class NetworkBridgeFilter implements DataStructure, BooleanExpression { 034 035 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER; 036 static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class); 037 038 protected BrokerId networkBrokerId; 039 protected int messageTTL; 040 protected int consumerTTL; 041 transient ConsumerInfo consumerInfo; 042 043 public NetworkBridgeFilter() { 044 } 045 046 public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) { 047 this.networkBrokerId = networkBrokerId; 048 this.messageTTL = messageTTL; 049 this.consumerTTL = consumerTTL; 050 this.consumerInfo = consumerInfo; 051 } 052 053 public byte getDataStructureType() { 054 return DATA_STRUCTURE_TYPE; 055 } 056 057 public boolean isMarshallAware() { 058 return false; 059 } 060 061 public boolean matches(MessageEvaluationContext mec) throws JMSException { 062 try { 063 // for Queues - the message can be acknowledged and dropped whilst 064 // still 065 // in the dispatch loop 066 // so need to get the reference to it 067 Message message = mec.getMessage(); 068 return message != null && matchesForwardingFilter(message, mec); 069 } catch (IOException e) { 070 throw JMSExceptionSupport.create(e); 071 } 072 } 073 074 public Object evaluate(MessageEvaluationContext message) throws JMSException { 075 return matches(message) ? Boolean.TRUE : Boolean.FALSE; 076 } 077 078 protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) { 079 080 if (contains(message.getBrokerPath(), networkBrokerId)) { 081 if (LOG.isTraceEnabled()) { 082 LOG.trace("Message all ready routed once through target broker (" 083 + networkBrokerId + "), path: " 084 + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message); 085 } 086 return false; 087 } 088 089 int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length; 090 091 if (messageTTL > -1 && hops >= messageTTL) { 092 if (LOG.isTraceEnabled()) { 093 LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message); 094 } 095 return false; 096 } 097 098 if (message.isAdvisory()) { 099 if (consumerInfo != null && consumerInfo.isNetworkSubscription()) { 100 // they will be interpreted by the bridge leading to dup commands 101 if (LOG.isTraceEnabled()) { 102 LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message); 103 } 104 return false; 105 } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) { 106 ConsumerInfo info = (ConsumerInfo)message.getDataStructure(); 107 hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length; 108 if (consumerTTL > -1 && hops >= consumerTTL) { 109 if (LOG.isTraceEnabled()) { 110 LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message); 111 } 112 return false; 113 } 114 115 if (contains(info.getBrokerPath(), networkBrokerId)) { 116 LOG.trace("ConsumerInfo advisory all ready routed once through target broker (" 117 + networkBrokerId + "), path: " 118 + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message); 119 return false; 120 } 121 } 122 } 123 return true; 124 } 125 126 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 127 if (brokerPath != null && brokerId != null) { 128 for (int i = 0; i < brokerPath.length; i++) { 129 if (brokerId.equals(brokerPath[i])) { 130 return true; 131 } 132 } 133 } 134 return false; 135 } 136 137 // keep for backward compat with older 138 // wire formats 139 public int getNetworkTTL() { 140 return messageTTL; 141 } 142 143 public void setNetworkTTL(int networkTTL) { 144 messageTTL = networkTTL; 145 consumerTTL = networkTTL; 146 } 147 148 /** 149 * @openwire:property version=1 cache=true 150 */ 151 public BrokerId getNetworkBrokerId() { 152 return networkBrokerId; 153 } 154 155 public void setNetworkBrokerId(BrokerId remoteBrokerPath) { 156 this.networkBrokerId = remoteBrokerPath; 157 } 158 159 public void setMessageTTL(int messageTTL) { 160 this.messageTTL = messageTTL; 161 } 162 163 /** 164 * @openwire:property version=10 165 */ 166 public int getMessageTTL() { 167 return this.messageTTL; 168 } 169 170 public void setConsumerTTL(int consumerTTL) { 171 this.consumerTTL = consumerTTL; 172 } 173 174 /** 175 * @openwire:property version=10 176 */ 177 public int getConsumerTTL() { 178 return this.consumerTTL; 179 } 180}