]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
older Python versions need a reference to the loop for a lock
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
11 import asyncio
12 import sys
13 import contextlib
14 from pathlib import Path
15
16 import sqlalchemy as sa
17 import sqlalchemy.ext.asyncio as sa_asyncio
18
19 from nominatim.errors import UsageError
20 from nominatim.db.sqlalchemy_schema import SearchTables
21 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
22 from nominatim.config import Configuration
23 from nominatim.api.connection import SearchConnection
24 from nominatim.api.status import get_status, StatusResult
25 from nominatim.api.lookup import get_detailed_place, get_simple_place
26 from nominatim.api.reverse import ReverseGeocoder
27 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
28 import nominatim.api.types as ntyp
29 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
30
31
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily on first use, see
        setup_database(). Every query function opens its own connection
        through begin().
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """ Load the configuration for the project in `project_dir`.

            `environ` is handed on unchanged to the Configuration object.
            `loop` is only used with Python versions before 3.10, where it
            is passed to the lock that protects the engine setup.
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        # asyncio.Lock() no longer accepts a 'loop' argument from 3.10 on.
        if sys.version_info >= (3, 10):
            self._engine_lock = asyncio.Lock()
        else:
            self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.

            When the server version cannot be determined, it is recorded
            as 0 and the engine is installed nevertheless.
        """
        async with self._engine_lock:
            # Another task may have finished the setup while this one
            # was waiting for the lock.
            if self._engine:
                return

            dsn = self.config.get_database_params()
            pool_size = self.config.get_int('API_POOL_SIZE')

            # Parameters without a dedicated slot in the URL are handed on
            # as URL query parameters.
            query = {k: v for k, v in dsn.items()
                      if k not in ('user', 'password', 'dbname', 'host', 'port')}

            dburl = sa.engine.URL.create(
                       f'postgresql+{PGCORE_LIB}',
                       database=dsn.get('dbname'),
                       username=dsn.get('user'), password=dsn.get('password'),
                       host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                       query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True,
                                                    max_overflow=0, pool_size=pool_size,
                                                    echo=self.config.get_bool('DEBUG_SQL'))

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                # Version unknown: 0 skips the session settings below.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                    cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                # Make sure that all connections get the new settings.
                # The connection used for the version check above was opened
                # before the listener existed. It must be dropped on the
                # local engine: self._engine is still unset at this point,
                # so self.close() would do nothing.
                await engine.dispose()

            self.server_version = server_version
            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        # setup_database() assigns both attributes. The asserts narrow
        # the Optional types.
        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            A connection failure is not raised but reported as a
            StatusResult with code 700.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            # The query analyzer is only set up when keywords are requested.
            if details.keywords:
                await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            # The query analyzer is only set up when keywords are requested.
            if details.keywords:
                await make_query_analyzer(conn)
            # Places are looked up one after the other on the same
            # connection; falsy results are dropped by the filter.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details) for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            # The query analyzer is only set up when keywords are requested.
            if details.keywords:
                await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            The query is split at commas into phrases. Raises a UsageError
            when the query is empty or consists of whitespace only.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Raises a UsageError when none of the address parts is given.
            The check is done before a database connection is requested.
        """
        details = ntyp.SearchDetails.from_kwargs(params)

        # Empty or missing parts are skipped; the remaining phrases keep
        # the order of this table.
        phrases: List[Phrase] = [Phrase(ptype, value) for ptype, value in (
                                     (PhraseType.AMENITY, amenity),
                                     (PhraseType.STREET, street),
                                     (PhraseType.CITY, city),
                                     (PhraseType.COUNTY, county),
                                     (PhraseType.STATE, state),
                                     (PhraseType.POSTCODE, postalcode),
                                     (PhraseType.COUNTRY, country)) if value]

        if not phrases:
            raise UsageError('Nothing to search for.')

        # The first address part present in this chain decides about the
        # rank range of the results.
        if amenity or street:
            details.restrict_min_max_rank(26, 30)
        elif city:
            details.restrict_min_max_rank(13, 25)
        elif county:
            details.restrict_min_max_rank(10, 12)
        elif state:
            details.restrict_min_max_rank(5, 9)
        elif postalcode:
            details.restrict_min_max_rank(5, 11)
        else:
            details.restrict_min_max_rank(4, 4)

        # Only choose the layers when the caller has not done so.
        if 'layers' not in params:
            details.layers = ntyp.DataLayer.ADDRESS
            if amenity:
                details.layers |= ntyp.DataLayer.POI

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            Returns empty results without accessing the database when
            `categories` is empty.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []
                # NOTE(review): the analyzer is only set up explicitly when
                # there is no near query - presumably the geocoder creates
                # it itself when it has phrases to analyse. Confirm.
                if details.keywords:
                    await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup_pois(categories, phrases)
272
273
274
class NominatimAPI:
    """ API loader, synchronous version.

        Wrapper around NominatimAPIAsync which runs every call to
        completion on an event loop owned by this object.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create the private event loop and the asynchronous API object
            that does the actual work.
        """
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)


    def close(self) -> None:
        """ Close all active connections to the database.

            This function also closes the event loop of the object. The
            NominatimAPI object can no longer be used afterwards.
            Calling close() a second time has no effect.
        """
        if self._loop.is_closed():
            return

        try:
            self._loop.run_until_complete(self._async_api.close())
        finally:
            # Release the loop even when closing the connections failed.
            self._loop.close()


    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config

    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.
        """
        return self._loop.run_until_complete(self._async_api.details(place, **params))


    def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        return self._loop.run_until_complete(self._async_api.lookup(places, **params))


    def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        return self._loop.run_until_complete(self._async_api.reverse(coord, **params))


    def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.
        """
        return self._loop.run_until_complete(
                   self._async_api.search(query, **params))


    # pylint: disable=too-many-arguments
    def search_address(self, amenity: Optional[str] = None,
                       street: Optional[str] = None,
                       city: Optional[str] = None,
                       county: Optional[str] = None,
                       state: Optional[str] = None,
                       country: Optional[str] = None,
                       postalcode: Optional[str] = None,
                       **params: Any) -> SearchResults:
        """ Find an address using structured search.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_address(amenity, street, city, county,
                                                  state, country, postalcode, **params))


    def search_category(self, categories: List[Tuple[str, str]],
                        near_query: Optional[str] = None,
                        **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as a geographic area through the
            viewbox or near parameters.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_category(categories, near_query, **params))