001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.jms.pool;
019
020import java.util.List;
021import java.util.concurrent.CopyOnWriteArrayList;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import javax.jms.Connection;
025import javax.jms.ExceptionListener;
026import javax.jms.IllegalStateException;
027import javax.jms.JMSException;
028import javax.jms.Session;
029import javax.jms.TemporaryQueue;
030import javax.jms.TemporaryTopic;
031
032import org.apache.commons.pool2.BasePooledObjectFactory;
033import org.apache.commons.pool2.KeyedObjectPool;
034import org.apache.commons.pool2.KeyedPooledObjectFactory;
035import org.apache.commons.pool2.PooledObject;
036import org.apache.commons.pool2.impl.DefaultPooledObject;
037import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
038import org.apache.commons.pool2.impl.GenericObjectPool;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Holds a real JMS connection along with the session pools associated with it.
044 * <p/>
045 * Instances of this class are shared amongst one or more PooledConnection object and must
046 * track the session objects that are loaned out for cleanup on close as well as ensuring
047 * that the temporary destinations of the managed Connection are purged when all references
048 * to this ConnectionPool are released.
049 */
050public class ConnectionPool implements ExceptionListener {
051    private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
052
053    protected Connection connection;
054    private int referenceCount;
055    private long lastUsed = System.currentTimeMillis();
056    private final long firstUsed = lastUsed;
057    private boolean hasExpired;
058    private int idleTimeout = 30 * 1000;
059    private long expiryTimeout = 0l;
060    private boolean useAnonymousProducers = true;
061
062    private final AtomicBoolean started = new AtomicBoolean(false);
063    private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
064    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
065    private boolean reconnectOnException;
066    private ExceptionListener parentExceptionListener;
067
068    public ConnectionPool(Connection connection) {
069
070        this.connection = wrap(connection);
071
072        // Create our internal Pool of session instances.
073        this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
074            new KeyedPooledObjectFactory<SessionKey, SessionHolder>() {
075                @Override
076                public PooledObject<SessionHolder> makeObject(SessionKey sessionKey) throws Exception {
077
078                    return new DefaultPooledObject<SessionHolder>(new SessionHolder(makeSession(sessionKey)));
079                }
080
081                @Override
082                public void destroyObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
083                    ((SessionHolder)pooledObject.getObject()).close();
084                }
085
086                @Override
087                public boolean validateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) {
088                    return true;
089                }
090
091                @Override
092                public void activateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
093                }
094
095                @Override
096                public void passivateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
097                }
098            }
099        );
100    }
101
102    // useful when external failure needs to force expiry
103    public void setHasExpired(boolean val) {
104        hasExpired = val;
105    }
106
107    protected Session makeSession(SessionKey key) throws JMSException {
108        return connection.createSession(key.isTransacted(), key.getAckMode());
109    }
110
111    protected Connection wrap(Connection connection) {
112        return connection;
113    }
114
115    protected void unWrap(Connection connection) {
116    }
117
118    public void start() throws JMSException {
119        if (started.compareAndSet(false, true)) {
120            try {
121                connection.start();
122            } catch (JMSException e) {
123                started.set(false);
124                throw(e);
125            }
126        }
127    }
128
129    public synchronized Connection getConnection() {
130        return connection;
131    }
132
133    public Session createSession(boolean transacted, int ackMode) throws JMSException {
134        SessionKey key = new SessionKey(transacted, ackMode);
135        PooledSession session;
136        try {
137            session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers);
138            session.addSessionEventListener(new PooledSessionEventListener() {
139
140                @Override
141                public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
142                }
143
144                @Override
145                public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
146                }
147
148                @Override
149                public void onSessionClosed(PooledSession session) {
150                    ConnectionPool.this.loanedSessions.remove(session);
151                }
152            });
153            this.loanedSessions.add(session);
154        } catch (Exception e) {
155            IllegalStateException illegalStateException = new IllegalStateException(e.toString());
156            illegalStateException.initCause(e);
157            throw illegalStateException;
158        }
159        return session;
160    }
161
162    public synchronized void close() {
163        if (connection != null) {
164            try {
165                sessionPool.close();
166            } catch (Exception e) {
167            } finally {
168                try {
169                    connection.close();
170                } catch (Exception e) {
171                } finally {
172                    connection = null;
173                }
174            }
175        }
176    }
177
178    public synchronized void incrementReferenceCount() {
179        referenceCount++;
180        lastUsed = System.currentTimeMillis();
181    }
182
183    public synchronized void decrementReferenceCount() {
184        referenceCount--;
185        lastUsed = System.currentTimeMillis();
186        if (referenceCount == 0) {
187            // Loaned sessions are those that are active in the sessionPool and
188            // have not been closed by the client before closing the connection.
189            // These need to be closed so that all session's reflect the fact
190            // that the parent Connection is closed.
191            for (PooledSession session : this.loanedSessions) {
192                try {
193                    session.close();
194                } catch (Exception e) {
195                }
196            }
197            this.loanedSessions.clear();
198
199            unWrap(getConnection());
200
201            expiredCheck();
202        }
203    }
204
205    /**
206     * Determines if this Connection has expired.
207     * <p/>
208     * A ConnectionPool is considered expired when all references to it are released AND either
209     * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
210     * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
211     *
212     * @return true if this connection has expired.
213     */
214    public synchronized boolean expiredCheck() {
215
216        boolean expired = false;
217
218        if (connection == null) {
219            return true;
220        }
221
222        if (hasExpired) {
223            if (referenceCount == 0) {
224                close();
225                expired = true;
226            }
227        }
228
229        if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
230            hasExpired = true;
231            if (referenceCount == 0) {
232                close();
233                expired = true;
234            }
235        }
236
237        // Only set hasExpired here is no references, as a Connection with references is by
238        // definition not idle at this time.
239        if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
240            hasExpired = true;
241            close();
242            expired = true;
243        }
244
245        return expired;
246    }
247
248    public int getIdleTimeout() {
249        return idleTimeout;
250    }
251
252    public void setIdleTimeout(int idleTimeout) {
253        this.idleTimeout = idleTimeout;
254    }
255
256    public void setExpiryTimeout(long expiryTimeout) {
257        this.expiryTimeout = expiryTimeout;
258    }
259
260    public long getExpiryTimeout() {
261        return expiryTimeout;
262    }
263
264    public int getMaximumActiveSessionPerConnection() {
265        return this.sessionPool.getMaxTotalPerKey();
266    }
267
268    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
269        this.sessionPool.setMaxTotalPerKey(maximumActiveSessionPerConnection);
270    }
271
272    public boolean isUseAnonymousProducers() {
273        return this.useAnonymousProducers;
274    }
275
276    public void setUseAnonymousProducers(boolean value) {
277        this.useAnonymousProducers = value;
278    }
279
280    /**
281     * @return the total number of Pooled session including idle sessions that are not
282     *          currently loaned out to any client.
283     */
284    public int getNumSessions() {
285        return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
286    }
287
288    /**
289     * @return the total number of Sessions that are in the Session pool but not loaned out.
290     */
291    public int getNumIdleSessions() {
292        return this.sessionPool.getNumIdle();
293    }
294
295    /**
296     * @return the total number of Session's that have been loaned to PooledConnection instances.
297     */
298    public int getNumActiveSessions() {
299        return this.sessionPool.getNumActive();
300    }
301
302    /**
303     * Configure whether the createSession method should block when there are no more idle sessions and the
304     * pool already contains the maximum number of active sessions.  If false the create method will fail
305     * and throw an exception.
306     *
307     * @param block
308     *          Indicates whether blocking should be used to wait for more space to create a session.
309     */
310    public void setBlockIfSessionPoolIsFull(boolean block) {
311        this.sessionPool.setBlockWhenExhausted(block);
312    }
313
314    public boolean isBlockIfSessionPoolIsFull() {
315        return this.sessionPool.getBlockWhenExhausted();
316    }
317
318    /**
319     * Returns the timeout to use for blocking creating new sessions
320     *
321     * @return true if the pooled Connection createSession method will block when the limit is hit.
322     * @see #setBlockIfSessionPoolIsFull(boolean)
323     */
324    public long getBlockIfSessionPoolIsFullTimeout() {
325        return this.sessionPool.getMaxWaitMillis();
326    }
327
328    /**
329     * Controls the behavior of the internal session pool. By default the call to
330     * Connection.getSession() will block if the session pool is full.  This setting
331     * will affect how long it blocks and throws an exception after the timeout.
332     *
333     * The size of the session pool is controlled by the @see #maximumActive
334     * property.
335     *
336     * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
337     * property
338     *
339     * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true,
340     *                                        then use this setting to configure how long to block before retry
341     */
342    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
343        this.sessionPool.setMaxWaitMillis(blockIfSessionPoolIsFullTimeout);
344    }
345
346    /**
347     * @return true if the underlying connection will be renewed on JMSException, false otherwise
348     */
349    public boolean isReconnectOnException() {
350        return reconnectOnException;
351    }
352
353    /**
354     * Controls weather the underlying connection should be reset (and renewed) on JMSException
355     *
356     * @param reconnectOnException
357     *          Boolean value that configures whether reconnect on exception should happen
358     */
359    public void setReconnectOnException(boolean reconnectOnException) {
360        this.reconnectOnException = reconnectOnException;
361        try {
362            if (isReconnectOnException()) {
363                if (connection.getExceptionListener() != null) {
364                    parentExceptionListener = connection.getExceptionListener();
365                }
366                connection.setExceptionListener(this);
367            } else {
368                if (parentExceptionListener != null) {
369                    connection.setExceptionListener(parentExceptionListener);
370                }
371                parentExceptionListener = null;
372            }
373        } catch (JMSException jmse) {
374            LOG.warn("Cannot set reconnect exception listener", jmse);
375        }
376    }
377
378    @Override
379    public void onException(JMSException exception) {
380        close();
381        if (parentExceptionListener != null) {
382            parentExceptionListener.onException(exception);
383        }
384    }
385
386    @Override
387    public String toString() {
388        return "ConnectionPool[" + connection + "]";
389    }
390}