ConnectionPool¶
basic.py¶
1import asyncio
2import typing
3
4from zcached.asyncio import AsyncZCached, AsyncConnectionPool, AsyncConnection
5from zcached import Result
6
7
8async def main():
9 connection_pool = AsyncConnectionPool(
10 # Connections in the pool. If we do not send a large number of requests, we can use only one.
11 pool_size=1,
12 # Some function that does not take any arguments, but returns the created connection.
13 connection_factory=lambda: AsyncConnection(host="127.0.0.1", port=7556),
14 )
15
16 client: AsyncZCached = AsyncZCached.from_connection_pool(connection_pool)
17 await client.run()
18
19 response: Result[typing.List[str]] = await client.keys()
20 if response.error:
21 raise RuntimeError(response.error)
22
23 print(response.value)
24
25
26if __name__ == "__main__":
27 asyncio.run(main())