001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.thread; 018 019import java.util.concurrent.Executor; 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.RejectedExecutionHandler; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadFactory; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.apache.activemq.util.ThreadPoolUtils; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Manages the thread pool for long running tasks. Long running tasks are not 035 * always active but when they are active, they may need a few iterations of 036 * processing for them to become idle. The manager ensures that each task is 037 * processes but that no one task overtakes the system. This is kinda like 038 * cooperative multitasking. 039 * 040 * @org.apache.xbean.XBean 041 */ 042public class TaskRunnerFactory implements Executor { 043 044 private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class); 045 private ExecutorService executor; 046 private int maxIterationsPerRun; 047 private String name; 048 private int priority; 049 private boolean daemon; 050 private final AtomicLong id = new AtomicLong(0); 051 private boolean dedicatedTaskRunner; 052 private long shutdownAwaitTermination = 30000; 053 private final AtomicBoolean initDone = new AtomicBoolean(false); 054 private int maxThreadPoolSize = Integer.MAX_VALUE; 055 private RejectedExecutionHandler rejectedTaskHandler = null; 056 private ClassLoader threadClassLoader; 057 058 public TaskRunnerFactory() { 059 this("ActiveMQ Task"); 060 } 061 062 public TaskRunnerFactory(String name) { 063 this(name, Thread.NORM_PRIORITY, true, 1000); 064 } 065 066 private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { 067 this(name,priority,daemon,maxIterationsPerRun,false); 068 } 069 070 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) { 071 this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, Integer.MAX_VALUE); 072 } 073 074 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) { 075 this.name = name; 076 this.priority = priority; 077 this.daemon = daemon; 078 this.maxIterationsPerRun = maxIterationsPerRun; 079 this.dedicatedTaskRunner = dedicatedTaskRunner; 080 this.maxThreadPoolSize = maxThreadPoolSize; 081 } 082 083 public void init() { 084 if (initDone.compareAndSet(false, true)) { 085 // If your OS/JVM combination has a good thread model, you may want to 086 // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead. 087 if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) { 088 executor = null; 089 } else if (executor == null) { 090 executor = createDefaultExecutor(); 091 } 092 LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor); 093 } 094 } 095 096 /** 097 * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively. 098 * 099 * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService) 100 */ 101 public void shutdown() { 102 if (executor != null) { 103 ThreadPoolUtils.shutdown(executor); 104 executor = null; 105 } 106 initDone.set(false); 107 } 108 109 /** 110 * Performs a shutdown now (aggressively) on the thread pool. 111 * 112 * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService) 113 */ 114 public void shutdownNow() { 115 if (executor != null) { 116 ThreadPoolUtils.shutdownNow(executor); 117 executor = null; 118 } 119 initDone.set(false); 120 } 121 122 /** 123 * Performs a graceful shutdown. 124 * 125 * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService) 126 */ 127 public void shutdownGraceful() { 128 if (executor != null) { 129 ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination); 130 executor = null; 131 } 132 initDone.set(false); 133 } 134 135 public TaskRunner createTaskRunner(Task task, String name) { 136 init(); 137 if (executor != null) { 138 return new PooledTaskRunner(executor, task, maxIterationsPerRun); 139 } else { 140 return new DedicatedTaskRunner(task, name, priority, daemon); 141 } 142 } 143 144 @Override 145 public void execute(Runnable runnable) { 146 execute(runnable, name); 147 } 148 149 public void execute(Runnable runnable, String name) { 150 init(); 151 LOG.trace("Execute[{}] runnable: {}", name, runnable); 152 if (executor != null) { 153 executor.execute(runnable); 154 } else { 155 doExecuteNewThread(runnable, name); 156 } 157 } 158 159 private void doExecuteNewThread(Runnable runnable, String name) { 160 String threadName = name + "-" + id.incrementAndGet(); 161 Thread thread = new Thread(runnable, threadName); 162 thread.setDaemon(daemon); 163 164 LOG.trace("Created and running thread[{}]: {}", threadName, thread); 165 thread.start(); 166 } 167 168 protected ExecutorService createDefaultExecutor() { 169 ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 170 @Override 171 public Thread newThread(Runnable runnable) { 172 String threadName = name + "-" + id.incrementAndGet(); 173 Thread thread = new Thread(runnable, threadName); 174 thread.setDaemon(daemon); 175 thread.setPriority(priority); 176 if (threadClassLoader != null) { 177 thread.setContextClassLoader(threadClassLoader); 178 } 179 180 LOG.trace("Created thread[{}]: {}", threadName, thread); 181 return thread; 182 } 183 }); 184 if (rejectedTaskHandler != null) { 185 rc.setRejectedExecutionHandler(rejectedTaskHandler); 186 } 187 return rc; 188 } 189 190 public ExecutorService getExecutor() { 191 return executor; 192 } 193 194 public void setExecutor(ExecutorService executor) { 195 this.executor = executor; 196 } 197 198 public int getMaxIterationsPerRun() { 199 return maxIterationsPerRun; 200 } 201 202 public void setMaxIterationsPerRun(int maxIterationsPerRun) { 203 this.maxIterationsPerRun = maxIterationsPerRun; 204 } 205 206 public String getName() { 207 return name; 208 } 209 210 public void setName(String name) { 211 this.name = name; 212 } 213 214 public int getPriority() { 215 return priority; 216 } 217 218 public void setPriority(int priority) { 219 this.priority = priority; 220 } 221 222 public boolean isDaemon() { 223 return daemon; 224 } 225 226 public void setDaemon(boolean daemon) { 227 this.daemon = daemon; 228 } 229 230 public boolean isDedicatedTaskRunner() { 231 return dedicatedTaskRunner; 232 } 233 234 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 235 this.dedicatedTaskRunner = dedicatedTaskRunner; 236 } 237 238 public int getMaxThreadPoolSize() { 239 return maxThreadPoolSize; 240 } 241 242 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 243 this.maxThreadPoolSize = maxThreadPoolSize; 244 } 245 246 public void setThreadClassLoader(ClassLoader threadClassLoader) { 247 this.threadClassLoader = threadClassLoader; 248 } 249 250 public RejectedExecutionHandler getRejectedTaskHandler() { 251 return rejectedTaskHandler; 252 } 253 254 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 255 this.rejectedTaskHandler = rejectedTaskHandler; 256 } 257 258 public long getShutdownAwaitTermination() { 259 return shutdownAwaitTermination; 260 } 261 262 public void setShutdownAwaitTermination(long shutdownAwaitTermination) { 263 this.shutdownAwaitTermination = shutdownAwaitTermination; 264 } 265 266 private static int getDefaultKeepAliveTime() { 267 return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30); 268 } 269}