]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/connection.py
Merge pull request #3582 from lonvia/switch-to-flake
[nominatim.git] / src / nominatim_api / connection.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Extended SQLAlchemy connection class that also includes access to the schema.
9 """
10 from typing import cast, Any, Mapping, Sequence, Union, Dict, Optional, Set, \
11                    Awaitable, Callable, TypeVar
12 import asyncio
13
14 import sqlalchemy as sa
15 from sqlalchemy.ext.asyncio import AsyncConnection
16
17 from .typing import SaFromClause
18 from .sql.sqlalchemy_schema import SearchTables
19 from .sql.sqlalchemy_types import Geometry
20 from .logging import log
21
22 T = TypeVar('T')
23
24
25 class SearchConnection:
26     """ An extended SQLAlchemy connection class, that also contains
27         the table definitions. The underlying asynchronous SQLAlchemy
28         connection can be accessed with the 'connection' property.
29         The 't' property is the collection of Nominatim tables.
30     """
31
32     def __init__(self, conn: AsyncConnection,
33                  tables: SearchTables,
34                  properties: Dict[str, Any]) -> None:
35         self.connection = conn
36         self.t = tables
37         self._property_cache = properties
38         self._classtables: Optional[Set[str]] = None
39         self.query_timeout: Optional[int] = None
40
41     def set_query_timeout(self, timeout: Optional[int]) -> None:
42         """ Set the timeout after which a query over this connection
43             is cancelled.
44         """
45         self.query_timeout = timeout
46
47     async def scalar(self, sql: sa.sql.base.Executable,
48                      params: Union[Mapping[str, Any], None] = None) -> Any:
49         """ Execute a 'scalar()' query on the connection.
50         """
51         log().sql(self.connection, sql, params)
52         return await asyncio.wait_for(self.connection.scalar(sql, params), self.query_timeout)
53
54     async def execute(self, sql: 'sa.Executable',
55                       params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None] = None
56                       ) -> 'sa.Result[Any]':
57         """ Execute a 'execute()' query on the connection.
58         """
59         log().sql(self.connection, sql, params)
60         return await asyncio.wait_for(self.connection.execute(sql, params), self.query_timeout)
61
62     async def get_property(self, name: str, cached: bool = True) -> str:
63         """ Get a property from Nominatim's property table.
64
65             Property values are normally cached so that they are only
66             retrieved from the database when they are queried for the
67             first time with this function. Set 'cached' to False to force
68             reading the property from the database.
69
70             Raises a ValueError if the property does not exist.
71         """
72         lookup_name = f'DBPROP:{name}'
73
74         if cached and lookup_name in self._property_cache:
75             return cast(str, self._property_cache[lookup_name])
76
77         sql = sa.select(self.t.properties.c.value)\
78             .where(self.t.properties.c.property == name)
79         value = await self.connection.scalar(sql)
80
81         if value is None:
82             raise ValueError(f"Property '{name}' not found in database.")
83
84         self._property_cache[lookup_name] = cast(str, value)
85
86         return cast(str, value)
87
88     async def get_db_property(self, name: str) -> Any:
89         """ Get a setting from the database. At the moment, only
90             'server_version', the version of the database software, can
91             be retrieved with this function.
92
93             Raises a ValueError if the property does not exist.
94         """
95         if name != 'server_version':
96             raise ValueError(f"DB setting '{name}' not found in database.")
97
98         return self._property_cache['DB:server_version']
99
100     async def get_cached_value(self, group: str, name: str,
101                                factory: Callable[[], Awaitable[T]]) -> T:
102         """ Access the cache for this Nominatim instance.
103             Each cache value needs to belong to a group and have a name.
104             This function is for internal API use only.
105
106             `factory` is an async callback function that produces
107             the value if it is not already cached.
108
109             Returns the cached value or the result of factory (also caching
110             the result).
111         """
112         full_name = f'{group}:{name}'
113
114         if full_name in self._property_cache:
115             return cast(T, self._property_cache[full_name])
116
117         value = await factory()
118         self._property_cache[full_name] = value
119
120         return value
121
122     async def get_class_table(self, cls: str, typ: str) -> Optional[SaFromClause]:
123         """ Lookup up if there is a classtype table for the given category
124             and return a SQLAlchemy table for it, if it exists.
125         """
126         if self._classtables is None:
127             res = await self.execute(sa.text("""SELECT tablename FROM pg_tables
128                                                 WHERE tablename LIKE 'place_classtype_%'
129                                              """))
130             self._classtables = {r[0] for r in res}
131
132         tablename = f"place_classtype_{cls}_{typ}"
133
134         if tablename not in self._classtables:
135             return None
136
137         if tablename in self.t.meta.tables:
138             return self.t.meta.tables[tablename]
139
140         return sa.Table(tablename, self.t.meta,
141                         sa.Column('place_id', sa.BigInteger),
142                         sa.Column('centroid', Geometry))