class AsyncConnectionPool:
    """
    A pool of asynchronous connections.

    Parameters
    ----------
    pool_size:
        The maximum size of the connection pool.
    connection_factory:
        A callable that returns a new instance of AsyncConnection.
    """

    __slots__ = (
        "_pool_size",
        "_connection_factory",
        "_connections",
        "_background_tasks",
    )

    def __init__(
        self,
        pool_size: int,
        connection_factory: Callable[[], AsyncConnection],
    ) -> None:
        self._pool_size: int = pool_size
        self._connection_factory: Callable[[], AsyncConnection] = connection_factory
        self._connections: List[AsyncConnection] = []
        # Strong references to fire-and-forget ``close()`` tasks. An event
        # loop may only keep weak references to its tasks, so a task nobody
        # holds on to can be garbage-collected before it finishes.
        self._background_tasks: set[Task] = set()
        logger.info(
            "Initiated a new connection pool. Pool size: %s.", self._pool_size
        )

    def __repr__(self) -> str:
        return (
            f"<AsyncConnectionPool(size={self._pool_size}, "
            f"connected_connections={len(self.connected_connections)})>"
        )

    @property
    def connections(self) -> List[AsyncConnection]:
        """List of all connections in the pool."""
        return self._connections

    @property
    def connected_connections(self) -> List[AsyncConnection]:
        """List of all connected connections in the pool (a new list on every access)."""
        return [conn for conn in self._connections if conn.is_connected()]

    @property
    def broken_connections(self) -> List[AsyncConnection]:
        """List of all broken (not connected) connections in the pool (a new list on every access)."""
        return [conn for conn in self._connections if not conn.is_connected()]

    @property
    def pool_size(self) -> int:
        """The maximum size of the connection pool."""
        return self._pool_size

    @property
    def connection_factory(self) -> Callable[[], AsyncConnection]:
        """The factory function to create a new connection."""
        return self._connection_factory

    def is_working(self) -> bool:
        """True if there is any working connection in the pool."""
        return any(conn.is_connected() for conn in self._connections)

    def is_empty(self) -> bool:
        """True if the pool is empty."""
        return not self._connections

    def is_full(self) -> bool:
        """True if the pool holds at least ``pool_size`` connections."""
        # ``>=`` rather than ``==``: the list is exposed through the
        # ``connections`` property, so it may have been grown externally.
        return len(self._connections) >= self._pool_size

    async def setup(self) -> None:
        """Creates connections in the pool and connects them."""
        self._connections.clear()
        logger.info("Filling the connection pool")

        tasks: List[Task] = []
        for _ in range(self._pool_size):
            connection: AsyncConnection = self._connection_factory()
            self._connections.append(connection)
            tasks.append(connection.loop.create_task(connection.connect()))

        logger.debug("Running all connections in the pool")
        await gather(*tasks)

    async def close(self) -> None:
        """Closes all connected connections in the pool and empties the pool."""
        connected: List[AsyncConnection] = self.connected_connections
        logger.info("Closing: %s connnections in the pool", len(connected))
        await gather(*(conn.loop.create_task(conn.close()) for conn in connected))
        self._connections.clear()

    async def extend_pool_by_size(self, size: int) -> None:
        """
        Extends the pool by a specified number of connections.

        The pool size will be increased if necessary.

        Parameters
        ----------
        size:
            The number of connections to add to the pool.
        """
        return await self.extend_pool_by_connections(
            [self._connection_factory() for _ in range(size)]
        )

    async def extend_pool_by_connections(
        self, connections: Iterable[AsyncConnection]
    ) -> None:
        """
        Extends the pool with existing connections.

        The pool size will be increased if necessary; it is never decreased
        by this method.

        Parameters
        ----------
        connections:
            An iterable of existing connections to add to the pool.
            One-shot iterables (e.g. generators) are supported.
        """
        # Materialize first: the connections are needed twice (to start
        # connecting and to register them), and a generator can only be
        # consumed once.
        new_connections: List[AsyncConnection] = list(connections)
        tasks: List[Task] = [
            conn.loop.create_task(conn.connect()) for conn in new_connections
        ]
        self._connections.extend(new_connections)
        # Grow the limit only when the pool no longer fits into it.
        self._pool_size = max(self._pool_size, len(self._connections))
        await gather(*tasks)
        logger.debug("Extended connection pool. New size: %s.", self._pool_size)

    async def reconnect(self, only_broken_connections: bool = True) -> int:
        """
        Attempts to reconnect connections in the pool.

        Returns the number of connected connections after reconnection.

        Parameters
        ----------
        only_broken_connections:
            If True, attempts to reconnect only broken connections.
        """
        logger.debug(
            "Reconnecting connection pool. Only broken connections: %s.",
            only_broken_connections,
        )
        connections: List[AsyncConnection] = (
            self.broken_connections if only_broken_connections else self.connections
        )
        await gather(
            *(conn.loop.create_task(conn.try_reconnect()) for conn in connections)
        )
        return len(self.connected_connections)

    def reduce_pool_connections(
        self, amount: int, delete_pending_connections: bool = False
    ) -> None:
        """
        Reduces the size of the connection pool by a specified amount.

        All non-working connections are removed first. If that is not
        enough, connected connections are removed in order of increasing
        load. A connection with pending requests is only removed when
        ``delete_pending_connections`` is True. If the target size cannot
        be reached, the pool size is set to the number of connections that
        remain.

        Parameters
        ----------
        amount:
            The number of connections to remove from the pool.
            Values less than or equal to zero are ignored.
        delete_pending_connections:
            If True, the method will also delete, when necessary,
            connections that have pending requests.
            This is not strongly recommended!
        """
        logger.debug(
            "Reducing the pool connections. Amount: %s, pending: %s.",
            amount,
            delete_pending_connections,
        )
        if amount <= 0:
            return

        self._pool_size = max(0, self._pool_size - amount)
        if self._pool_size >= len(self._connections):
            return

        # First, let's get rid of the non-working connections.
        # ``broken_connections`` returns a fresh list, so removing from
        # ``_connections`` while looping is safe.
        for broken_connection in self.broken_connections:
            self._connections.remove(broken_connection)
            self._close_in_background(broken_connection)

        if self._pool_size >= len(self._connections):
            return

        # Then drop connected connections, least loaded first.
        candidates = sorted(
            self.connected_connections,
            key=lambda connection: connection.pending_requests,
        )
        for connection in candidates:
            if connection.pending_requests != 0 and not delete_pending_connections:
                # Every remaining candidate is at least as loaded as this one.
                break
            self._connections.remove(connection)
            self._close_in_background(connection)
            if self._pool_size >= len(self._connections):
                return

        # The target could not be reached; report the real size.
        self._pool_size = len(self._connections)

    def cleanup_broken_connections(self) -> None:
        """Closes broken connections and removes them from the list of connections."""
        logger.info("Clearing non-working connections...")
        for broken_connection in self.broken_connections:
            self._close_in_background(broken_connection)
            self._connections.remove(broken_connection)

    def get_least_loaded_connection(self) -> AsyncConnection:
        """
        Get the least loaded connection from the pool.

        Only working connections are considered. On a tie, the connection
        that appears first in the pool is returned.

        Raises
        ------
        IndexError
            If there is no connected connection in the pool.
        """
        candidates: List[AsyncConnection] = self.connected_connections
        if not candidates:
            raise IndexError("There are no connected connections in the pool.")
        return min(candidates, key=lambda connection: connection.pending_requests)

    def _close_in_background(self, connection: AsyncConnection) -> None:
        """Schedules ``connection.close()`` on the connection's loop without awaiting it."""
        task = connection.loop.create_task(connection.close())
        # Keep the task alive until it is done, then forget it.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)