1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 A connection pool that executes incoming queries in parallel.
10 from typing import Any, Tuple, Optional
# Module-wide logger; getLogger() without a name returns the root logger.
17 LOG = logging.getLogger()
# One entry of the work queue: either a (query, parameters) tuple or None.
# None serves as the shutdown marker: finish() enqueues it and
# _worker_loop() leaves its loop when it receives it.
19 QueueItem = Optional[Tuple[psycopg.abc.Query, Any]]
22 """ Pool to run SQL queries in parallel using asynchronous execution.
24 All queries are run in autocommit mode. If parallel execution leads
25 to a deadlock, then the query is repeated.
26 The results of the queries are discarded.
28 def __init__(self, dsn: str, pool_size: int = 1, **conn_args: Any) -> None:
# Create the work queue and start `pool_size` worker tasks.
#
# `dsn` and `conn_args` are handed on unchanged to each _worker_loop().
# asyncio.create_task() requires a running event loop, so the pool has to
# be constructed from within a coroutine.
#
# NOTE(review): self.wait_time, which put_query() and finish() add to, is
# not initialised in the lines shown here - confirm it is set in this
# constructor.
#
# The queue holds at most two entries per worker; once it is full, put()
# suspends the producer until a worker has taken an item.
30 self.query_queue: 'asyncio.Queue[QueueItem]' = asyncio.Queue(maxsize=2 * pool_size)
# One task per pool slot, each running its own _worker_loop().
32 self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args))
33 for _ in range(pool_size)]
36 async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
37 """ Schedule a query for execution.
# `query` and `params` are queued together as one tuple; a worker later
# passes them to the cursor's execute().
# This await suspends the caller for as long as the queue is full.
40 await self.query_queue.put((query, params))
# Add the time spent waiting for a free queue slot to the wait_time total.
# NOTE(review): tstart is not assigned in the lines shown here - presumably
# it holds time.time() taken just before the put(); confirm.
41 self.wait_time += time.time() - tstart
# Yield to the event loop once so that other tasks get a chance to run.
42 await asyncio.sleep(0)
45 async def finish(self) -> None:
46 """ Wait for all queries to finish and close the pool.
# A None entry makes the worker that receives it leave its loop
# (see the `while item is not None` test in _worker_loop()).
# NOTE(review): only a single put(None) is visible here, while every
# worker needs its own None to stop - presumably this runs once per task
# in self.pool; confirm.
49 await self.query_queue.put(None)
# Wait until every worker task has completed and count that time as
# waiting time as well.
# NOTE(review): tstart is not assigned in the lines shown here - presumably
# it holds time.time() taken just before the wait; confirm.
52 await asyncio.wait(self.pool)
53 self.wait_time += time.time() - tstart
# Inspect each finished task for an exception raised inside the worker;
# Task.exception() returns None when the task ended normally.
# NOTE(review): what is done with excp is not part of the lines shown
# here - confirm that a non-None exception is propagated to the caller.
55 for task in self.pool:
56 excp = task.exception()
61 async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
# Body of one worker task: opens its own database connection and executes
# queued queries one after another until it takes None from the queue.
# Nothing is fetched from the cursor, so query results are dropped.
#
# Autocommit is forced on, overriding any value supplied by the caller.
62 conn_args['autocommit'] = True
63 aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
# A single cursor is reused for every query this worker runs.
65 async with aconn.cursor() as cur:
66 item = await self.query_queue.get()
# None is the shutdown marker enqueued by finish().
67 while item is not None:
# item[0] is the query, item[1] its parameters (see put_query()).
# NOTE(review): the two execute() calls look like the two branches of a
# check on whether parameters were supplied; neither that condition nor
# the `try:` belonging to the `except` below is part of the lines shown
# here - confirm.
70 await cur.execute(item[0])
72 await cur.execute(item[0], item[1])
# The next item is fetched only after the current one ran without error.
74 item = await self.query_queue.get()
75 except psycopg.errors.DeadlockDetected:
# Always true inside the loop; presumably kept so that a type checker
# narrows the Optional before item is indexed below.
76 assert item is not None
77 LOG.info("Deadlock detected (sql = %s, params = %s), retry.",
78 str(item[0]), str(item[1]))
# item was not replaced above, so the next loop iteration executes the
# same query again.
79 # item is still valid here, causing a retry
82 async def __aenter__(self) -> 'QueryPool':
86 async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: