001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.connector.outbound;
019    
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.Collections;
023    
024    import javax.resource.ResourceException;
025    import javax.resource.spi.ManagedConnection;
026    
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /**
031     * SinglePoolConnectionInterceptor chooses a single connection from the pool.  If selectOneAssumeMatch
032     * is true, it simply returns the selected connection.
033     * THIS SHOULD BE USED ONLY IF MAXIMUM SPEED IS ESSENTIAL AND YOU HAVE THOROUGLY CHECKED THAT
034     * MATCHING WOULD SUCCEED ON THE SELECTED CONNECTION. (i.e., read the docs on your connector
035     * to find out how matching works)
036     * If selectOneAssumeMatch is false, it checks with the ManagedConnectionFactory that the
037     * selected connection does match before returning it: if not it throws an exception.
038     *
039     * @version $Rev: 550546 $ $Date: 2007-06-25 12:52:11 -0400 (Mon, 25 Jun 2007) $
040     */
041    public class SinglePoolConnectionInterceptor extends AbstractSinglePoolConnectionInterceptor {
042        private static final Log log = LogFactory.getLog(SinglePoolConnectionInterceptor.class.getName());
043    
044        private boolean selectOneAssumeMatch;
045    
046        private PoolDeque pool;
047    
048        public SinglePoolConnectionInterceptor(final ConnectionInterceptor next,
049                                               int maxSize,
050                                               int minSize,
051                                               int blockingTimeoutMilliseconds,
052                                               int idleTimeoutMinutes,
053                                               boolean selectOneAssumeMatch) {
054            super(next, maxSize, minSize, blockingTimeoutMilliseconds, idleTimeoutMinutes);
055            pool = new PoolDeque(maxSize);
056            this.selectOneAssumeMatch = selectOneAssumeMatch;
057        }
058    
059        protected void internalGetConnection(ConnectionInfo connectionInfo) throws ResourceException {
060            synchronized (pool) {
061                if (destroyed) {
062                    throw new ResourceException("ManagedConnection pool has been destroyed");
063                }
064    
065                ManagedConnectionInfo newMCI = null;
066                if (pool.isEmpty()) {
067                    next.getConnection(connectionInfo);
068                    connectionCount++;
069                    if (log.isTraceEnabled()) {
070                        log.trace("Supplying new connection MCI: " + connectionInfo.getManagedConnectionInfo() + " MC: " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " from pool: " + this);
071                    }
072                    return;
073                } else {
074                    newMCI = pool.removeLast();
075                }
076                if (connectionCount < minSize) {
077                    timer.schedule(new FillTask(connectionInfo), 10);
078                }
079                if (selectOneAssumeMatch) {
080                    connectionInfo.setManagedConnectionInfo(newMCI);
081                    if (log.isTraceEnabled()) {
082                        log.trace("Supplying pooled connection without checking matching MCI: " + connectionInfo.getManagedConnectionInfo() + " MC: " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " from pool: " + this);
083                    }
084                    return;
085                }
086                try {
087                    ManagedConnectionInfo mci = connectionInfo.getManagedConnectionInfo();
088                    ManagedConnection matchedMC =
089                            newMCI
090                            .getManagedConnectionFactory()
091                            .matchManagedConnections(Collections.singleton(newMCI.getManagedConnection()),
092                                    mci.getSubject(),
093                                    mci.getConnectionRequestInfo());
094                    if (matchedMC != null) {
095                        connectionInfo.setManagedConnectionInfo(newMCI);
096                        if (log.isTraceEnabled()) {
097                            log.trace("Supplying pooled connection  MCI: " + connectionInfo.getManagedConnectionInfo() + " MC: " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " from pool: " + this);
098                        }
099                        return;
100                    } else {
101                        //matching failed.
102                        ConnectionInfo returnCI = new ConnectionInfo();
103                        returnCI.setManagedConnectionInfo(newMCI);
104                        returnConnection(returnCI,
105                                ConnectionReturnAction.RETURN_HANDLE);
106                        throw new ResourceException("The pooling strategy does not match the MatchManagedConnections implementation.  Please investigate and reconfigure this pool");
107                    }
108                } catch (ResourceException e) {
109                    //something is wrong: destroy connection, rethrow, release permit
110                    ConnectionInfo returnCI = new ConnectionInfo();
111                    returnCI.setManagedConnectionInfo(newMCI);
112                    returnConnection(returnCI,
113                            ConnectionReturnAction.DESTROY);
114                    throw e;
115                }
116            }
117        }
118    
119        protected void internalDestroy() {
120            synchronized (pool) {
121                while (!pool.isEmpty()) {
122                    ManagedConnection mc = pool.removeLast().getManagedConnection();
123                    if (mc != null) {
124                        try {
125                            mc.destroy();
126                        }
127                        catch (ResourceException re) { } // ignore
128                    }
129                }
130            }
131        }
132    
133        protected boolean internalReturn(ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) {
134            ManagedConnectionInfo mci = connectionInfo.getManagedConnectionInfo();
135            ManagedConnection mc = mci.getManagedConnection();
136            if (connectionReturnAction == ConnectionReturnAction.RETURN_HANDLE) {
137                try {
138                    mc.cleanup();
139                } catch (ResourceException e) {
140                    connectionReturnAction = ConnectionReturnAction.DESTROY;
141                }
142            }
143            boolean wasInPool = false;
144            synchronized (pool) {
145                // a bit redundant with returnConnection check in AbstractSinglePoolConnectionInterceptor, 
146                // but checking here closes a small timing hole...
147                if (destroyed) {
148                    try {
149                        mc.destroy();
150                    }
151                    catch (ResourceException re) { } // ignore
152                    return pool.remove(mci);
153                }
154    
155                if (shrinkLater > 0) {
156                    //nothing can get in the pool while shrinkLater > 0, so wasInPool is false here.
157                    connectionReturnAction = ConnectionReturnAction.DESTROY;
158                    shrinkLater--;
159                } else if (connectionReturnAction == ConnectionReturnAction.RETURN_HANDLE) {
160                    mci.setLastUsed(System.currentTimeMillis());
161                    pool.add(mci);
162                    return wasInPool;
163                } else {
164                    wasInPool = pool.remove(mci);
165                }
166            }
167            //we must destroy connection.
168            next.returnConnection(connectionInfo, connectionReturnAction);
169            connectionCount--;
170            return wasInPool;
171        }
172    
173        public int getPartitionMaxSize() {
174            return pool.capacity();
175        }
176    
177        protected void transferConnections(int maxSize, int shrinkNow) {
178            //1st example: copy 0 (none)
179            //2nd example: copy 10 (all)
180            PoolDeque oldPool = pool;
181            pool = new PoolDeque(maxSize);
182            //since we have replaced pool already, pool.remove will be very fast:-)
183            for (int i = 0; i < shrinkNow; i++) {
184                ConnectionInfo killInfo = new ConnectionInfo(oldPool.peek(i));
185                internalReturn(killInfo, ConnectionReturnAction.DESTROY);
186            }
187            for (int i = shrinkNow; i < connectionCount; i++) {
188                pool.add(oldPool.peek(i));
189            }
190        }
191    
192        public int getIdleConnectionCount() {
193            return pool.currentSize();
194        }
195    
196    
197        protected void getExpiredManagedConnectionInfos(long threshold, ArrayList killList) {
198            synchronized (pool) {
199                for (int i = 0; i < pool.currentSize(); i++) {
200                    ManagedConnectionInfo mci = pool.peek(i);
201                    if (mci.getLastUsed() < threshold) {
202                        killList.add(mci);
203                    }
204                }
205            }
206        }
207    
208        protected boolean addToPool(ManagedConnectionInfo mci) {
209            boolean added;
210            synchronized (pool) {
211                connectionCount++;
212                added = getPartitionMaxSize() > getIdleConnectionCount();
213                if (added) {
214                    pool.add(mci);
215                }
216            }
217            return added;
218        }
219    
220        static class PoolDeque {
221    
222            private final ManagedConnectionInfo[] deque;
223            private final int first = 0;
224            private int last = -1;
225    
226            public PoolDeque(int size) {
227                deque = new ManagedConnectionInfo[size];
228            }
229    
230            //internal
231            public boolean isEmpty() {
232                return first > last;
233            }
234    
235            //internal
236            public void add(ManagedConnectionInfo mci) {
237                if (last == deque.length - 1) {
238                    throw new IllegalStateException("deque is full: contents: " + Arrays.asList(deque));
239                }
240                deque[++last] = mci;
241            }
242    
243            //internal
244            public ManagedConnectionInfo peek(int i) {
245                if (i < first || i > last) {
246                    throw new IllegalStateException("index is out of current range");
247                }
248                return deque[i];
249            }
250    
251            //internal
252            public ManagedConnectionInfo removeLast() {
253                if (isEmpty()) {
254                    throw new IllegalStateException("deque is empty");
255                }
256    
257                return deque[last--];
258            }
259    
260            //internal
261            public boolean remove(ManagedConnectionInfo mci) {
262                for (int i = first; i <= last; i++) {
263                    if (deque[i] == mci) {
264                        for (int j = i + 1; j <= last; j++) {
265                            deque[j - 1] = deque[j];
266                        }
267                        last--;
268                        return true;
269                    }
270    
271                }
272                return false;
273            }
274    
275            //internal
276            public int capacity() {
277                return deque.length;
278            }
279    
280            //internal
281            public int currentSize() {
282                return last - first + 1;
283            }
284        }
285    
286    }