]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
add additional timeout for entire request
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
11 import asyncio
12 import sys
13 import contextlib
14 from pathlib import Path
15
16 import sqlalchemy as sa
17 import sqlalchemy.ext.asyncio as sa_asyncio
18
19 from nominatim.errors import UsageError
20 from nominatim.db.sqlalchemy_schema import SearchTables
21 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
22 from nominatim.config import Configuration
23 from nominatim.api.connection import SearchConnection
24 from nominatim.api.status import get_status, StatusResult
25 from nominatim.api.lookup import get_detailed_place, get_simple_place
26 from nominatim.api.reverse import ReverseGeocoder
27 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
28 import nominatim.api.types as ntyp
29 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
30
31
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily: it is set up on the first
        call that needs a connection or explicitly via setup_database().
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """ Load the configuration for the given project directory.

            No database connection is opened here.

            'environ' is handed through to the Configuration object.
            'loop' is only used on Python versions before 3.10, where it is
            passed to the lock protecting the engine setup.
        """
        self.config = Configuration(project_dir, environ)
        # An empty QUERY_TIMEOUT setting means that no timeout is applied.
        self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
                             if self.config.QUERY_TIMEOUT else None
        # NOTE(review): this attribute is never updated within this class,
        # the detected version is kept in the property cache instead.
        self.server_version = 0

        # asyncio.Lock() lost its 'loop' parameter with Python 3.10.
        if sys.version_info >= (3, 10):
            self._engine_lock = asyncio.Lock()
        else:
            self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    def _request_timeout(self) -> Optional[int]:
        """ Return the configured REQUEST_TIMEOUT or None when the
            setting is empty.
        """
        if self.config.REQUEST_TIMEOUT:
            return self.config.get_int('REQUEST_TIMEOUT')

        return None


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.
        """
        async with self._engine_lock:
            # Another task may have finished the setup while this one
            # was waiting for the lock.
            if self._engine is not None:
                return

            dsn = self.config.get_database_params()
            pool_size = self.config.get_int('API_POOL_SIZE')

            # Everything that is not a dedicated URL component is handed
            # on as a query parameter of the connection URL.
            query = {k: v for k, v in dsn.items()
                      if k not in ('user', 'password', 'dbname', 'host', 'port')}

            dburl = sa.engine.URL.create(
                       f'postgresql+{PGCORE_LIB}',
                       database=dsn.get('dbname'),
                       username=dsn.get('user'), password=dsn.get('password'),
                       host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                       query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True,
                                                    max_overflow=0, pool_size=pool_size,
                                                    echo=self.config.get_bool('DEBUG_SQL'))

            # A failing connection is not an error at this point. The
            # server version is simply reported as 0 then.
            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                    cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                # Make sure that all connections get the new settings.
                # The connection used for the version check above was
                # opened before the listener was registered. It must be
                # dropped from the pool of the new engine. self._engine is
                # still unset here, so self.close() would do nothing.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the database cannot be reached, a status result with
            code 700 is returned instead of raising an exception.
        """
        try:
            async with self.begin() as conn:
                conn.set_query_timeout(self.query_timeout)
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if details.keywords:
                await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if details.keywords:
                await make_query_analyzer(conn)
            # Places are looked up one after another on the same connection.
            # Lookups without a result are dropped from the result list.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details) for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if details.keywords:
                await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            The query is split at commas into separate phrases.
            Raises a UsageError when the query is empty.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
                                       self._request_timeout())
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Raises a UsageError when none of the address parts is given.
        """
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            details = ntyp.SearchDetails.from_kwargs(params)

            phrases: List[Phrase] = []

            if amenity:
                phrases.append(Phrase(PhraseType.AMENITY, amenity))
            if street:
                phrases.append(Phrase(PhraseType.STREET, street))
            if city:
                phrases.append(Phrase(PhraseType.CITY, city))
            if county:
                phrases.append(Phrase(PhraseType.COUNTY, county))
            if state:
                phrases.append(Phrase(PhraseType.STATE, state))
            if postalcode:
                phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
            if country:
                phrases.append(Phrase(PhraseType.COUNTRY, country))

            if not phrases:
                raise UsageError('Nothing to search for.')

            # The first address part given in the order below determines
            # the rank range the results are restricted to.
            if amenity or street:
                details.restrict_min_max_rank(26, 30)
            elif city:
                details.restrict_min_max_rank(13, 25)
            elif county:
                details.restrict_min_max_rank(10, 12)
            elif state:
                details.restrict_min_max_rank(5, 9)
            elif postalcode:
                details.restrict_min_max_rank(5, 11)
            else:
                details.restrict_min_max_rank(4, 4)

            # Only choose the layers when the caller has not done so.
            if 'layers' not in params:
                details.layers = ntyp.DataLayer.ADDRESS
                if amenity:
                    details.layers |= ntyp.DataLayer.POI

            geocoder = ForwardGeocoder(conn, details, self._request_timeout())
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            Returns an empty result when no categories are given.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            conn.set_query_timeout(self.query_timeout)
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []
                if details.keywords:
                    await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details, self._request_timeout())
            return await geocoder.lookup_pois(categories, phrases)
287
288
289
class NominatimAPI:
    """ API loader, synchronous version.

        All functions are executed by running the corresponding function
        of NominatimAPIAsync to completion on a private event loop.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create the private event loop and the asynchronous API object
            that does the actual work.
        """
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)


    def close(self) -> None:
        """ Close all active connections to the database.

            This function also closes the event loop used to run the
            asynchronous API. The NominatimAPI object can therefore no
            longer be used after closing. Calling close() more than once
            is harmless.
        """
        # Running anything on a closed loop raises a RuntimeError.
        if not self._loop.is_closed():
            self._loop.run_until_complete(self._async_api.close())
            self._loop.close()


    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config

    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.
        """
        return self._loop.run_until_complete(self._async_api.details(place, **params))


    def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        return self._loop.run_until_complete(self._async_api.lookup(places, **params))


    def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        return self._loop.run_until_complete(self._async_api.reverse(coord, **params))


    def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.
        """
        return self._loop.run_until_complete(
                   self._async_api.search(query, **params))


    # pylint: disable=too-many-arguments
    def search_address(self, amenity: Optional[str] = None,
                       street: Optional[str] = None,
                       city: Optional[str] = None,
                       county: Optional[str] = None,
                       state: Optional[str] = None,
                       country: Optional[str] = None,
                       postalcode: Optional[str] = None,
                       **params: Any) -> SearchResults:
        """ Find an address using structured search.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_address(amenity, street, city, county,
                                                  state, country, postalcode, **params))


    def search_category(self, categories: List[Tuple[str, str]],
                        near_query: Optional[str] = None,
                        **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as a geographic area through the
            viewbox or near parameters.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_category(categories, near_query, **params))