1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
13 from pathlib import Path
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
18 from nominatim.errors import UsageError
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
21 from nominatim.config import Configuration
22 from nominatim.api.connection import SearchConnection
23 from nominatim.api.status import get_status, StatusResult
24 from nominatim.api.lookup import get_detailed_place, get_simple_place
25 from nominatim.api.reverse import ReverseGeocoder
26 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
27 import nominatim.api.types as ntyp
28 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily the first time a connection
        is requested, see setup_database().
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Load the configuration for the project in `project_dir`.

            `environ` is handed on to Configuration unchanged. The
            database engine is not created here.
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        # Serialises the lazy engine creation in setup_database().
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.

            Calling it again while an engine exists does nothing.
        """
        async with self._engine_lock:
            # Re-check under the lock: another task may have completed the
            # setup while this one was waiting.
            if self._engine is not None:
                return

            dsn = self.config.get_database_params()

            # Parameters that are not an explicit part of the URL are
            # passed on as URL query parameters.
            query = {k: v for k, v in dsn.items()
                     if k not in ('user', 'password', 'dbname', 'host', 'port')}

            dburl = sa.engine.URL.create(
                       f'postgresql+{PGCORE_LIB}',
                       database=dsn.get('dbname'),
                       username=dsn.get('user'), password=dsn.get('password'),
                       host=dsn.get('host'),
                       port=int(dsn['port']) if 'port' in dsn else None,
                       query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True,
                                                    echo=self.config.get_bool('DEBUG_SQL'))

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                # Version could not be determined. Use the same default
                # the property cache starts out with.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                    cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                # Make sure that all connections get the new settings.
                # The connection used for the version check above was
                # opened before the listener was registered.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the database cannot be reached, a StatusResult with
            code 700 is returned instead of raising.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            # NOTE(review): the query analyzer is set up on every call
            # here - confirm whether this should be restricted to requests
            # that actually need it.
            await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            # NOTE(review): unconditional analyzer setup, see details().
            await make_query_analyzer(conn)
            # Places that yield no (falsy) result are dropped by the filter.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details) for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            # NOTE(review): unconditional analyzer setup, see details().
            await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            The query is split into phrases at commas. Raises a UsageError
            when the query is empty after stripping whitespace.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Each non-empty argument becomes one typed phrase of the query.
            Raises a UsageError when none of the address parts is given.
        """
        async with self.begin() as conn:
            details = ntyp.SearchDetails.from_kwargs(params)

            phrases: List[Phrase] = []

            if amenity:
                phrases.append(Phrase(PhraseType.AMENITY, amenity))
            if street:
                phrases.append(Phrase(PhraseType.STREET, street))
            if city:
                phrases.append(Phrase(PhraseType.CITY, city))
            if county:
                phrases.append(Phrase(PhraseType.COUNTY, county))
            if state:
                phrases.append(Phrase(PhraseType.STATE, state))
            if postalcode:
                phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
            if country:
                phrases.append(Phrase(PhraseType.COUNTRY, country))

            if not phrases:
                raise UsageError('Nothing to search for.')

            # Restrict the result ranks according to the most specific
            # address part that was given.
            if amenity or street:
                details.restrict_min_max_rank(26, 30)
            elif city:
                details.restrict_min_max_rank(13, 25)
            elif county:
                details.restrict_min_max_rank(10, 12)
            elif state:
                details.restrict_min_max_rank(5, 9)
            elif postalcode:
                details.restrict_min_max_rank(5, 11)
            elif country:
                details.restrict_min_max_rank(4, 4)

            if 'layers' not in params:
                details.layers = ntyp.DataLayer.ADDRESS
                # NOTE(review): the POI layer is assumed to be tied to an
                # amenity term - confirm the intended condition.
                if amenity:
                    details.layers |= ntyp.DataLayer.POI

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            Returns empty results when no categories are given.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []

            # NOTE(review): unconditional analyzer setup, see details().
            await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup_pois(categories, phrases)
269 """ API loader, synchronous version.
def __init__(self, project_dir: Path,
             environ: Optional[Mapping[str, str]] = None) -> None:
    """ Create the blocking front end for the project in `project_dir`.

        A private event loop is created to drive a NominatimAPIAsync
        instance, which receives `project_dir` and `environ` unchanged.
    """
    self._loop = asyncio.new_event_loop()
    self._async_api = NominatimAPIAsync(project_dir, environ)
def close(self) -> None:
    """ Close all active connections to the database. The API object
        remains usable after closing: when another API function is
        called, new connections are created.
    """
    shutdown = self._async_api.close()
    self._loop.run_until_complete(shutdown)
def config(self) -> Configuration:
    """ Return the configuration used by the API.

        This is the configuration object held by the wrapped
        asynchronous API.
    """
    backend = self._async_api
    return backend.config
def status(self) -> StatusResult:
    """ Return the status of the database.

        Runs the asynchronous status check on the private event loop
        and waits for its result.
    """
    pending = self._async_api.status()
    return self._loop.run_until_complete(pending)
def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
    """ Get detailed information about a place in the database.

        `place` and all keyword parameters are forwarded unchanged to
        the asynchronous implementation, whose result is returned.
    """
    pending = self._async_api.details(place, **params)
    return self._loop.run_until_complete(pending)
def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
    """ Get simple information about a list of places.

        Returns a list of place information for all IDs that were found.
        The work is done by the asynchronous implementation, which is
        run to completion on the private event loop.
    """
    pending = self._async_api.lookup(places, **params)
    return self._loop.run_until_complete(pending)
def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
    """ Find a place by its coordinates. Also known as reverse geocoding.

        Hands back the result of the asynchronous implementation: the
        closest result that can be found or None if no place matches
        the given criteria.
    """
    pending = self._async_api.reverse(coord, **params)
    return self._loop.run_until_complete(pending)
def search(self, query: str, **params: Any) -> SearchResults:
    """ Find a place by free-text search. Also known as forward geocoding.

        `query` and all keyword parameters are forwarded unchanged to
        the asynchronous implementation.
    """
    pending = self._async_api.search(query, **params)
    return self._loop.run_until_complete(pending)
329 # pylint: disable=too-many-arguments
def search_address(self, amenity: Optional[str] = None,
                   street: Optional[str] = None,
                   city: Optional[str] = None,
                   county: Optional[str] = None,
                   state: Optional[str] = None,
                   country: Optional[str] = None,
                   postalcode: Optional[str] = None,
                   **params: Any) -> SearchResults:
    """ Find an address using structured search.

        The address parts are handed to the asynchronous implementation
        by position, in the order of this signature, followed by the
        remaining keyword parameters.
    """
    address_parts = (amenity, street, city, county,
                     state, country, postalcode)
    pending = self._async_api.search_address(*address_parts, **params)
    return self._loop.run_until_complete(pending)
def search_category(self, categories: List[Tuple[str, str]],
                    near_query: Optional[str] = None,
                    **params: Any) -> SearchResults:
    """ Find an object of a certain category near another place.
        The near place may either be given as an unstructured search
        query in itself or as a geographic area through the
        viewbox or near parameters.

        All arguments are forwarded unchanged to the asynchronous
        implementation.
    """
    pending = self._async_api.search_category(categories, near_query, **params)
    return self._loop.run_until_complete(pending)