001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.jms.pool; 019 020import java.util.List; 021import java.util.concurrent.CopyOnWriteArrayList; 022import java.util.concurrent.atomic.AtomicBoolean; 023 024import javax.jms.Connection; 025import javax.jms.ExceptionListener; 026import javax.jms.IllegalStateException; 027import javax.jms.JMSException; 028import javax.jms.Session; 029import javax.jms.TemporaryQueue; 030import javax.jms.TemporaryTopic; 031 032import org.apache.commons.pool2.BasePooledObjectFactory; 033import org.apache.commons.pool2.KeyedObjectPool; 034import org.apache.commons.pool2.KeyedPooledObjectFactory; 035import org.apache.commons.pool2.PooledObject; 036import org.apache.commons.pool2.impl.DefaultPooledObject; 037import org.apache.commons.pool2.impl.GenericKeyedObjectPool; 038import org.apache.commons.pool2.impl.GenericObjectPool; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * Holds a real JMS connection along with the session pools associated with it. 044 * <p/> 045 * Instances of this class are shared amongst one or more PooledConnection object and must 046 * track the session objects that are loaned out for cleanup on close as well as ensuring 047 * that the temporary destinations of the managed Connection are purged when all references 048 * to this ConnectionPool are released. 049 */ 050public class ConnectionPool implements ExceptionListener { 051 private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class); 052 053 protected Connection connection; 054 private int referenceCount; 055 private long lastUsed = System.currentTimeMillis(); 056 private final long firstUsed = lastUsed; 057 private boolean hasExpired; 058 private int idleTimeout = 30 * 1000; 059 private long expiryTimeout = 0l; 060 private boolean useAnonymousProducers = true; 061 062 private final AtomicBoolean started = new AtomicBoolean(false); 063 private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool; 064 private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>(); 065 private boolean reconnectOnException; 066 private ExceptionListener parentExceptionListener; 067 068 public ConnectionPool(Connection connection) { 069 070 this.connection = wrap(connection); 071 072 // Create our internal Pool of session instances. 073 this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>( 074 new KeyedPooledObjectFactory<SessionKey, SessionHolder>() { 075 @Override 076 public PooledObject<SessionHolder> makeObject(SessionKey sessionKey) throws Exception { 077 078 return new DefaultPooledObject<SessionHolder>(new SessionHolder(makeSession(sessionKey))); 079 } 080 081 @Override 082 public void destroyObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception { 083 ((SessionHolder)pooledObject.getObject()).close(); 084 } 085 086 @Override 087 public boolean validateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) { 088 return true; 089 } 090 091 @Override 092 public void activateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception { 093 } 094 095 @Override 096 public void passivateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception { 097 } 098 } 099 ); 100 } 101 102 // useful when external failure needs to force expiry 103 public void setHasExpired(boolean val) { 104 hasExpired = val; 105 } 106 107 protected Session makeSession(SessionKey key) throws JMSException { 108 return connection.createSession(key.isTransacted(), key.getAckMode()); 109 } 110 111 protected Connection wrap(Connection connection) { 112 return connection; 113 } 114 115 protected void unWrap(Connection connection) { 116 } 117 118 public void start() throws JMSException { 119 if (started.compareAndSet(false, true)) { 120 try { 121 connection.start(); 122 } catch (JMSException e) { 123 started.set(false); 124 throw(e); 125 } 126 } 127 } 128 129 public synchronized Connection getConnection() { 130 return connection; 131 } 132 133 public Session createSession(boolean transacted, int ackMode) throws JMSException { 134 SessionKey key = new SessionKey(transacted, ackMode); 135 PooledSession session; 136 try { 137 session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers); 138 session.addSessionEventListener(new PooledSessionEventListener() { 139 140 @Override 141 public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { 142 } 143 144 @Override 145 public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { 146 } 147 148 @Override 149 public void onSessionClosed(PooledSession session) { 150 ConnectionPool.this.loanedSessions.remove(session); 151 } 152 }); 153 this.loanedSessions.add(session); 154 } catch (Exception e) { 155 IllegalStateException illegalStateException = new IllegalStateException(e.toString()); 156 illegalStateException.initCause(e); 157 throw illegalStateException; 158 } 159 return session; 160 } 161 162 public synchronized void close() { 163 if (connection != null) { 164 try { 165 sessionPool.close(); 166 } catch (Exception e) { 167 } finally { 168 try { 169 connection.close(); 170 } catch (Exception e) { 171 } finally { 172 connection = null; 173 } 174 } 175 } 176 } 177 178 public synchronized void incrementReferenceCount() { 179 referenceCount++; 180 lastUsed = System.currentTimeMillis(); 181 } 182 183 public synchronized void decrementReferenceCount() { 184 referenceCount--; 185 lastUsed = System.currentTimeMillis(); 186 if (referenceCount == 0) { 187 // Loaned sessions are those that are active in the sessionPool and 188 // have not been closed by the client before closing the connection. 189 // These need to be closed so that all session's reflect the fact 190 // that the parent Connection is closed. 191 for (PooledSession session : this.loanedSessions) { 192 try { 193 session.close(); 194 } catch (Exception e) { 195 } 196 } 197 this.loanedSessions.clear(); 198 199 unWrap(getConnection()); 200 201 expiredCheck(); 202 } 203 } 204 205 /** 206 * Determines if this Connection has expired. 207 * <p/> 208 * A ConnectionPool is considered expired when all references to it are released AND either 209 * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed. 210 * Once a ConnectionPool is determined to have expired its underlying Connection is closed. 211 * 212 * @return true if this connection has expired. 213 */ 214 public synchronized boolean expiredCheck() { 215 216 boolean expired = false; 217 218 if (connection == null) { 219 return true; 220 } 221 222 if (hasExpired) { 223 if (referenceCount == 0) { 224 close(); 225 expired = true; 226 } 227 } 228 229 if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) { 230 hasExpired = true; 231 if (referenceCount == 0) { 232 close(); 233 expired = true; 234 } 235 } 236 237 // Only set hasExpired here is no references, as a Connection with references is by 238 // definition not idle at this time. 239 if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) { 240 hasExpired = true; 241 close(); 242 expired = true; 243 } 244 245 return expired; 246 } 247 248 public int getIdleTimeout() { 249 return idleTimeout; 250 } 251 252 public void setIdleTimeout(int idleTimeout) { 253 this.idleTimeout = idleTimeout; 254 } 255 256 public void setExpiryTimeout(long expiryTimeout) { 257 this.expiryTimeout = expiryTimeout; 258 } 259 260 public long getExpiryTimeout() { 261 return expiryTimeout; 262 } 263 264 public int getMaximumActiveSessionPerConnection() { 265 return this.sessionPool.getMaxTotalPerKey(); 266 } 267 268 public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) { 269 this.sessionPool.setMaxTotalPerKey(maximumActiveSessionPerConnection); 270 } 271 272 public boolean isUseAnonymousProducers() { 273 return this.useAnonymousProducers; 274 } 275 276 public void setUseAnonymousProducers(boolean value) { 277 this.useAnonymousProducers = value; 278 } 279 280 /** 281 * @return the total number of Pooled session including idle sessions that are not 282 * currently loaned out to any client. 283 */ 284 public int getNumSessions() { 285 return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive(); 286 } 287 288 /** 289 * @return the total number of Sessions that are in the Session pool but not loaned out. 290 */ 291 public int getNumIdleSessions() { 292 return this.sessionPool.getNumIdle(); 293 } 294 295 /** 296 * @return the total number of Session's that have been loaned to PooledConnection instances. 297 */ 298 public int getNumActiveSessions() { 299 return this.sessionPool.getNumActive(); 300 } 301 302 /** 303 * Configure whether the createSession method should block when there are no more idle sessions and the 304 * pool already contains the maximum number of active sessions. If false the create method will fail 305 * and throw an exception. 306 * 307 * @param block 308 * Indicates whether blocking should be used to wait for more space to create a session. 309 */ 310 public void setBlockIfSessionPoolIsFull(boolean block) { 311 this.sessionPool.setBlockWhenExhausted(block); 312 } 313 314 public boolean isBlockIfSessionPoolIsFull() { 315 return this.sessionPool.getBlockWhenExhausted(); 316 } 317 318 /** 319 * Returns the timeout to use for blocking creating new sessions 320 * 321 * @return true if the pooled Connection createSession method will block when the limit is hit. 322 * @see #setBlockIfSessionPoolIsFull(boolean) 323 */ 324 public long getBlockIfSessionPoolIsFullTimeout() { 325 return this.sessionPool.getMaxWaitMillis(); 326 } 327 328 /** 329 * Controls the behavior of the internal session pool. By default the call to 330 * Connection.getSession() will block if the session pool is full. This setting 331 * will affect how long it blocks and throws an exception after the timeout. 332 * 333 * The size of the session pool is controlled by the @see #maximumActive 334 * property. 335 * 336 * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull 337 * property 338 * 339 * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true, 340 * then use this setting to configure how long to block before retry 341 */ 342 public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { 343 this.sessionPool.setMaxWaitMillis(blockIfSessionPoolIsFullTimeout); 344 } 345 346 /** 347 * @return true if the underlying connection will be renewed on JMSException, false otherwise 348 */ 349 public boolean isReconnectOnException() { 350 return reconnectOnException; 351 } 352 353 /** 354 * Controls weather the underlying connection should be reset (and renewed) on JMSException 355 * 356 * @param reconnectOnException 357 * Boolean value that configures whether reconnect on exception should happen 358 */ 359 public void setReconnectOnException(boolean reconnectOnException) { 360 this.reconnectOnException = reconnectOnException; 361 try { 362 if (isReconnectOnException()) { 363 if (connection.getExceptionListener() != null) { 364 parentExceptionListener = connection.getExceptionListener(); 365 } 366 connection.setExceptionListener(this); 367 } else { 368 if (parentExceptionListener != null) { 369 connection.setExceptionListener(parentExceptionListener); 370 } 371 parentExceptionListener = null; 372 } 373 } catch (JMSException jmse) { 374 LOG.warn("Cannot set reconnect exception listener", jmse); 375 } 376 } 377 378 @Override 379 public void onException(JMSException exception) { 380 close(); 381 if (parentExceptionListener != null) { 382 parentExceptionListener.onException(exception); 383 } 384 } 385 386 @Override 387 public String toString() { 388 return "ConnectionPool[" + connection + "]"; 389 } 390}