]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
Merge pull request #3110 from lonvia/sql-lambda-queries
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17
18 from nominatim.errors import UsageError
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
21 from nominatim.config import Configuration
22 from nominatim.api.connection import SearchConnection
23 from nominatim.api.status import get_status, StatusResult
24 from nominatim.api.lookup import get_detailed_place, get_simple_place
25 from nominatim.api.reverse import ReverseGeocoder
26 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
27 import nominatim.api.types as ntyp
28 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
29
30
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily the first time a connection
        is requested (see setup_database()) and is then shared by all
        API functions of this object.
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        self.config = Configuration(project_dir, environ)
        # Not modified anywhere in this class. The version detected by
        # setup_database() is stored in the property cache instead.
        self.server_version = 0

        # Serialises concurrent calls to setup_database().
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        # Handed to every SearchConnection created by begin().
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.

            Calling the function again after the engine has been set up
            is a no-op.
        """
        async with self._engine_lock:
            if self._engine:
                return

            dsn = self.config.get_database_params()
            pool_size = self.config.get_int('API_POOL_SIZE')

            # Everything that is not a dedicated URL component is handed
            # on as a query parameter of the connection URL.
            query = {k: v for k, v in dsn.items()
                      if k not in ('user', 'password', 'dbname', 'host', 'port')}

            dburl = sa.engine.URL.create(
                       f'postgresql+{PGCORE_LIB}',
                       database=dsn.get('dbname'),
                       username=dsn.get('user'), password=dsn.get('password'),
                       host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                       query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True,
                                                    max_overflow=0, pool_size=pool_size,
                                                    echo=self.config.get_bool('DEBUG_SQL'))

            # Probe the server version. A failing connection is not an
            # error at this point, the version is simply recorded as 0.
            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    # Applied to every newly opened DBAPI connection.
                    cursor = dbapi_con.cursor()
                    try:
                        cursor.execute("SET jit_above_cost TO '-1'")
                        cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                    finally:
                        cursor.close()
                # Make sure that all connections get the new settings.
                # The version probe above has already put a connection
                # into the pool before the listener was registered.
                # self._engine is still unset here, so the pool of the
                # local engine must be disposed of directly.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.

            The database engine is set up first when that has not
            happened yet.
        """
        if self._engine is None:
            await self.setup_database()

        # setup_database() always sets both attributes. The asserts
        # narrow the Optional types for the type checker.
        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the database cannot be reached, a StatusResult with
            code 700 is returned instead of raising the connection error.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Additional keyword parameters are converted into lookup
            details with ntyp.LookupDetails.from_kwargs().

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:
                await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Additional keyword parameters are converted into lookup
            details with ntyp.LookupDetails.from_kwargs().

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:
                await make_query_analyzer(conn)
            # Places are looked up one after the other on the same
            # connection. Falsy results (places not found) are dropped.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details) for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            The first element of the coordinate must be within
            [-180, 180] and the second within [-90, 90].

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:
                await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            The query is split at commas into separate phrases.

            Raises a UsageError when the query is empty or consists of
            whitespace only.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Each address part that is set becomes a phrase of the
            corresponding type. Unless 'layers' is given explicitly, the
            search is restricted to the address layer, plus the POI layer
            when an amenity is searched for.

            Raises a UsageError when none of the address parts is set.
        """
        # All input validation happens before a connection is requested,
        # so that invalid requests never occupy a database connection.
        details = ntyp.SearchDetails.from_kwargs(params)

        # The order of the phrases is significant and must stay as is.
        address_parts = ((PhraseType.AMENITY, amenity),
                         (PhraseType.STREET, street),
                         (PhraseType.CITY, city),
                         (PhraseType.COUNTY, county),
                         (PhraseType.STATE, state),
                         (PhraseType.POSTCODE, postalcode),
                         (PhraseType.COUNTRY, country))
        phrases: List[Phrase] = [Phrase(ptype, value)
                                 for ptype, value in address_parts if value]

        if not phrases:
            raise UsageError('Nothing to search for.')

        # The rank range is determined by the first address part in the
        # following chain that is set. The final branch is reached when
        # only a country was given.
        if amenity or street:
            details.restrict_min_max_rank(26, 30)
        elif city:
            details.restrict_min_max_rank(13, 25)
        elif county:
            details.restrict_min_max_rank(10, 12)
        elif state:
            details.restrict_min_max_rank(5, 9)
        elif postalcode:
            details.restrict_min_max_rank(5, 11)
        else:
            details.restrict_min_max_rank(4, 4)

        if 'layers' not in params:
            details.layers = ntyp.DataLayer.ADDRESS
            if amenity:
                details.layers |= ntyp.DataLayer.POI

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            Returns an empty result list when no categories are given.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []
                if details.keywords:
                    await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup_pois(categories, phrases)
267
268
269
class NominatimAPI:
    """ API loader, synchronous version.

        Thin wrapper around NominatimAPIAsync which runs every call to
        completion on a private event loop.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ)


    def close(self) -> None:
        """ Close all active connections to the database.

            This function also closes the internal event loop. The
            NominatimAPI object must not be used for further API calls
            afterwards. Calling close() on an object that has already
            been closed does nothing.
        """
        if self._loop.is_closed():
            return

        try:
            self._loop.run_until_complete(self._async_api.close())
        finally:
            # Release the loop even when closing the connections failed.
            self._loop.close()


    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config

    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            All parameters are passed on to NominatimAPIAsync.details().
        """
        return self._loop.run_until_complete(self._async_api.details(place, **params))


    def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        return self._loop.run_until_complete(self._async_api.lookup(places, **params))


    def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        return self._loop.run_until_complete(self._async_api.reverse(coord, **params))


    def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.
        """
        return self._loop.run_until_complete(
                   self._async_api.search(query, **params))


    # pylint: disable=too-many-arguments
    def search_address(self, amenity: Optional[str] = None,
                       street: Optional[str] = None,
                       city: Optional[str] = None,
                       county: Optional[str] = None,
                       state: Optional[str] = None,
                       country: Optional[str] = None,
                       postalcode: Optional[str] = None,
                       **params: Any) -> SearchResults:
        """ Find an address using structured search.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_address(amenity, street, city, county,
                                                  state, country, postalcode, **params))


    def search_category(self, categories: List[Tuple[str, str]],
                        near_query: Optional[str] = None,
                        **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as a geographic area through the
            viewbox or near parameters.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_category(categories, near_query, **params))