]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17
18 from nominatim.errors import UsageError
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
21 from nominatim.config import Configuration
22 from nominatim.api.connection import SearchConnection
23 from nominatim.api.status import get_status, StatusResult
24 from nominatim.api.lookup import get_detailed_place, get_simple_place
25 from nominatim.api.reverse import ReverseGeocoder
26 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
27 import nominatim.api.types as ntyp
28 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
29
30
31 class NominatimAPIAsync:
32     """ API loader asynchornous version.
33     """
34     def __init__(self, project_dir: Path,
35                  environ: Optional[Mapping[str, str]] = None) -> None:
36         self.config = Configuration(project_dir, environ)
37         self.server_version = 0
38
39         self._engine_lock = asyncio.Lock()
40         self._engine: Optional[sa_asyncio.AsyncEngine] = None
41         self._tables: Optional[SearchTables] = None
42         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
43
44
45     async def setup_database(self) -> None:
46         """ Set up the engine and connection parameters.
47
48             This function will be implicitly called when the database is
49             accessed for the first time. You may also call it explicitly to
50             avoid that the first call is delayed by the setup.
51         """
52         async with self._engine_lock:
53             if self._engine:
54                 return
55
56             dsn = self.config.get_database_params()
57
58             query = {k: v for k, v in dsn.items()
59                       if k not in ('user', 'password', 'dbname', 'host', 'port')}
60             if PGCORE_LIB == 'asyncpg':
61                 query['prepared_statement_cache_size'] = '0'
62
63             dburl = sa.engine.URL.create(
64                        f'postgresql+{PGCORE_LIB}',
65                        database=dsn.get('dbname'),
66                        username=dsn.get('user'), password=dsn.get('password'),
67                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
68                        query=query)
69             engine = sa_asyncio.create_async_engine(dburl, future=True)
70
71             try:
72                 async with engine.begin() as conn:
73                     result = await conn.scalar(sa.text('SHOW server_version_num'))
74                     server_version = int(result)
75             except (PGCORE_ERROR, sa.exc.OperationalError):
76                 server_version = 0
77
78             if server_version >= 110000:
79                 @sa.event.listens_for(engine.sync_engine, "connect")
80                 def _on_connect(dbapi_con: Any, _: Any) -> None:
81                     cursor = dbapi_con.cursor()
82                     cursor.execute("SET jit_above_cost TO '-1'")
83                     cursor.execute("SET max_parallel_workers_per_gather TO '0'")
84                 # Make sure that all connections get the new settings
85                 await self.close()
86
87             self._property_cache['DB:server_version'] = server_version
88
89             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
90             self._engine = engine
91
92
93     async def close(self) -> None:
94         """ Close all active connections to the database. The NominatimAPIAsync
95             object remains usable after closing. If a new API functions is
96             called, new connections are created.
97         """
98         if self._engine is not None:
99             await self._engine.dispose()
100
101
102     @contextlib.asynccontextmanager
103     async def begin(self) -> AsyncIterator[SearchConnection]:
104         """ Create a new connection with automatic transaction handling.
105
106             This function may be used to get low-level access to the database.
107             Refer to the documentation of SQLAlchemy for details how to use
108             the connection object.
109         """
110         if self._engine is None:
111             await self.setup_database()
112
113         assert self._engine is not None
114         assert self._tables is not None
115
116         async with self._engine.begin() as conn:
117             yield SearchConnection(conn, self._tables, self._property_cache)
118
119
120     async def status(self) -> StatusResult:
121         """ Return the status of the database.
122         """
123         try:
124             async with self.begin() as conn:
125                 status = await get_status(conn)
126         except (PGCORE_ERROR, sa.exc.OperationalError):
127             return StatusResult(700, 'Database connection failed')
128
129         return status
130
131
132     async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
133         """ Get detailed information about a place in the database.
134
135             Returns None if there is no entry under the given ID.
136         """
137         details = ntyp.LookupDetails.from_kwargs(params)
138         async with self.begin() as conn:
139             if details.keywords:
140                 await make_query_analyzer(conn)
141             return await get_detailed_place(conn, place, details)
142
143
144     async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
145         """ Get simple information about a list of places.
146
147             Returns a list of place information for all IDs that were found.
148         """
149         details = ntyp.LookupDetails.from_kwargs(params)
150         async with self.begin() as conn:
151             if details.keywords:
152                 await make_query_analyzer(conn)
153             return SearchResults(filter(None,
154                                         [await get_simple_place(conn, p, details) for p in places]))
155
156
157     async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
158         """ Find a place by its coordinates. Also known as reverse geocoding.
159
160             Returns the closest result that can be found or None if
161             no place matches the given criteria.
162         """
163         # The following negation handles NaN correctly. Don't change.
164         if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
165             # There are no results to be expected outside valid coordinates.
166             return None
167
168         details = ntyp.ReverseDetails.from_kwargs(params)
169         async with self.begin() as conn:
170             if details.keywords:
171                 await make_query_analyzer(conn)
172             geocoder = ReverseGeocoder(conn, details)
173             return await geocoder.lookup(coord)
174
175
176     async def search(self, query: str, **params: Any) -> SearchResults:
177         """ Find a place by free-text search. Also known as forward geocoding.
178         """
179         query = query.strip()
180         if not query:
181             raise UsageError('Nothing to search for.')
182
183         async with self.begin() as conn:
184             geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
185             phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
186             return await geocoder.lookup(phrases)
187
188
189     # pylint: disable=too-many-arguments,too-many-branches
190     async def search_address(self, amenity: Optional[str] = None,
191                              street: Optional[str] = None,
192                              city: Optional[str] = None,
193                              county: Optional[str] = None,
194                              state: Optional[str] = None,
195                              country: Optional[str] = None,
196                              postalcode: Optional[str] = None,
197                              **params: Any) -> SearchResults:
198         """ Find an address using structured search.
199         """
200         async with self.begin() as conn:
201             details = ntyp.SearchDetails.from_kwargs(params)
202
203             phrases: List[Phrase] = []
204
205             if amenity:
206                 phrases.append(Phrase(PhraseType.AMENITY, amenity))
207             if street:
208                 phrases.append(Phrase(PhraseType.STREET, street))
209             if city:
210                 phrases.append(Phrase(PhraseType.CITY, city))
211             if county:
212                 phrases.append(Phrase(PhraseType.COUNTY, county))
213             if state:
214                 phrases.append(Phrase(PhraseType.STATE, state))
215             if postalcode:
216                 phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
217             if country:
218                 phrases.append(Phrase(PhraseType.COUNTRY, country))
219
220             if not phrases:
221                 raise UsageError('Nothing to search for.')
222
223             if amenity or street:
224                 details.restrict_min_max_rank(26, 30)
225             elif city:
226                 details.restrict_min_max_rank(13, 25)
227             elif county:
228                 details.restrict_min_max_rank(10, 12)
229             elif state:
230                 details.restrict_min_max_rank(5, 9)
231             elif postalcode:
232                 details.restrict_min_max_rank(5, 11)
233             else:
234                 details.restrict_min_max_rank(4, 4)
235
236             if 'layers' not in params:
237                 details.layers = ntyp.DataLayer.ADDRESS
238                 if amenity:
239                     details.layers |= ntyp.DataLayer.POI
240
241             geocoder = ForwardGeocoder(conn, details)
242             return await geocoder.lookup(phrases)
243
244
245     async def search_category(self, categories: List[Tuple[str, str]],
246                               near_query: Optional[str] = None,
247                               **params: Any) -> SearchResults:
248         """ Find an object of a certain category near another place.
249             The near place may either be given as an unstructured search
250             query in itself or as coordinates.
251         """
252         if not categories:
253             return SearchResults()
254
255         details = ntyp.SearchDetails.from_kwargs(params)
256         async with self.begin() as conn:
257             if near_query:
258                 phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
259             else:
260                 phrases = []
261                 if details.keywords:
262                     await make_query_analyzer(conn)
263
264             geocoder = ForwardGeocoder(conn, details)
265             return await geocoder.lookup_pois(categories, phrases)
266
267
268
269 class NominatimAPI:
270     """ API loader, synchronous version.
271     """
272
273     def __init__(self, project_dir: Path,
274                  environ: Optional[Mapping[str, str]] = None) -> None:
275         self._loop = asyncio.new_event_loop()
276         self._async_api = NominatimAPIAsync(project_dir, environ)
277
278
279     def close(self) -> None:
280         """ Close all active connections to the database. The NominatimAPIAsync
281             object remains usable after closing. If a new API functions is
282             called, new connections are created.
283         """
284         self._loop.run_until_complete(self._async_api.close())
285         self._loop.close()
286
287
288     @property
289     def config(self) -> Configuration:
290         """ Return the configuration used by the API.
291         """
292         return self._async_api.config
293
294     def status(self) -> StatusResult:
295         """ Return the status of the database.
296         """
297         return self._loop.run_until_complete(self._async_api.status())
298
299
300     def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
301         """ Get detailed information about a place in the database.
302         """
303         return self._loop.run_until_complete(self._async_api.details(place, **params))
304
305
306     def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
307         """ Get simple information about a list of places.
308
309             Returns a list of place information for all IDs that were found.
310         """
311         return self._loop.run_until_complete(self._async_api.lookup(places, **params))
312
313
314     def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
315         """ Find a place by its coordinates. Also known as reverse geocoding.
316
317             Returns the closest result that can be found or None if
318             no place matches the given criteria.
319         """
320         return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
321
322
323     def search(self, query: str, **params: Any) -> SearchResults:
324         """ Find a place by free-text search. Also known as forward geocoding.
325         """
326         return self._loop.run_until_complete(
327                    self._async_api.search(query, **params))
328
329
330     # pylint: disable=too-many-arguments
331     def search_address(self, amenity: Optional[str] = None,
332                        street: Optional[str] = None,
333                        city: Optional[str] = None,
334                        county: Optional[str] = None,
335                        state: Optional[str] = None,
336                        country: Optional[str] = None,
337                        postalcode: Optional[str] = None,
338                        **params: Any) -> SearchResults:
339         """ Find an address using structured search.
340         """
341         return self._loop.run_until_complete(
342                    self._async_api.search_address(amenity, street, city, county,
343                                                   state, country, postalcode, **params))
344
345
346     def search_category(self, categories: List[Tuple[str, str]],
347                         near_query: Optional[str] = None,
348                         **params: Any) -> SearchResults:
349         """ Find an object of a certain category near another place.
350             The near place may either be given as an unstructured search
351             query in itself or as a geographic area through the
352             viewbox or near parameters.
353         """
354         return self._loop.run_until_complete(
355                    self._async_api.search_category(categories, near_query, **params))