git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
add a timeout for DB queries
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
11 import asyncio
12 import sys
13 import contextlib
14 from pathlib import Path
15
16 import sqlalchemy as sa
17 import sqlalchemy.ext.asyncio as sa_asyncio
18
19 from nominatim.errors import UsageError
20 from nominatim.db.sqlalchemy_schema import SearchTables
21 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
22 from nominatim.config import Configuration
23 from nominatim.api.connection import SearchConnection
24 from nominatim.api.status import get_status, StatusResult
25 from nominatim.api.lookup import get_detailed_place, get_simple_place
26 from nominatim.api.reverse import ReverseGeocoder
27 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
28 import nominatim.api.types as ntyp
29 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
30
31
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The object reads the project configuration on construction and
        creates the database engine lazily on first use (or explicitly
        through `setup_database()`). All query functions open their own
        connection via `begin()` and apply the configured query timeout.
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """ Load the configuration and prepare the (still unconnected) API.

            `project_dir` and `environ` are handed to `Configuration`.
            `loop` is only used on Python versions before 3.10, where it is
            passed on to the engine lock.
        """
        self.config = Configuration(project_dir, environ)
        # A falsy QUERY_TIMEOUT setting means that no timeout is applied.
        self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
                             if self.config.QUERY_TIMEOUT else None
        self.server_version = 0

        # asyncio.Lock() lost its 'loop' parameter in Python 3.10.
        if sys.version_info >= (3, 10):
            self._engine_lock = asyncio.Lock()
        else:
            self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.
        """
        async with self._engine_lock:
            # Another task may have finished the setup while we were
            # waiting for the lock.
            if self._engine:
                return

            dsn = self.config.get_database_params()
            pool_size = self.config.get_int('API_POOL_SIZE')

            # Everything that is not a dedicated URL component is forwarded
            # as a query parameter of the connection URL.
            query = {k: v for k, v in dsn.items()
                      if k not in ('user', 'password', 'dbname', 'host', 'port')}

            dburl = sa.engine.URL.create(
                       f'postgresql+{PGCORE_LIB}',
                       database=dsn.get('dbname'),
                       username=dsn.get('user'), password=dsn.get('password'),
                       host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                       query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True,
                                                    max_overflow=0, pool_size=pool_size,
                                                    echo=self.config.get_bool('DEBUG_SQL'))

            # Probe the server version. A failing connection is not fatal
            # here, the version is then simply recorded as 0.
            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                    cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                    cursor.close()
                # Make sure that all connections get the new settings.
                # The connection used for the version probe above was opened
                # before the listener existed and is still sitting in the
                # pool. self._engine is not yet set at this point, so the
                # new engine must be disposed directly.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        # setup_database() always sets both members when it returns normally.
        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the database cannot be reached, a status result with
            code 700 is returned instead of raising an exception.
        """
        try:
            async with self.begin() as conn:
                conn.set_query_timeout(self.query_timeout)
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            The keyword parameters are converted into lookup details via
            `LookupDetails.from_kwargs()`.

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if details.keywords:
                # NOTE(review): presumably prepares the query analyzer that
                # keyword output relies on - confirm against make_query_analyzer.
                await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            The places are looked up one after another on the same
            connection. Lookups that yield no result are dropped.

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if details.keywords:
                # NOTE(review): presumably prepares the query analyzer that
                # keyword output relies on - confirm against make_query_analyzer.
                await make_query_analyzer(conn)
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details) for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            The first component of `coord` must lie within [-180, 180] and
            the second within [-90, 90], otherwise no lookup is attempted.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if details.keywords:
                # NOTE(review): presumably prepares the query analyzer that
                # keyword output relies on - confirm against make_query_analyzer.
                await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            The query is split at commas and each part becomes an untyped
            phrase with surrounding whitespace removed.

            Raises a UsageError when the query is empty or only consists
            of whitespace.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Each non-empty address part becomes a phrase of the matching
            type. The rank range of the search is restricted according to
            the most specific part given. Unless a 'layers' parameter is
            supplied, the search is limited to the address layer (plus the
            POI layer when an amenity is given).

            Raises a UsageError when none of the address parts is set.
        """
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            details = ntyp.SearchDetails.from_kwargs(params)

            phrases: List[Phrase] = []

            if amenity:
                phrases.append(Phrase(PhraseType.AMENITY, amenity))
            if street:
                phrases.append(Phrase(PhraseType.STREET, street))
            if city:
                phrases.append(Phrase(PhraseType.CITY, city))
            if county:
                phrases.append(Phrase(PhraseType.COUNTY, county))
            if state:
                phrases.append(Phrase(PhraseType.STATE, state))
            if postalcode:
                phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
            if country:
                phrases.append(Phrase(PhraseType.COUNTRY, country))

            if not phrases:
                raise UsageError('Nothing to search for.')

            # Only the most specific address part decides on the rank range.
            if amenity or street:
                details.restrict_min_max_rank(26, 30)
            elif city:
                details.restrict_min_max_rank(13, 25)
            elif county:
                details.restrict_min_max_rank(10, 12)
            elif state:
                details.restrict_min_max_rank(5, 9)
            elif postalcode:
                details.restrict_min_max_rank(5, 11)
            else:
                # Only a country was given.
                details.restrict_min_max_rank(4, 4)

            # An explicit layer selection by the caller always wins.
            if 'layers' not in params:
                details.layers = ntyp.DataLayer.ADDRESS
                if amenity:
                    details.layers |= ntyp.DataLayer.POI

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            An empty list of categories yields an empty result without
            accessing the database.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []
                if details.keywords:
                    # NOTE(review): presumably prepares the query analyzer
                    # that keyword output relies on - confirm against
                    # make_query_analyzer.
                    await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup_pois(categories, phrases)
281
282
283
class NominatimAPI:
    """ API loader, synchronous version.

        Thin blocking wrapper around NominatimAPIAsync. The object owns a
        private event loop and runs each asynchronous API function on it
        until completion.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create the private event loop and the wrapped asynchronous API.

            `project_dir` and `environ` are passed on unchanged to
            NominatimAPIAsync.
        """
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)


    def close(self) -> None:
        """ Close all active connections to the database.

            This function also closes the private event loop. The
            NominatimAPI object can therefore not be used for further
            API calls afterwards. Calling close() again is a no-op.
        """
        # Check first: creating the close() coroutine for an already closed
        # loop would only leave a never-awaited coroutine behind.
        if self._loop.is_closed():
            return

        try:
            self._loop.run_until_complete(self._async_api.close())
        finally:
            # Release the loop even when disposing the connections failed.
            self._loop.close()


    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config

    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns whatever the asynchronous `details()` returns for the
            same arguments.
        """
        return self._loop.run_until_complete(self._async_api.details(place, **params))


    def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        return self._loop.run_until_complete(self._async_api.lookup(places, **params))


    def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        return self._loop.run_until_complete(self._async_api.reverse(coord, **params))


    def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.
        """
        return self._loop.run_until_complete(
                   self._async_api.search(query, **params))


    # pylint: disable=too-many-arguments
    def search_address(self, amenity: Optional[str] = None,
                       street: Optional[str] = None,
                       city: Optional[str] = None,
                       county: Optional[str] = None,
                       state: Optional[str] = None,
                       country: Optional[str] = None,
                       postalcode: Optional[str] = None,
                       **params: Any) -> SearchResults:
        """ Find an address using structured search.
        """
        # The address parts are forwarded positionally, so the order must
        # match the signature of the asynchronous search_address().
        return self._loop.run_until_complete(
                   self._async_api.search_address(amenity, street, city, county,
                                                  state, country, postalcode, **params))


    def search_category(self, categories: List[Tuple[str, str]],
                        near_query: Optional[str] = None,
                        **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as a geographic area through the
            viewbox or near parameters.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_category(categories, near_query, **params))