001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    package org.apache.geronimo.connector.outbound;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.Timer;
022    import java.util.TimerTask;
023    import java.util.concurrent.Semaphore;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.locks.ReadWriteLock;
026    import java.util.concurrent.locks.ReentrantReadWriteLock;
027    
028    import javax.resource.ResourceException;
029    import javax.resource.spi.ConnectionRequestInfo;
030    import javax.resource.spi.ManagedConnectionFactory;
031    import javax.security.auth.Subject;
032    
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    
036    /**
037     * @version $Rev: 550546 $ $Date: 2007-06-25 12:52:11 -0400 (Mon, 25 Jun 2007) $
038     */
039    public abstract class AbstractSinglePoolConnectionInterceptor implements ConnectionInterceptor, PoolingAttributes {
040        protected static Log log = LogFactory.getLog(AbstractSinglePoolConnectionInterceptor.class.getName());
041        protected final ConnectionInterceptor next;
042        private final ReadWriteLock resizeLock = new ReentrantReadWriteLock();
043        protected Semaphore permits;
044        protected int blockingTimeoutMilliseconds;
045        protected int connectionCount = 0;
046        private long idleTimeoutMilliseconds;
047        private IdleReleaser idleReleaser;
048        protected Timer timer = PoolIdleReleaserTimer.getTimer();
049        protected int maxSize = 0;
050        protected int minSize = 0;
051        protected int shrinkLater = 0;
052        protected volatile boolean destroyed = false;
053    
054        public AbstractSinglePoolConnectionInterceptor(final ConnectionInterceptor next,
055                                                       int maxSize,
056                                                       int minSize,
057                                                       int blockingTimeoutMilliseconds,
058                                                       int idleTimeoutMinutes) {
059            this.next = next;
060            this.maxSize = maxSize;
061            this.minSize = minSize;
062            this.blockingTimeoutMilliseconds = blockingTimeoutMilliseconds;
063            setIdleTimeoutMinutes(idleTimeoutMinutes);
064            permits = new Semaphore(maxSize, true);
065        }
066    
067        public void getConnection(ConnectionInfo connectionInfo) throws ResourceException {
068            if (connectionInfo.getManagedConnectionInfo().getManagedConnection() != null) {
069                if (log.isTraceEnabled()) {
070                    log.trace("using already assigned connection " + connectionInfo.getConnectionHandle() + " for managed connection " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this);
071                }
072                return;
073            }
074            try {
075                resizeLock.readLock().lock();
076                try {
077                    if (permits.tryAcquire(blockingTimeoutMilliseconds, TimeUnit.MILLISECONDS)) {
078                        internalGetConnection(connectionInfo);
079                    } else {
080                        throw new ResourceException("No ManagedConnections available "
081                                + "within configured blocking timeout ( "
082                                + blockingTimeoutMilliseconds
083                                + " [ms] ) for pool " + this);
084    
085                    }
086                } finally {
087                    resizeLock.readLock().unlock();
088                }
089    
090            } catch (InterruptedException ie) {
091                throw new ResourceException("Interrupted while requesting permit.", ie);
092            } // end of try-catch
093        }
094    
095        protected abstract void internalGetConnection(ConnectionInfo connectionInfo) throws ResourceException;
096    
097        public void returnConnection(ConnectionInfo connectionInfo,
098                                     ConnectionReturnAction connectionReturnAction) {
099            if (log.isTraceEnabled()) {
100                log.trace("returning connection " + connectionInfo.getConnectionHandle() + " for MCI " + connectionInfo.getManagedConnectionInfo() + " and MC " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this);
101            }
102    
103            // not strictly synchronized with destroy(), but pooled operations in internalReturn() are...
104            if (destroyed) {
105                try {
106                    connectionInfo.getManagedConnectionInfo().getManagedConnection().destroy();
107                } catch (ResourceException re) {
108                    // empty
109                }
110                return;
111            }
112    
113            resizeLock.readLock().lock();
114            try {
115                ManagedConnectionInfo mci = connectionInfo.getManagedConnectionInfo();
116                if (connectionReturnAction == ConnectionReturnAction.RETURN_HANDLE && mci.hasConnectionHandles()) {
117                    if (log.isTraceEnabled()) {
118                        log.trace("Return request at pool with connection handles! " + connectionInfo.getConnectionHandle() + " for MCI " + connectionInfo.getManagedConnectionInfo() + " and MC " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this, new Exception("Stack trace"));
119                    }
120                    return;
121                }
122    
123                boolean wasInPool = internalReturn(connectionInfo, connectionReturnAction);
124    
125                if (!wasInPool) {
126                    permits.release();
127                }
128            } finally {
129                resizeLock.readLock().unlock();
130            }
131        }
132    
133        protected abstract boolean internalReturn(ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction);
134    
135        protected abstract void internalDestroy();
136    
137        // Cancel the IdleReleaser TimerTask (fixes memory leak) and clean up the pool
138        public void destroy() {
139            destroyed = true;
140            if (idleReleaser != null)
141                idleReleaser.cancel();
142            internalDestroy();
143            next.destroy();
144        }
145    
146        public int getPartitionCount() {
147            return 1;
148        }
149    
150        public abstract int getPartitionMaxSize();
151    
152        public void setPartitionMaxSize(int newMaxSize) throws InterruptedException {
153            if (newMaxSize <= 0) {
154                throw new IllegalArgumentException("Max size must be positive, not " + newMaxSize);
155            }
156            if (newMaxSize != getPartitionMaxSize()) {
157                resizeLock.writeLock().lock();
158                try {
159                    ResizeInfo resizeInfo = new ResizeInfo(this.minSize, permits.availablePermits(), connectionCount, newMaxSize);
160                    this.shrinkLater = resizeInfo.getShrinkLater();
161    
162                    permits = new Semaphore(newMaxSize, true);
163                    //pre-acquire permits for the existing checked out connections that will not be closed when they are returned.
164                    for (int i = 0; i < resizeInfo.getTransferCheckedOut(); i++) {
165                        permits.acquire();
166                    }
167                    //transfer connections we are going to keep
168                    transferConnections(newMaxSize, resizeInfo.getShrinkNow());
169                    this.minSize = resizeInfo.getNewMinSize();
170                } finally {
171                    resizeLock.writeLock().unlock();
172                }
173            }
174        }
175    
176    
177        static final class ResizeInfo {
178    
179            private final int newMinSize;
180            private final int shrinkNow;
181            private final int shrinkLater;
182            private final int transferCheckedOut;
183    
184            ResizeInfo(final int oldMinSize, final int oldPermitsAvailable, final int oldConnectionCount, final int newMaxSize) {
185                final int checkedOut = oldConnectionCount - oldPermitsAvailable;
186                int shrinkLater = checkedOut - newMaxSize;
187                if (shrinkLater < 0) {
188                    shrinkLater = 0;
189                }
190                this.shrinkLater = shrinkLater;
191                int shrinkNow = oldConnectionCount - newMaxSize - shrinkLater;
192                if (shrinkNow < 0) {
193                    shrinkNow = 0;
194                }
195                this.shrinkNow = shrinkNow;
196                if (newMaxSize >= oldMinSize) {
197                    newMinSize = oldMinSize;
198                } else {
199                    newMinSize = newMaxSize;
200                }
201                this.transferCheckedOut = checkedOut - shrinkLater;
202            }
203    
204            public int getNewMinSize() {
205                return newMinSize;
206            }
207    
208            public int getShrinkNow() {
209                return shrinkNow;
210            }
211    
212            public int getShrinkLater() {
213                return shrinkLater;
214            }
215    
216            public int getTransferCheckedOut() {
217                return transferCheckedOut;
218            }
219    
220    
221        }
222    
223        protected abstract void transferConnections(int maxSize, int shrinkNow);
224    
225        public abstract int getIdleConnectionCount();
226    
227        public int getConnectionCount() {
228            return connectionCount;
229        }
230    
231        public int getPartitionMinSize() {
232            return minSize;
233        }
234    
235        public void setPartitionMinSize(int minSize) {
236            this.minSize = minSize;
237        }
238    
239        public int getBlockingTimeoutMilliseconds() {
240            return blockingTimeoutMilliseconds;
241        }
242    
243        public void setBlockingTimeoutMilliseconds(int blockingTimeoutMilliseconds) {
244            if (blockingTimeoutMilliseconds < 0) {
245                throw new IllegalArgumentException("blockingTimeoutMilliseconds must be positive or 0, not " + blockingTimeoutMilliseconds);
246            }
247            if (blockingTimeoutMilliseconds == 0) {
248                this.blockingTimeoutMilliseconds = Integer.MAX_VALUE;
249            } else {
250                this.blockingTimeoutMilliseconds = blockingTimeoutMilliseconds;
251            }
252        }
253    
254        public int getIdleTimeoutMinutes() {
255            return (int) idleTimeoutMilliseconds / (1000 * 60);
256        }
257    
258        public void setIdleTimeoutMinutes(int idleTimeoutMinutes) {
259            if (idleTimeoutMinutes < 0) {
260                throw new IllegalArgumentException("idleTimeoutMinutes must be positive or 0, not " + idleTimeoutMinutes);
261            }
262            if (idleReleaser != null) {
263                idleReleaser.cancel();
264            }
265            if (idleTimeoutMinutes > 0) {
266                this.idleTimeoutMilliseconds = idleTimeoutMinutes * 60 * 1000;
267                idleReleaser = new IdleReleaser(this);
268                timer.schedule(idleReleaser, this.idleTimeoutMilliseconds, this.idleTimeoutMilliseconds);
269            }
270        }
271    
272        protected abstract void getExpiredManagedConnectionInfos(long threshold, ArrayList killList);
273    
274        protected abstract boolean addToPool(ManagedConnectionInfo mci);
275    
276        // static class to permit chain of strong references from preventing ClassLoaders
277        // from being GC'ed.
278        private static class IdleReleaser extends TimerTask {
279            private AbstractSinglePoolConnectionInterceptor parent;
280    
281            private IdleReleaser(AbstractSinglePoolConnectionInterceptor parent) {
282                this.parent = parent;
283            }
284    
285            public boolean cancel() {
286                this.parent = null;
287                return super.cancel();
288            }
289    
290            public void run() {
291                // protect against interceptor being set to null mid-execution
292                AbstractSinglePoolConnectionInterceptor interceptor = parent;
293                if (interceptor == null)
294                    return;
295    
296                interceptor.resizeLock.readLock().lock();
297                try {
298                    long threshold = System.currentTimeMillis() - interceptor.idleTimeoutMilliseconds;
299                    ArrayList killList = new ArrayList(interceptor.getPartitionMaxSize());
300                    interceptor.getExpiredManagedConnectionInfos(threshold, killList);
301                    for (Iterator i = killList.iterator(); i.hasNext();) {
302                        ManagedConnectionInfo managedConnectionInfo = (ManagedConnectionInfo) i.next();
303                        ConnectionInfo killInfo = new ConnectionInfo(managedConnectionInfo);
304                        interceptor.internalReturn(killInfo, ConnectionReturnAction.DESTROY);
305                    }
306                    interceptor.permits.release(killList.size());
307                } catch (Throwable t) {
308                    log.error("Error occurred during execution of ExpirationMonitor TimerTask", t);
309                } finally {
310                    interceptor.resizeLock.readLock().unlock();
311                }
312            }
313    
314        }
315    
316        // Currently only a short-lived (10 millisecond) task.
317        // So, FillTask, unlike IdleReleaser, shouldn't cause GC problems.
318        protected class FillTask extends TimerTask {
319            private final ManagedConnectionFactory managedConnectionFactory;
320            private final Subject subject;
321            private final ConnectionRequestInfo cri;
322    
323            public FillTask(ConnectionInfo connectionInfo) {
324                managedConnectionFactory = connectionInfo.getManagedConnectionInfo().getManagedConnectionFactory();
325                subject = connectionInfo.getManagedConnectionInfo().getSubject();
326                cri = connectionInfo.getManagedConnectionInfo().getConnectionRequestInfo();
327            }
328    
329            public void run() {
330                resizeLock.readLock().lock();
331                try {
332                    while (connectionCount < minSize) {
333                        ManagedConnectionInfo mci = new ManagedConnectionInfo(managedConnectionFactory, cri);
334                        mci.setSubject(subject);
335                        ConnectionInfo ci = new ConnectionInfo(mci);
336                        try {
337                            next.getConnection(ci);
338                        } catch (ResourceException e) {
339                            return;
340                        }
341                        boolean added = addToPool(mci);
342                        if (!added) {
343                            internalReturn(ci, ConnectionReturnAction.DESTROY);
344                            return;
345                        }
346                    }
347                } catch (Throwable t) {
348                    log.error("FillTask encountered error in run method", t);
349                } finally {
350                    resizeLock.readLock().unlock();
351                }
352            }
353    
354        }
355    }