001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020import java.util.Timer;
021import java.util.concurrent.RejectedExecutionException;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030import org.apache.activemq.command.KeepAliveInfo;
031import org.apache.activemq.command.WireFormatInfo;
032import org.apache.activemq.thread.SchedulerTimerTask;
033import org.apache.activemq.util.ThreadPoolUtils;
034import org.apache.activemq.wireformat.WireFormat;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Used to make sure that commands are arriving periodically from the peer of
040 * the transport.
041 */
042public abstract class AbstractInactivityMonitor extends TransportFilter {
043
044    private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
045
046    private static ThreadPoolExecutor ASYNC_TASKS;
047    private static int CHECKER_COUNTER;
048    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
049    private static Timer READ_CHECK_TIMER;
050    private static Timer WRITE_CHECK_TIMER;
051
052    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053
054    private final AtomicBoolean commandSent = new AtomicBoolean(false);
055    private final AtomicBoolean inSend = new AtomicBoolean(false);
056    private final AtomicBoolean failed = new AtomicBoolean(false);
057
058    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059    private final AtomicBoolean inReceive = new AtomicBoolean(false);
060    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061
062    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
063
064    private SchedulerTimerTask writeCheckerTask;
065    private SchedulerTimerTask readCheckerTask;
066
067    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
068    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
069    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
070    private boolean useKeepAlive = true;
071    private boolean keepAliveResponseRequired;
072
073    protected WireFormat wireFormat;
074
075    private final Runnable readChecker = new Runnable() {
076        long lastRunTime;
077
078        @Override
079        public void run() {
080            long now = System.currentTimeMillis();
081            long elapsed = (now - lastRunTime);
082
083            if (lastRunTime != 0) {
084                LOG.debug("{}ms elapsed since last read check.", elapsed);
085            }
086
087            // Perhaps the timer executed a read check late.. and then executes
088            // the next read check on time which causes the time elapsed between
089            // read checks to be small..
090
091            // If less than 90% of the read check Time elapsed then abort this
092            // read check.
093            if (!allowReadCheck(elapsed)) {
094                LOG.debug("Aborting read check...Not enough time elapsed since last read check.");
095                return;
096            }
097
098            lastRunTime = now;
099            readCheck();
100        }
101
102        @Override
103        public String toString() {
104            return "ReadChecker";
105        }
106    };
107
108    private boolean allowReadCheck(long elapsed) {
109        return elapsed > (readCheckTime * 9 / 10);
110    }
111
112    private final Runnable writeChecker = new Runnable() {
113        long lastRunTime;
114
115        @Override
116        public void run() {
117            long now = System.currentTimeMillis();
118            if (lastRunTime != 0) {
119                LOG.debug("{}: {}ms elapsed since last write check.", this, (now - lastRunTime));
120            }
121            lastRunTime = now;
122            writeCheck();
123        }
124
125        @Override
126        public String toString() {
127            return "WriteChecker";
128        }
129    };
130
131    public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
132        super(next);
133        this.wireFormat = wireFormat;
134    }
135
136    @Override
137    public void start() throws Exception {
138        next.start();
139        startMonitorThreads();
140    }
141
142    @Override
143    public void stop() throws Exception {
144        stopMonitorThreads();
145        next.stop();
146    }
147
148    final void writeCheck() {
149        if (inSend.get()) {
150            LOG.trace("Send in progress. Skipping write check.");
151            return;
152        }
153
154        if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
155            LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this);
156
157            try {
158                ASYNC_TASKS.execute(new Runnable() {
159                    @Override
160                    public void run() {
161                        LOG.debug("Running {}", this);
162                        if (monitorStarted.get()) {
163                            try {
164                                // If we can't get the lock it means another
165                                // write beat us into the
166                                // send and we don't need to heart beat now.
167                                if (sendLock.writeLock().tryLock()) {
168                                    KeepAliveInfo info = new KeepAliveInfo();
169                                    info.setResponseRequired(keepAliveResponseRequired);
170                                    doOnewaySend(info);
171                                }
172                            } catch (IOException e) {
173                                onException(e);
174                            } finally {
175                                if (sendLock.writeLock().isHeldByCurrentThread()) {
176                                    sendLock.writeLock().unlock();
177                                }
178                            }
179                        }
180                    }
181
182                    @Override
183                    public String toString() {
184                        return "WriteCheck[" + getRemoteAddress() + "]";
185                    };
186                });
187            } catch (RejectedExecutionException ex) {
188                if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
189                    LOG.error("Async write check was rejected from the executor: ", ex);
190                    throw ex;
191                }
192            }
193        } else {
194            LOG.trace("{} message sent since last write check, resetting flag.", this);
195        }
196
197        commandSent.set(false);
198    }
199
200    final void readCheck() {
201        int currentCounter = next.getReceiveCounter();
202        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
203        if (inReceive.get() || currentCounter != previousCounter) {
204            LOG.trace("A receive is in progress, skipping read check.");
205            return;
206        }
207        if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
208            LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this);
209
210            try {
211                ASYNC_TASKS.execute(new Runnable() {
212                    @Override
213                    public void run() {
214                        LOG.debug("Running {}", this);
215                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
216                    }
217
218                    @Override
219                    public String toString() {
220                        return "ReadCheck[" + getRemoteAddress() + "]";
221                    };
222                });
223            } catch (RejectedExecutionException ex) {
224                if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
225                    LOG.error("Async read check was rejected from the executor: ", ex);
226                    throw ex;
227                }
228            }
229        } else {
230            if (LOG.isTraceEnabled()) {
231                LOG.trace("Message received since last read check, resetting flag: ");
232            }
233        }
234        commandReceived.set(false);
235    }
236
237    protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
238
239    protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
240
241    @Override
242    public void onCommand(Object command) {
243        commandReceived.set(true);
244        inReceive.set(true);
245        try {
246            if (command.getClass() == KeepAliveInfo.class) {
247                KeepAliveInfo info = (KeepAliveInfo) command;
248                if (info.isResponseRequired()) {
249                    sendLock.readLock().lock();
250                    try {
251                        info.setResponseRequired(false);
252                        oneway(info);
253                    } catch (IOException e) {
254                        onException(e);
255                    } finally {
256                        sendLock.readLock().unlock();
257                    }
258                }
259            } else {
260                if (command.getClass() == WireFormatInfo.class) {
261                    synchronized (this) {
262                        try {
263                            processInboundWireFormatInfo((WireFormatInfo) command);
264                        } catch (IOException e) {
265                            onException(e);
266                        }
267                    }
268                }
269
270                transportListener.onCommand(command);
271            }
272        } finally {
273            inReceive.set(false);
274        }
275    }
276
277    @Override
278    public void oneway(Object o) throws IOException {
279        // To prevent the inactivity monitor from sending a message while we
280        // are performing a send we take a read lock. The inactivity monitor
281        // sends its Heart-beat commands under a write lock. This means that
282        // the MutexTransport is still responsible for synchronizing sends
283        this.sendLock.readLock().lock();
284        inSend.set(true);
285        try {
286            doOnewaySend(o);
287        } finally {
288            commandSent.set(true);
289            inSend.set(false);
290            this.sendLock.readLock().unlock();
291        }
292    }
293
294    // Must be called under lock, either read or write on sendLock.
295    private void doOnewaySend(Object command) throws IOException {
296        if (failed.get()) {
297            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
298        }
299        if (command.getClass() == WireFormatInfo.class) {
300            synchronized (this) {
301                processOutboundWireFormatInfo((WireFormatInfo) command);
302            }
303        }
304        next.oneway(command);
305    }
306
307    @Override
308    public void onException(IOException error) {
309        if (failed.compareAndSet(false, true)) {
310            stopMonitorThreads();
311            if (sendLock.writeLock().isHeldByCurrentThread()) {
312                sendLock.writeLock().unlock();
313            }
314            transportListener.onException(error);
315        }
316    }
317
318    public void setUseKeepAlive(boolean val) {
319        useKeepAlive = val;
320    }
321
322    public long getReadCheckTime() {
323        return readCheckTime;
324    }
325
326    public void setReadCheckTime(long readCheckTime) {
327        this.readCheckTime = readCheckTime;
328    }
329
330    public long getWriteCheckTime() {
331        return writeCheckTime;
332    }
333
334    public void setWriteCheckTime(long writeCheckTime) {
335        this.writeCheckTime = writeCheckTime;
336    }
337
338    public long getInitialDelayTime() {
339        return initialDelayTime;
340    }
341
342    public void setInitialDelayTime(long initialDelayTime) {
343        this.initialDelayTime = initialDelayTime;
344    }
345
346    public boolean isKeepAliveResponseRequired() {
347        return this.keepAliveResponseRequired;
348    }
349
350    public void setKeepAliveResponseRequired(boolean value) {
351        this.keepAliveResponseRequired = value;
352    }
353
354    public boolean isMonitorStarted() {
355        return this.monitorStarted.get();
356    }
357
358    protected synchronized void startMonitorThreads() throws IOException {
359        if (monitorStarted.get()) {
360            return;
361        }
362
363        if (!configuredOk()) {
364            return;
365        }
366
367        if (readCheckTime > 0) {
368            readCheckerTask = new SchedulerTimerTask(readChecker);
369        }
370
371        if (writeCheckTime > 0) {
372            writeCheckerTask = new SchedulerTimerTask(writeChecker);
373        }
374
375        if (writeCheckTime > 0 || readCheckTime > 0) {
376            monitorStarted.set(true);
377            synchronized (AbstractInactivityMonitor.class) {
378                if (CHECKER_COUNTER == 0) {
379                    ASYNC_TASKS = createExecutor();
380                    READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
381                    WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true);
382                }
383                CHECKER_COUNTER++;
384                if (readCheckTime > 0) {
385                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
386                }
387                if (writeCheckTime > 0) {
388                    WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
389                }
390            }
391        }
392    }
393
394    abstract protected boolean configuredOk() throws IOException;
395
396    protected synchronized void stopMonitorThreads() {
397        if (monitorStarted.compareAndSet(true, false)) {
398            if (readCheckerTask != null) {
399                readCheckerTask.cancel();
400            }
401            if (writeCheckerTask != null) {
402                writeCheckerTask.cancel();
403            }
404            synchronized (AbstractInactivityMonitor.class) {
405                WRITE_CHECK_TIMER.purge();
406                READ_CHECK_TIMER.purge();
407                CHECKER_COUNTER--;
408                if (CHECKER_COUNTER == 0) {
409                    WRITE_CHECK_TIMER.cancel();
410                    READ_CHECK_TIMER.cancel();
411                    WRITE_CHECK_TIMER = null;
412                    READ_CHECK_TIMER = null;
413                    ThreadPoolUtils.shutdown(ASYNC_TASKS);
414                }
415            }
416        }
417    }
418
419    private final ThreadFactory factory = new ThreadFactory() {
420        @Override
421        public Thread newThread(Runnable runnable) {
422            Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
423            thread.setDaemon(true);
424            return thread;
425        }
426    };
427
428    private ThreadPoolExecutor createExecutor() {
429        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
430        exec.allowCoreThreadTimeOut(true);
431        return exec;
432    }
433
434    private static int getDefaultKeepAliveTime() {
435        return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30);
436    }
437}