001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.IOException; 020import java.util.Timer; 021import java.util.concurrent.RejectedExecutionException; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadFactory; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.concurrent.locks.ReentrantReadWriteLock; 029 030import org.apache.activemq.command.KeepAliveInfo; 031import org.apache.activemq.command.WireFormatInfo; 032import org.apache.activemq.thread.SchedulerTimerTask; 033import org.apache.activemq.util.ThreadPoolUtils; 034import org.apache.activemq.wireformat.WireFormat; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Used to make sure that commands are arriving periodically from the peer of 040 * the transport. 041 */ 042public abstract class AbstractInactivityMonitor extends TransportFilter { 043 044 private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class); 045 046 private static ThreadPoolExecutor ASYNC_TASKS; 047 private static int CHECKER_COUNTER; 048 private static long DEFAULT_CHECK_TIME_MILLS = 30000; 049 private static Timer READ_CHECK_TIMER; 050 private static Timer WRITE_CHECK_TIMER; 051 052 private final AtomicBoolean monitorStarted = new AtomicBoolean(false); 053 054 private final AtomicBoolean commandSent = new AtomicBoolean(false); 055 private final AtomicBoolean inSend = new AtomicBoolean(false); 056 private final AtomicBoolean failed = new AtomicBoolean(false); 057 058 private final AtomicBoolean commandReceived = new AtomicBoolean(true); 059 private final AtomicBoolean inReceive = new AtomicBoolean(false); 060 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); 061 062 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); 063 064 private SchedulerTimerTask writeCheckerTask; 065 private SchedulerTimerTask readCheckerTask; 066 067 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; 068 private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS; 069 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; 070 private boolean useKeepAlive = true; 071 private boolean keepAliveResponseRequired; 072 073 protected WireFormat wireFormat; 074 075 private final Runnable readChecker = new Runnable() { 076 long lastRunTime; 077 078 @Override 079 public void run() { 080 long now = System.currentTimeMillis(); 081 long elapsed = (now - lastRunTime); 082 083 if (lastRunTime != 0) { 084 LOG.debug("{}ms elapsed since last read check.", elapsed); 085 } 086 087 // Perhaps the timer executed a read check late.. and then executes 088 // the next read check on time which causes the time elapsed between 089 // read checks to be small.. 090 091 // If less than 90% of the read check Time elapsed then abort this 092 // read check. 093 if (!allowReadCheck(elapsed)) { 094 LOG.debug("Aborting read check...Not enough time elapsed since last read check."); 095 return; 096 } 097 098 lastRunTime = now; 099 readCheck(); 100 } 101 102 @Override 103 public String toString() { 104 return "ReadChecker"; 105 } 106 }; 107 108 private boolean allowReadCheck(long elapsed) { 109 return elapsed > (readCheckTime * 9 / 10); 110 } 111 112 private final Runnable writeChecker = new Runnable() { 113 long lastRunTime; 114 115 @Override 116 public void run() { 117 long now = System.currentTimeMillis(); 118 if (lastRunTime != 0) { 119 LOG.debug("{}: {}ms elapsed since last write check.", this, (now - lastRunTime)); 120 } 121 lastRunTime = now; 122 writeCheck(); 123 } 124 125 @Override 126 public String toString() { 127 return "WriteChecker"; 128 } 129 }; 130 131 public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) { 132 super(next); 133 this.wireFormat = wireFormat; 134 } 135 136 @Override 137 public void start() throws Exception { 138 next.start(); 139 startMonitorThreads(); 140 } 141 142 @Override 143 public void stop() throws Exception { 144 stopMonitorThreads(); 145 next.stop(); 146 } 147 148 final void writeCheck() { 149 if (inSend.get()) { 150 LOG.trace("Send in progress. Skipping write check."); 151 return; 152 } 153 154 if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { 155 LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this); 156 157 try { 158 ASYNC_TASKS.execute(new Runnable() { 159 @Override 160 public void run() { 161 LOG.debug("Running {}", this); 162 if (monitorStarted.get()) { 163 try { 164 // If we can't get the lock it means another 165 // write beat us into the 166 // send and we don't need to heart beat now. 167 if (sendLock.writeLock().tryLock()) { 168 KeepAliveInfo info = new KeepAliveInfo(); 169 info.setResponseRequired(keepAliveResponseRequired); 170 doOnewaySend(info); 171 } 172 } catch (IOException e) { 173 onException(e); 174 } finally { 175 if (sendLock.writeLock().isHeldByCurrentThread()) { 176 sendLock.writeLock().unlock(); 177 } 178 } 179 } 180 } 181 182 @Override 183 public String toString() { 184 return "WriteCheck[" + getRemoteAddress() + "]"; 185 }; 186 }); 187 } catch (RejectedExecutionException ex) { 188 if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { 189 LOG.error("Async write check was rejected from the executor: ", ex); 190 throw ex; 191 } 192 } 193 } else { 194 LOG.trace("{} message sent since last write check, resetting flag.", this); 195 } 196 197 commandSent.set(false); 198 } 199 200 final void readCheck() { 201 int currentCounter = next.getReceiveCounter(); 202 int previousCounter = lastReceiveCounter.getAndSet(currentCounter); 203 if (inReceive.get() || currentCounter != previousCounter) { 204 LOG.trace("A receive is in progress, skipping read check."); 205 return; 206 } 207 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { 208 LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this); 209 210 try { 211 ASYNC_TASKS.execute(new Runnable() { 212 @Override 213 public void run() { 214 LOG.debug("Running {}", this); 215 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); 216 } 217 218 @Override 219 public String toString() { 220 return "ReadCheck[" + getRemoteAddress() + "]"; 221 }; 222 }); 223 } catch (RejectedExecutionException ex) { 224 if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { 225 LOG.error("Async read check was rejected from the executor: ", ex); 226 throw ex; 227 } 228 } 229 } else { 230 if (LOG.isTraceEnabled()) { 231 LOG.trace("Message received since last read check, resetting flag: "); 232 } 233 } 234 commandReceived.set(false); 235 } 236 237 protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException; 238 239 protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException; 240 241 @Override 242 public void onCommand(Object command) { 243 commandReceived.set(true); 244 inReceive.set(true); 245 try { 246 if (command.getClass() == KeepAliveInfo.class) { 247 KeepAliveInfo info = (KeepAliveInfo) command; 248 if (info.isResponseRequired()) { 249 sendLock.readLock().lock(); 250 try { 251 info.setResponseRequired(false); 252 oneway(info); 253 } catch (IOException e) { 254 onException(e); 255 } finally { 256 sendLock.readLock().unlock(); 257 } 258 } 259 } else { 260 if (command.getClass() == WireFormatInfo.class) { 261 synchronized (this) { 262 try { 263 processInboundWireFormatInfo((WireFormatInfo) command); 264 } catch (IOException e) { 265 onException(e); 266 } 267 } 268 } 269 270 transportListener.onCommand(command); 271 } 272 } finally { 273 inReceive.set(false); 274 } 275 } 276 277 @Override 278 public void oneway(Object o) throws IOException { 279 // To prevent the inactivity monitor from sending a message while we 280 // are performing a send we take a read lock. The inactivity monitor 281 // sends its Heart-beat commands under a write lock. This means that 282 // the MutexTransport is still responsible for synchronizing sends 283 this.sendLock.readLock().lock(); 284 inSend.set(true); 285 try { 286 doOnewaySend(o); 287 } finally { 288 commandSent.set(true); 289 inSend.set(false); 290 this.sendLock.readLock().unlock(); 291 } 292 } 293 294 // Must be called under lock, either read or write on sendLock. 295 private void doOnewaySend(Object command) throws IOException { 296 if (failed.get()) { 297 throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress()); 298 } 299 if (command.getClass() == WireFormatInfo.class) { 300 synchronized (this) { 301 processOutboundWireFormatInfo((WireFormatInfo) command); 302 } 303 } 304 next.oneway(command); 305 } 306 307 @Override 308 public void onException(IOException error) { 309 if (failed.compareAndSet(false, true)) { 310 stopMonitorThreads(); 311 if (sendLock.writeLock().isHeldByCurrentThread()) { 312 sendLock.writeLock().unlock(); 313 } 314 transportListener.onException(error); 315 } 316 } 317 318 public void setUseKeepAlive(boolean val) { 319 useKeepAlive = val; 320 } 321 322 public long getReadCheckTime() { 323 return readCheckTime; 324 } 325 326 public void setReadCheckTime(long readCheckTime) { 327 this.readCheckTime = readCheckTime; 328 } 329 330 public long getWriteCheckTime() { 331 return writeCheckTime; 332 } 333 334 public void setWriteCheckTime(long writeCheckTime) { 335 this.writeCheckTime = writeCheckTime; 336 } 337 338 public long getInitialDelayTime() { 339 return initialDelayTime; 340 } 341 342 public void setInitialDelayTime(long initialDelayTime) { 343 this.initialDelayTime = initialDelayTime; 344 } 345 346 public boolean isKeepAliveResponseRequired() { 347 return this.keepAliveResponseRequired; 348 } 349 350 public void setKeepAliveResponseRequired(boolean value) { 351 this.keepAliveResponseRequired = value; 352 } 353 354 public boolean isMonitorStarted() { 355 return this.monitorStarted.get(); 356 } 357 358 protected synchronized void startMonitorThreads() throws IOException { 359 if (monitorStarted.get()) { 360 return; 361 } 362 363 if (!configuredOk()) { 364 return; 365 } 366 367 if (readCheckTime > 0) { 368 readCheckerTask = new SchedulerTimerTask(readChecker); 369 } 370 371 if (writeCheckTime > 0) { 372 writeCheckerTask = new SchedulerTimerTask(writeChecker); 373 } 374 375 if (writeCheckTime > 0 || readCheckTime > 0) { 376 monitorStarted.set(true); 377 synchronized (AbstractInactivityMonitor.class) { 378 if (CHECKER_COUNTER == 0) { 379 ASYNC_TASKS = createExecutor(); 380 READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true); 381 WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true); 382 } 383 CHECKER_COUNTER++; 384 if (readCheckTime > 0) { 385 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); 386 } 387 if (writeCheckTime > 0) { 388 WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime); 389 } 390 } 391 } 392 } 393 394 abstract protected boolean configuredOk() throws IOException; 395 396 protected synchronized void stopMonitorThreads() { 397 if (monitorStarted.compareAndSet(true, false)) { 398 if (readCheckerTask != null) { 399 readCheckerTask.cancel(); 400 } 401 if (writeCheckerTask != null) { 402 writeCheckerTask.cancel(); 403 } 404 synchronized (AbstractInactivityMonitor.class) { 405 WRITE_CHECK_TIMER.purge(); 406 READ_CHECK_TIMER.purge(); 407 CHECKER_COUNTER--; 408 if (CHECKER_COUNTER == 0) { 409 WRITE_CHECK_TIMER.cancel(); 410 READ_CHECK_TIMER.cancel(); 411 WRITE_CHECK_TIMER = null; 412 READ_CHECK_TIMER = null; 413 ThreadPoolUtils.shutdown(ASYNC_TASKS); 414 } 415 } 416 } 417 } 418 419 private final ThreadFactory factory = new ThreadFactory() { 420 @Override 421 public Thread newThread(Runnable runnable) { 422 Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker"); 423 thread.setDaemon(true); 424 return thread; 425 } 426 }; 427 428 private ThreadPoolExecutor createExecutor() { 429 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); 430 exec.allowCoreThreadTimeOut(true); 431 return exec; 432 } 433 434 private static int getDefaultKeepAliveTime() { 435 return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30); 436 } 437}