QueueItem = Optional[Tuple[psycopg.abc.Query, Any]]
+
class QueryPool:
""" Pool to run SQL queries in parallel asynchronous execution.
self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args))
for _ in range(pool_size)]
-
async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
""" Schedule a query for execution.
"""
self.wait_time += time.time() - tstart
await asyncio.sleep(0)
-
async def finish(self) -> None:
""" Wait for all queries to finish and close the pool.
"""
if excp is not None:
raise excp
-
async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
conn_args['autocommit'] = True
aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
str(item[0]), str(item[1]))
# item is still valid here, causing a retry
-
async def __aenter__(self) -> 'QueryPool':
return self
-
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
await self.finish()