[docs]classConnectionPool:""" A pool of connections. Parameters ---------- pool_size: The maximum size of the connection pool. connection_factory: A callable that returns a new instance of Connection. """__slots__=("_pool_size","_connection_factory","_connections")def__init__(self,pool_size:int,connection_factory:Callable[[],Connection],)->None:self._pool_size:int=pool_sizeself._connection_factory:Callable[[],Connection]=connection_factoryself._connections:List[Connection]=[]logger.info(f"Initiated a new connection pool. Pool size: {self._pool_size}.")def__repr__(self)->str:returnf"<ConnectionPool(size={self._pool_size}, connected_connections={len(self.connected_connections)})>"@propertydefconnections(self)->List[Connection]:"""List of all connections in the pool."""returnself._connections@propertydefconnected_connections(self)->List[Connection]:"""List of all connected connections in the pool."""returnlist(filter(lambdaconn:conn.is_connected(),self._connections))@propertydefbroken_connections(self)->List[Connection]:"""List of all broken (not connected) connections in the pool."""returnlist(filter(lambdaconn:notconn.is_connected(),self._connections))@propertydefpool_size(self)->int:"""The maximum size of the connection pool."""returnself._pool_size@propertydefconnection_factory(self)->Callable[[],Connection]:"""The factory function to create a new connection."""returnself._connection_factory
[docs]defis_working(self)->bool:"""True if there is any working connection in the pool."""returnlen(self.connected_connections)>0
[docs]defis_empty(self)->bool:"""True if the pool is empty."""returnlen(self.connections)==0
[docs]defis_full(self)->bool:"""True if the pool is full."""returnlen(self.connections)==self._pool_size
[docs]defsetup(self)->None:"""Creates connections in the pool and connects them."""self._connections.clear()logger.info("Filling the connection pool.")threads:List[Thread]=[]for_inrange(self._pool_size):connection:Connection=self._connection_factory()self._connections.append(connection)thread:Thread=self.run_in_thread(func=connection.connect)threads.append(thread)fortinthreads:# Waiting for all threads.t.join()
[docs]defclose(self)->None:"""Closes all connected connections in the pool."""logger.info("Closing: %s connections in the pool",len(self.connected_connections))forconnectioninself.connected_connections:self.run_in_thread(func=connection.close)# We don't care about result.self._connections.clear()
[docs]defextend_pool_by_size(self,size:int)->None:""" Extends the pool by a specified number of connections. The pool size will be increased if necessary. Parameters ---------- size: The number of connections to add to the pool. """returnself.extend_pool_by_connections([self._connection_factory()for_inrange(size)])
[docs]defextend_pool_by_connections(self,connections:Iterable[Connection])->None:""" Extends the pool with existing connections. The pool size will be increased if necessary. Parameters ---------- connections: An iterable of existing connections to add to the pool. """threads:list[Thread]=[self.run_in_thread(func=conn.connect)forconninconnections]self._connections.extend(connections)self._pool_size=len(self._connections)forthreadinthreads:thread.join()logger.debug("Extended connection pool. New size: %s.",self._pool_size)
[docs]defreconnect(self,only_broken_connections:bool=True)->int:""" Attempts to reconnect connections in the pool. Returns the number of connected connections after reconnection. Parameters ---------- only_broken_connections: If True, attempts to reconnect only broken connections. """logger.debug("Reconnecting connection pool. Only broken connections: %s.",only_broken_connections,)connections:List[Connection]=(self.broken_connectionsifonly_broken_connectionselseself.connections)threads:List[Thread]=[self.run_in_thread(func=conn.try_reconnect)forconninconnections]forthreadinthreads:thread.join()returnlen(self.connected_connections)
[docs]defreduce_pool_connections(self,amount:int,delete_pending_connections:bool=False)->None:""" Reduces the size of the connection pool by a specified amount. By default, the method first removes non-working connections, when this is not enough it takes connections that have 0 pending requests and removes them too. If a connection has any pending requests, it doesn't delete it, well, unless the delete_pending_connections parameter is on True. Parameters ---------- amount: The number of connections to remove from the pool. delete_pending_connections: If True, the method will also delete, when necessary, connections that have pending requests. This is not strongly recommended! """logger.debug("Reducing the pool connections. Amount: %s, pending: %s.",amount,delete_pending_connections,)ifamount<=0:returnself._pool_size-=amountif0>self._pool_size:self._pool_size=0ifself._pool_size>=len(self.connections):return# First, let's get rid of the non-working connections.forbroken_connectioninself.broken_connections:self._connections.remove(broken_connection)# We don't care about this task, let's run this in background.self.run_in_thread(func=broken_connection.close)ifself._pool_size>=len(self.connections):returnfor_inrange(len(self.connected_connections)):try:connection:Connection=self.get_least_loaded_connection()exceptIndexError:break# There are no other connections.ifconnection.pending_requests==0ordelete_pending_connections:self._connections.remove(connection)# We don't care about this task, let's run this in background.self.run_in_thread(func=connection.close)ifself._pool_size>=len(self.connections):returnself._pool_size=len(self.connections)
[docs]defcleanup_broken_connections(self)->None:"""Closes broken connections and removes them from the list of connections."""logger.info("Clearing non-working connections...")ifnotself.broken_connections:returnforbroken_connectioninself.broken_connections:self.run_in_thread(func=broken_connection.close)self._connections.remove(broken_connection)
[docs]defget_least_loaded_connection(self)->Connection:""" Get the least loaded connection from the pool. Only working connections are considered. Raises ------ IndexError If the pool is empty. """connections:List[Connection]=self.connected_connectionsconnections.sort(key=lambdaconnection:connection.pending_requests)returnconnections[0]
[docs]@staticmethoddefrun_in_thread(func:Callable[Param,Any],*args:Param.args,**kwargs:Param.kwargs)->Thread:""" Method to run function in thread. Parameters ---------- func: A function to run in the thread. args: Function args. kwargs: Function kwargs. """thread:Thread=Thread(target=func,args=args,kwargs=kwargs)thread.start()returnthread