]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
2a19d97442cc46cf6870b213117cecc7ff65988d
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17
18 from nominatim.errors import UsageError
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
21 from nominatim.config import Configuration
22 from nominatim.api.connection import SearchConnection
23 from nominatim.api.status import get_status, StatusResult
24 from nominatim.api.lookup import get_detailed_place, get_simple_place
25 from nominatim.api.reverse import ReverseGeocoder
26 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
27 import nominatim.api.types as ntyp
28 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
29
30
31 class NominatimAPIAsync:
32     """ API loader asynchornous version.
33     """
34     def __init__(self, project_dir: Path,
35                  environ: Optional[Mapping[str, str]] = None) -> None:
36         self.config = Configuration(project_dir, environ)
37         self.server_version = 0
38
39         self._engine_lock = asyncio.Lock()
40         self._engine: Optional[sa_asyncio.AsyncEngine] = None
41         self._tables: Optional[SearchTables] = None
42         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
43
44
45     async def setup_database(self) -> None:
46         """ Set up the engine and connection parameters.
47
48             This function will be implicitly called when the database is
49             accessed for the first time. You may also call it explicitly to
50             avoid that the first call is delayed by the setup.
51         """
52         async with self._engine_lock:
53             if self._engine:
54                 return
55
56             dsn = self.config.get_database_params()
57
58             query = {k: v for k, v in dsn.items()
59                       if k not in ('user', 'password', 'dbname', 'host', 'port')}
60
61             dburl = sa.engine.URL.create(
62                        f'postgresql+{PGCORE_LIB}',
63                        database=dsn.get('dbname'),
64                        username=dsn.get('user'), password=dsn.get('password'),
65                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
66                        query=query)
67             engine = sa_asyncio.create_async_engine(dburl, future=True,
68                                                     echo=self.config.get_bool('DEBUG_SQL'))
69
70             try:
71                 async with engine.begin() as conn:
72                     result = await conn.scalar(sa.text('SHOW server_version_num'))
73                     server_version = int(result)
74             except (PGCORE_ERROR, sa.exc.OperationalError):
75                 server_version = 0
76
77             if server_version >= 110000:
78                 @sa.event.listens_for(engine.sync_engine, "connect")
79                 def _on_connect(dbapi_con: Any, _: Any) -> None:
80                     cursor = dbapi_con.cursor()
81                     cursor.execute("SET jit_above_cost TO '-1'")
82                     cursor.execute("SET max_parallel_workers_per_gather TO '0'")
83                 # Make sure that all connections get the new settings
84                 await self.close()
85
86             self._property_cache['DB:server_version'] = server_version
87
88             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
89             self._engine = engine
90
91
92     async def close(self) -> None:
93         """ Close all active connections to the database. The NominatimAPIAsync
94             object remains usable after closing. If a new API functions is
95             called, new connections are created.
96         """
97         if self._engine is not None:
98             await self._engine.dispose()
99
100
101     @contextlib.asynccontextmanager
102     async def begin(self) -> AsyncIterator[SearchConnection]:
103         """ Create a new connection with automatic transaction handling.
104
105             This function may be used to get low-level access to the database.
106             Refer to the documentation of SQLAlchemy for details how to use
107             the connection object.
108         """
109         if self._engine is None:
110             await self.setup_database()
111
112         assert self._engine is not None
113         assert self._tables is not None
114
115         async with self._engine.begin() as conn:
116             yield SearchConnection(conn, self._tables, self._property_cache)
117
118
119     async def status(self) -> StatusResult:
120         """ Return the status of the database.
121         """
122         try:
123             async with self.begin() as conn:
124                 status = await get_status(conn)
125         except (PGCORE_ERROR, sa.exc.OperationalError):
126             return StatusResult(700, 'Database connection failed')
127
128         return status
129
130
131     async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
132         """ Get detailed information about a place in the database.
133
134             Returns None if there is no entry under the given ID.
135         """
136         details = ntyp.LookupDetails.from_kwargs(params)
137         async with self.begin() as conn:
138             if details.keywords:
139                 await make_query_analyzer(conn)
140             return await get_detailed_place(conn, place, details)
141
142
143     async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
144         """ Get simple information about a list of places.
145
146             Returns a list of place information for all IDs that were found.
147         """
148         details = ntyp.LookupDetails.from_kwargs(params)
149         async with self.begin() as conn:
150             if details.keywords:
151                 await make_query_analyzer(conn)
152             return SearchResults(filter(None,
153                                         [await get_simple_place(conn, p, details) for p in places]))
154
155
156     async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
157         """ Find a place by its coordinates. Also known as reverse geocoding.
158
159             Returns the closest result that can be found or None if
160             no place matches the given criteria.
161         """
162         # The following negation handles NaN correctly. Don't change.
163         if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
164             # There are no results to be expected outside valid coordinates.
165             return None
166
167         details = ntyp.ReverseDetails.from_kwargs(params)
168         async with self.begin() as conn:
169             if details.keywords:
170                 await make_query_analyzer(conn)
171             geocoder = ReverseGeocoder(conn, details)
172             return await geocoder.lookup(coord)
173
174
175     async def search(self, query: str, **params: Any) -> SearchResults:
176         """ Find a place by free-text search. Also known as forward geocoding.
177         """
178         query = query.strip()
179         if not query:
180             raise UsageError('Nothing to search for.')
181
182         async with self.begin() as conn:
183             geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
184             phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
185             return await geocoder.lookup(phrases)
186
187
188     # pylint: disable=too-many-arguments,too-many-branches
189     async def search_address(self, amenity: Optional[str] = None,
190                              street: Optional[str] = None,
191                              city: Optional[str] = None,
192                              county: Optional[str] = None,
193                              state: Optional[str] = None,
194                              country: Optional[str] = None,
195                              postalcode: Optional[str] = None,
196                              **params: Any) -> SearchResults:
197         """ Find an address using structured search.
198         """
199         async with self.begin() as conn:
200             details = ntyp.SearchDetails.from_kwargs(params)
201
202             phrases: List[Phrase] = []
203
204             if amenity:
205                 phrases.append(Phrase(PhraseType.AMENITY, amenity))
206             if street:
207                 phrases.append(Phrase(PhraseType.STREET, street))
208             if city:
209                 phrases.append(Phrase(PhraseType.CITY, city))
210             if county:
211                 phrases.append(Phrase(PhraseType.COUNTY, county))
212             if state:
213                 phrases.append(Phrase(PhraseType.STATE, state))
214             if postalcode:
215                 phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
216             if country:
217                 phrases.append(Phrase(PhraseType.COUNTRY, country))
218
219             if not phrases:
220                 raise UsageError('Nothing to search for.')
221
222             if amenity or street:
223                 details.restrict_min_max_rank(26, 30)
224             elif city:
225                 details.restrict_min_max_rank(13, 25)
226             elif county:
227                 details.restrict_min_max_rank(10, 12)
228             elif state:
229                 details.restrict_min_max_rank(5, 9)
230             elif postalcode:
231                 details.restrict_min_max_rank(5, 11)
232             else:
233                 details.restrict_min_max_rank(4, 4)
234
235             if 'layers' not in params:
236                 details.layers = ntyp.DataLayer.ADDRESS
237                 if amenity:
238                     details.layers |= ntyp.DataLayer.POI
239
240             geocoder = ForwardGeocoder(conn, details)
241             return await geocoder.lookup(phrases)
242
243
244     async def search_category(self, categories: List[Tuple[str, str]],
245                               near_query: Optional[str] = None,
246                               **params: Any) -> SearchResults:
247         """ Find an object of a certain category near another place.
248             The near place may either be given as an unstructured search
249             query in itself or as coordinates.
250         """
251         if not categories:
252             return SearchResults()
253
254         details = ntyp.SearchDetails.from_kwargs(params)
255         async with self.begin() as conn:
256             if near_query:
257                 phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
258             else:
259                 phrases = []
260                 if details.keywords:
261                     await make_query_analyzer(conn)
262
263             geocoder = ForwardGeocoder(conn, details)
264             return await geocoder.lookup_pois(categories, phrases)
265
266
267
268 class NominatimAPI:
269     """ API loader, synchronous version.
270     """
271
272     def __init__(self, project_dir: Path,
273                  environ: Optional[Mapping[str, str]] = None) -> None:
274         self._loop = asyncio.new_event_loop()
275         self._async_api = NominatimAPIAsync(project_dir, environ)
276
277
278     def close(self) -> None:
279         """ Close all active connections to the database. The NominatimAPIAsync
280             object remains usable after closing. If a new API functions is
281             called, new connections are created.
282         """
283         self._loop.run_until_complete(self._async_api.close())
284         self._loop.close()
285
286
287     @property
288     def config(self) -> Configuration:
289         """ Return the configuration used by the API.
290         """
291         return self._async_api.config
292
293     def status(self) -> StatusResult:
294         """ Return the status of the database.
295         """
296         return self._loop.run_until_complete(self._async_api.status())
297
298
299     def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
300         """ Get detailed information about a place in the database.
301         """
302         return self._loop.run_until_complete(self._async_api.details(place, **params))
303
304
305     def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
306         """ Get simple information about a list of places.
307
308             Returns a list of place information for all IDs that were found.
309         """
310         return self._loop.run_until_complete(self._async_api.lookup(places, **params))
311
312
313     def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
314         """ Find a place by its coordinates. Also known as reverse geocoding.
315
316             Returns the closest result that can be found or None if
317             no place matches the given criteria.
318         """
319         return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
320
321
322     def search(self, query: str, **params: Any) -> SearchResults:
323         """ Find a place by free-text search. Also known as forward geocoding.
324         """
325         return self._loop.run_until_complete(
326                    self._async_api.search(query, **params))
327
328
329     # pylint: disable=too-many-arguments
330     def search_address(self, amenity: Optional[str] = None,
331                        street: Optional[str] = None,
332                        city: Optional[str] = None,
333                        county: Optional[str] = None,
334                        state: Optional[str] = None,
335                        country: Optional[str] = None,
336                        postalcode: Optional[str] = None,
337                        **params: Any) -> SearchResults:
338         """ Find an address using structured search.
339         """
340         return self._loop.run_until_complete(
341                    self._async_api.search_address(amenity, street, city, county,
342                                                   state, country, postalcode, **params))
343
344
345     def search_category(self, categories: List[Tuple[str, str]],
346                         near_query: Optional[str] = None,
347                         **params: Any) -> SearchResults:
348         """ Find an object of a certain category near another place.
349             The near place may either be given as an unstructured search
350             query in itself or as a geographic area through the
351             viewbox or near parameters.
352         """
353         return self._loop.run_until_complete(
354                    self._async_api.search_category(categories, near_query, **params))