ConnectionPool

basic.py
 1import asyncio
 2import typing
 3
 4from zcached.asyncio import AsyncZCached, AsyncConnectionPool, AsyncConnection
 5from zcached import Result
 6
 7
 8async def main():
 9    connection_pool = AsyncConnectionPool(
10        # Connections in the pool. If we do not send a large number of requests, we can use only one.
11        pool_size=1,
12        # Some function that does not take any arguments, but returns the created connection.
13        connection_factory=lambda: AsyncConnection(host="127.0.0.1", port=7556),
14    )
15
16    client: AsyncZCached = AsyncZCached.from_connection_pool(connection_pool)
17    await client.run()
18
19    response: Result[typing.List[str]] = await client.keys()
20    if response.error:
21        raise RuntimeError(response.error)
22
23    print(response.value)
24
25
26if __name__ == "__main__":
27    asyncio.run(main())