# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of classes for API access via libraries.
"""
from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
import asyncio
import contextlib
from pathlib import Path

import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_asyncio

from nominatim.errors import UsageError
from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
from nominatim.api.lookup import get_detailed_place, get_simple_place
from nominatim.api.reverse import ReverseGeocoder
from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
import nominatim.api.types as ntyp
from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily: it is set up the first time
        a connection is requested through begin() or explicitly through
        setup_database().
    """
    # NOTE(review): this class was restored from a listing that had lost its
    # indentation and a number of lines. Conditions tagged "restored" below
    # were rebuilt from context -- confirm them against the upstream source.

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create the configuration from the project directory and the
            optional environment mapping. No database connection is made.
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        # Serialises the one-time engine creation in setup_database().
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.
        """
        async with self._engine_lock:
            if self._engine is not None:
                # Somebody else completed the setup while we were waiting
                # for the lock.
                return

            dsn = self.config.get_database_params()

            # Everything that is not a dedicated URL component is handed on
            # as a query parameter of the connection URL.
            query = {k: v for k, v in dsn.items()
                     if k not in ('user', 'password', 'dbname', 'host', 'port')}
            if PGCORE_LIB == 'asyncpg':
                query['prepared_statement_cache_size'] = '0'

            dburl = sa.engine.URL.create(
                f'postgresql+{PGCORE_LIB}',
                database=dsn.get('dbname'),
                username=dsn.get('user'), password=dsn.get('password'),
                host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True,
                                                    echo=self.config.get_bool('DEBUG_SQL'))

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                # Database not reachable: fall back to the same default
                # the property cache is initialised with.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                    cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                # Make sure that all connections get the new settings:
                # the connection used for the version check above was opened
                # before the listener existed, so drop the pool. self._engine
                # is still unset here, hence dispose the local engine.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the database cannot be reached, a StatusResult with
            code 700 is returned instead of raising.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:  # restored -- NOTE(review): confirm condition
                await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:  # restored -- NOTE(review): confirm condition
                await make_query_analyzer(conn)
            # Places that could not be found yield a falsy entry and are
            # filtered out.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details) for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:  # restored -- NOTE(review): confirm condition
                await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            The query is split at commas into phrases. Raises a UsageError
            when the query is empty or consists of whitespace only.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Each non-empty address part becomes a phrase of the matching
            type. Raises a UsageError when all address parts are empty.
        """
        async with self.begin() as conn:
            details = ntyp.SearchDetails.from_kwargs(params)

            phrases: List[Phrase] = []

            if amenity:
                phrases.append(Phrase(PhraseType.AMENITY, amenity))
            if street:
                phrases.append(Phrase(PhraseType.STREET, street))
            if city:
                phrases.append(Phrase(PhraseType.CITY, city))
            if county:
                phrases.append(Phrase(PhraseType.COUNTY, county))
            if state:
                phrases.append(Phrase(PhraseType.STATE, state))
            if postalcode:
                phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
            if country:
                phrases.append(Phrase(PhraseType.COUNTRY, country))

            if not phrases:
                raise UsageError('Nothing to search for.')

            # Restrict the result ranks according to the most specific
            # address part given.
            # NOTE(review): the elif conditions are restored -- confirm.
            if amenity or street:
                details.restrict_min_max_rank(26, 30)
            elif city:
                details.restrict_min_max_rank(13, 25)
            elif county:
                details.restrict_min_max_rank(10, 12)
            elif state:
                details.restrict_min_max_rank(5, 9)
            elif postalcode:
                details.restrict_min_max_rank(5, 11)
            elif country:
                details.restrict_min_max_rank(4, 4)

            if 'layers' not in params:
                details.layers = ntyp.DataLayer.ADDRESS
                if amenity:  # restored -- NOTE(review): confirm condition
                    details.layers |= ntyp.DataLayer.POI

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            Returns empty results when no categories are given.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []
                if details.keywords:  # restored -- NOTE(review): confirm condition
                    await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup_pois(categories, phrases)
class NominatimAPI:
    """ API loader, synchronous version.

        Thin blocking facade: every call is forwarded to a
        NominatimAPIAsync object and run to completion on a private
        event loop.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create a private event loop and the wrapped asynchronous API
            for the given project directory and optional environment.
        """
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ)


    def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPI
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        self._loop.run_until_complete(self._async_api.close())


    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config

    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.
        """
        return self._loop.run_until_complete(self._async_api.details(place, **params))


    def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        return self._loop.run_until_complete(self._async_api.lookup(places, **params))


    def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        return self._loop.run_until_complete(self._async_api.reverse(coord, **params))


    def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.
        """
        return self._loop.run_until_complete(
                   self._async_api.search(query, **params))


    # pylint: disable=too-many-arguments
    def search_address(self, amenity: Optional[str] = None,
                       street: Optional[str] = None,
                       city: Optional[str] = None,
                       county: Optional[str] = None,
                       state: Optional[str] = None,
                       country: Optional[str] = None,
                       postalcode: Optional[str] = None,
                       **params: Any) -> SearchResults:
        """ Find an address using structured search.
        """
        # The address parts are forwarded positionally, in the order of
        # this signature.
        return self._loop.run_until_complete(
                   self._async_api.search_address(amenity, street, city, county,
                                                  state, country, postalcode, **params))


    def search_category(self, categories: List[Tuple[str, str]],
                        near_query: Optional[str] = None,
                        **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as a geographic area through the
            viewbox or near parameters.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_category(categories, near_query, **params))