]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/db/query_pool.py
Development environment: add missing dependencies
[nominatim.git] / src / nominatim_db / db / query_pool.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 A connection pool that executes incoming queries in parallel.
9 """
10 from typing import Any, Tuple, Optional
11 import asyncio
12 import logging
13 import time
14
15 import psycopg
16
17 LOG = logging.getLogger()
18
19 QueueItem = Optional[Tuple[psycopg.abc.Query, Any]]
20
21 class QueryPool:
22     """ Pool to run SQL queries in parallel asynchronous execution.
23
24         All queries are run in autocommit mode. If parallel execution leads
25         to a deadlock, then the query is repeated.
26         The results of the queries is discarded.
27     """
28     def __init__(self, dsn: str, pool_size: int = 1, **conn_args: Any) -> None:
29         self.wait_time = 0.0
30         self.query_queue: 'asyncio.Queue[QueueItem]' = asyncio.Queue(maxsize=2 * pool_size)
31
32         self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args))
33                      for _ in range(pool_size)]
34
35
36     async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
37         """ Schedule a query for execution.
38         """
39         tstart = time.time()
40         await self.query_queue.put((query, params))
41         self.wait_time += time.time() - tstart
42         await asyncio.sleep(0)
43
44
45     async def finish(self) -> None:
46         """ Wait for all queries to finish and close the pool.
47         """
48         for _ in self.pool:
49             await self.query_queue.put(None)
50
51         tstart = time.time()
52         await asyncio.wait(self.pool)
53         self.wait_time += time.time() - tstart
54
55         for task in self.pool:
56             excp = task.exception()
57             if excp is not None:
58                 raise excp
59
60
61     async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
62         conn_args['autocommit'] = True
63         aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
64         async with aconn:
65             async with aconn.cursor() as cur:
66                 item = await self.query_queue.get()
67                 while item is not None:
68                     try:
69                         if item[1] is None:
70                             await cur.execute(item[0])
71                         else:
72                             await cur.execute(item[0], item[1])
73
74                         item = await self.query_queue.get()
75                     except psycopg.errors.DeadlockDetected:
76                         assert item is not None
77                         LOG.info("Deadlock detected (sql = %s, params = %s), retry.",
78                                  str(item[0]), str(item[1]))
79                         # item is still valid here, causing a retry
80
81
82     async def __aenter__(self) -> 'QueryPool':
83         return self
84
85
86     async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
87         await self.finish()