1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
14 from pathlib import Path
16 import sqlalchemy as sa
17 import sqlalchemy.ext.asyncio as sa_asyncio
19 from nominatim.errors import UsageError
20 from nominatim.db.sqlalchemy_schema import SearchTables
21 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
22 from nominatim.config import Configuration
23 from nominatim.api.connection import SearchConnection
24 from nominatim.api.status import get_status, StatusResult
25 from nominatim.api.lookup import get_detailed_place, get_simple_place
26 from nominatim.api.reverse import ReverseGeocoder
27 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
28 import nominatim.api.types as ntyp
29 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
32 class NominatimAPIAsync:
33 """ API loader asynchornous version.
35 def __init__(self, project_dir: Path,
36 environ: Optional[Mapping[str, str]] = None,
37 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
38 self.config = Configuration(project_dir, environ)
39 self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
40 if self.config.QUERY_TIMEOUT else None
41 self.server_version = 0
43 if sys.version_info >= (3, 10):
44 self._engine_lock = asyncio.Lock()
46 self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
47 self._engine: Optional[sa_asyncio.AsyncEngine] = None
48 self._tables: Optional[SearchTables] = None
49 self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
52 async def setup_database(self) -> None:
53 """ Set up the engine and connection parameters.
55 This function will be implicitly called when the database is
56 accessed for the first time. You may also call it explicitly to
57 avoid that the first call is delayed by the setup.
59 async with self._engine_lock:
63 dsn = self.config.get_database_params()
64 pool_size = self.config.get_int('API_POOL_SIZE')
66 query = {k: v for k, v in dsn.items()
67 if k not in ('user', 'password', 'dbname', 'host', 'port')}
69 dburl = sa.engine.URL.create(
70 f'postgresql+{PGCORE_LIB}',
71 database=dsn.get('dbname'),
72 username=dsn.get('user'), password=dsn.get('password'),
73 host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
75 engine = sa_asyncio.create_async_engine(dburl, future=True,
76 max_overflow=0, pool_size=pool_size,
77 echo=self.config.get_bool('DEBUG_SQL'))
80 async with engine.begin() as conn:
81 result = await conn.scalar(sa.text('SHOW server_version_num'))
82 server_version = int(result)
83 except (PGCORE_ERROR, sa.exc.OperationalError):
86 if server_version >= 110000:
87 @sa.event.listens_for(engine.sync_engine, "connect")
88 def _on_connect(dbapi_con: Any, _: Any) -> None:
89 cursor = dbapi_con.cursor()
90 cursor.execute("SET jit_above_cost TO '-1'")
91 cursor.execute("SET max_parallel_workers_per_gather TO '0'")
92 # Make sure that all connections get the new settings
95 self._property_cache['DB:server_version'] = server_version
97 self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
101 async def close(self) -> None:
102 """ Close all active connections to the database. The NominatimAPIAsync
103 object remains usable after closing. If a new API functions is
104 called, new connections are created.
106 if self._engine is not None:
107 await self._engine.dispose()
110 @contextlib.asynccontextmanager
111 async def begin(self) -> AsyncIterator[SearchConnection]:
112 """ Create a new connection with automatic transaction handling.
114 This function may be used to get low-level access to the database.
115 Refer to the documentation of SQLAlchemy for details how to use
116 the connection object.
118 if self._engine is None:
119 await self.setup_database()
121 assert self._engine is not None
122 assert self._tables is not None
124 async with self._engine.begin() as conn:
125 yield SearchConnection(conn, self._tables, self._property_cache)
128 async def status(self) -> StatusResult:
129 """ Return the status of the database.
132 async with self.begin() as conn:
133 conn.set_query_timeout(self.query_timeout)
134 status = await get_status(conn)
135 except (PGCORE_ERROR, sa.exc.OperationalError):
136 return StatusResult(700, 'Database connection failed')
141 async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
142 """ Get detailed information about a place in the database.
144 Returns None if there is no entry under the given ID.
146 details = ntyp.LookupDetails.from_kwargs(params)
147 async with self.begin() as conn:
148 conn.set_query_timeout(self.query_timeout)
150 await make_query_analyzer(conn)
151 return await get_detailed_place(conn, place, details)
154 async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
155 """ Get simple information about a list of places.
157 Returns a list of place information for all IDs that were found.
159 details = ntyp.LookupDetails.from_kwargs(params)
160 async with self.begin() as conn:
161 conn.set_query_timeout(self.query_timeout)
163 await make_query_analyzer(conn)
164 return SearchResults(filter(None,
165 [await get_simple_place(conn, p, details) for p in places]))
168 async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
169 """ Find a place by its coordinates. Also known as reverse geocoding.
171 Returns the closest result that can be found or None if
172 no place matches the given criteria.
174 # The following negation handles NaN correctly. Don't change.
175 if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
176 # There are no results to be expected outside valid coordinates.
179 details = ntyp.ReverseDetails.from_kwargs(params)
180 async with self.begin() as conn:
181 conn.set_query_timeout(self.query_timeout)
183 await make_query_analyzer(conn)
184 geocoder = ReverseGeocoder(conn, details)
185 return await geocoder.lookup(coord)
188 async def search(self, query: str, **params: Any) -> SearchResults:
189 """ Find a place by free-text search. Also known as forward geocoding.
191 query = query.strip()
193 raise UsageError('Nothing to search for.')
195 async with self.begin() as conn:
196 conn.set_query_timeout(self.query_timeout)
197 geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
198 self.config.get_int('REQUEST_TIMEOUT') \
199 if self.config.REQUEST_TIMEOUT else None)
200 phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
201 return await geocoder.lookup(phrases)
204 # pylint: disable=too-many-arguments,too-many-branches
205 async def search_address(self, amenity: Optional[str] = None,
206 street: Optional[str] = None,
207 city: Optional[str] = None,
208 county: Optional[str] = None,
209 state: Optional[str] = None,
210 country: Optional[str] = None,
211 postalcode: Optional[str] = None,
212 **params: Any) -> SearchResults:
213 """ Find an address using structured search.
215 async with self.begin() as conn:
216 conn.set_query_timeout(self.query_timeout)
217 details = ntyp.SearchDetails.from_kwargs(params)
219 phrases: List[Phrase] = []
222 phrases.append(Phrase(PhraseType.AMENITY, amenity))
224 phrases.append(Phrase(PhraseType.STREET, street))
226 phrases.append(Phrase(PhraseType.CITY, city))
228 phrases.append(Phrase(PhraseType.COUNTY, county))
230 phrases.append(Phrase(PhraseType.STATE, state))
232 phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
234 phrases.append(Phrase(PhraseType.COUNTRY, country))
237 raise UsageError('Nothing to search for.')
239 if amenity or street:
240 details.restrict_min_max_rank(26, 30)
242 details.restrict_min_max_rank(13, 25)
244 details.restrict_min_max_rank(10, 12)
246 details.restrict_min_max_rank(5, 9)
248 details.restrict_min_max_rank(5, 11)
250 details.restrict_min_max_rank(4, 4)
252 if 'layers' not in params:
253 details.layers = ntyp.DataLayer.ADDRESS
255 details.layers |= ntyp.DataLayer.POI
257 geocoder = ForwardGeocoder(conn, details,
258 self.config.get_int('REQUEST_TIMEOUT') \
259 if self.config.REQUEST_TIMEOUT else None)
260 return await geocoder.lookup(phrases)
263 async def search_category(self, categories: List[Tuple[str, str]],
264 near_query: Optional[str] = None,
265 **params: Any) -> SearchResults:
266 """ Find an object of a certain category near another place.
267 The near place may either be given as an unstructured search
268 query in itself or as coordinates.
271 return SearchResults()
273 details = ntyp.SearchDetails.from_kwargs(params)
274 async with self.begin() as conn:
275 conn.set_query_timeout(self.query_timeout)
277 phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
281 await make_query_analyzer(conn)
283 geocoder = ForwardGeocoder(conn, details,
284 self.config.get_int('REQUEST_TIMEOUT') \
285 if self.config.REQUEST_TIMEOUT else None)
286 return await geocoder.lookup_pois(categories, phrases)
291 """ API loader, synchronous version.
294 def __init__(self, project_dir: Path,
295 environ: Optional[Mapping[str, str]] = None) -> None:
296 self._loop = asyncio.new_event_loop()
297 self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
300 def close(self) -> None:
301 """ Close all active connections to the database. The NominatimAPIAsync
302 object remains usable after closing. If a new API functions is
303 called, new connections are created.
305 self._loop.run_until_complete(self._async_api.close())
310 def config(self) -> Configuration:
311 """ Return the configuration used by the API.
313 return self._async_api.config
315 def status(self) -> StatusResult:
316 """ Return the status of the database.
318 return self._loop.run_until_complete(self._async_api.status())
321 def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
322 """ Get detailed information about a place in the database.
324 return self._loop.run_until_complete(self._async_api.details(place, **params))
327 def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
328 """ Get simple information about a list of places.
330 Returns a list of place information for all IDs that were found.
332 return self._loop.run_until_complete(self._async_api.lookup(places, **params))
335 def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
336 """ Find a place by its coordinates. Also known as reverse geocoding.
338 Returns the closest result that can be found or None if
339 no place matches the given criteria.
341 return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
344 def search(self, query: str, **params: Any) -> SearchResults:
345 """ Find a place by free-text search. Also known as forward geocoding.
347 return self._loop.run_until_complete(
348 self._async_api.search(query, **params))
351 # pylint: disable=too-many-arguments
352 def search_address(self, amenity: Optional[str] = None,
353 street: Optional[str] = None,
354 city: Optional[str] = None,
355 county: Optional[str] = None,
356 state: Optional[str] = None,
357 country: Optional[str] = None,
358 postalcode: Optional[str] = None,
359 **params: Any) -> SearchResults:
360 """ Find an address using structured search.
362 return self._loop.run_until_complete(
363 self._async_api.search_address(amenity, street, city, county,
364 state, country, postalcode, **params))
367 def search_category(self, categories: List[Tuple[str, str]],
368 near_query: Optional[str] = None,
369 **params: Any) -> SearchResults:
370 """ Find an object of a certain category near another place.
371 The near place may either be given as an unstructured search
372 query in itself or as a geographic area through the
373 viewbox or near parameters.
375 return self._loop.run_until_complete(
376 self._async_api.search_category(categories, near_query, **params))