# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of classes for API access via libraries.
"""
from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
import asyncio
import contextlib
import sys
from pathlib import Path

import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_asyncio

from nominatim.errors import UsageError
from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
from nominatim.api.lookup import get_detailed_place, get_simple_place
from nominatim.api.reverse import ReverseGeocoder
from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
import nominatim.api.types as ntyp
from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
# NOTE(review): this class was recovered from a listing that had lost its
# indentation and several lines. The following were restored from context and
# should be confirmed against the canonical source: the early-return guard,
# `query=query`, the `try:` header, the `server_version = 0` fallback and
# `self._engine = engine` in setup_database(); the `if details.keywords:`
# guards; `return None` in reverse(); `if not query:` in search(); the
# `if`/`elif` conditions in search_address(); and the `if not categories:` /
# `if near_query:` / `else:` branches in search_category().
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily on first access and
        guarded by a lock, so that concurrent first calls only set it
        up once.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """ Create the configuration for the given project directory.

            No database connection is opened here. `loop` is only handed
            to asyncio.Lock() on Python versions before 3.10.
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        if sys.version_info >= (3, 10):
            self._engine_lock = asyncio.Lock()
        else:
            self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.
        """
        async with self._engine_lock:
            # Another task may have finished the setup while we were
            # waiting for the lock.
            if self._engine is not None:
                return

            dsn = self.config.get_database_params()
            pool_size = self.config.get_int('API_POOL_SIZE')

            # Everything that is not a dedicated URL component is passed
            # through as a query parameter.
            query = {k: v for k, v in dsn.items()
                     if k not in ('user', 'password', 'dbname', 'host', 'port')}

            dburl = sa.engine.URL.create(
                f'postgresql+{PGCORE_LIB}',
                database=dsn.get('dbname'),
                username=dsn.get('user'), password=dsn.get('password'),
                host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True,
                                                    max_overflow=0, pool_size=pool_size,
                                                    echo=self.config.get_bool('DEBUG_SQL'))

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                # Database not reachable right now. Fall back to the
                # default of 0, which skips the tuning below.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                    cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                # Make sure that all connections get the new settings.
                # The connection used for the version check was opened
                # before the listener existed. self._engine is still unset
                # here, so the pool must be disposed through the local engine.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If another API function is
            called afterwards, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            A connection failure is reported as a StatusResult with
            code 700 instead of raising an exception.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:
                await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:
                await make_query_analyzer(conn)
            # Places that yield no result are dropped from the result list.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details) for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            if details.keywords:
                await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            The query is split at commas into phrases. Raises a UsageError
            when the query is empty after stripping whitespace.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Each non-empty address part becomes one typed phrase. Raises
            a UsageError when no address part is given at all.
        """
        async with self.begin() as conn:
            details = ntyp.SearchDetails.from_kwargs(params)

            phrases: List[Phrase] = []

            if amenity:
                phrases.append(Phrase(PhraseType.AMENITY, amenity))
            if street:
                phrases.append(Phrase(PhraseType.STREET, street))
            if city:
                phrases.append(Phrase(PhraseType.CITY, city))
            if county:
                phrases.append(Phrase(PhraseType.COUNTY, county))
            if state:
                phrases.append(Phrase(PhraseType.STATE, state))
            if postalcode:
                phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
            if country:
                phrases.append(Phrase(PhraseType.COUNTRY, country))

            if not phrases:
                raise UsageError('Nothing to search for.')

            # Restrict the result ranks according to the most specific
            # address part that was supplied.
            if amenity or street:
                details.restrict_min_max_rank(26, 30)
            elif city:
                details.restrict_min_max_rank(13, 25)
            elif county:
                details.restrict_min_max_rank(10, 12)
            elif state:
                details.restrict_min_max_rank(5, 9)
            elif postalcode:
                details.restrict_min_max_rank(5, 11)
            elif country:
                details.restrict_min_max_rank(4, 4)

            # Only choose a default layer set when the caller did not.
            if 'layers' not in params:
                details.layers = ntyp.DataLayer.ADDRESS
                if amenity:
                    details.layers |= ntyp.DataLayer.POI

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            Returns an empty result when no categories are given.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            phrases: List[Phrase]
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []
                if details.keywords:
                    await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup_pois(categories, phrases)
# NOTE(review): the class header line, the `@property` decorator on `config`
# and the final statement of close() were missing from the listing this code
# was recovered from. They were restored from context (the class name in
# particular is not visible in the listing) - please confirm.
class NominatimAPI:
    """ API loader, synchronous version.

        Thin blocking wrapper: every call is forwarded to a private
        NominatimAPIAsync object and run to completion on an event loop
        owned by this object.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create a private event loop and the wrapped asynchronous API.
        """
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)


    def close(self) -> None:
        """ Close all active connections to the database and shut down
            the private event loop.
        """
        self._loop.run_until_complete(self._async_api.close())
        self._loop.close()


    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config

    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.
        """
        return self._loop.run_until_complete(self._async_api.details(place, **params))


    def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        return self._loop.run_until_complete(self._async_api.lookup(places, **params))


    def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        return self._loop.run_until_complete(self._async_api.reverse(coord, **params))


    def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.
        """
        return self._loop.run_until_complete(
                   self._async_api.search(query, **params))


    # pylint: disable=too-many-arguments
    def search_address(self, amenity: Optional[str] = None,
                       street: Optional[str] = None,
                       city: Optional[str] = None,
                       county: Optional[str] = None,
                       state: Optional[str] = None,
                       country: Optional[str] = None,
                       postalcode: Optional[str] = None,
                       **params: Any) -> SearchResults:
        """ Find an address using structured search.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_address(amenity, street, city, county,
                                                  state, country, postalcode, **params))


    def search_category(self, categories: List[Tuple[str, str]],
                        near_query: Optional[str] = None,
                        **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as a geographic area through the
            viewbox or near parameters.
        """
        return self._loop.run_until_complete(
                   self._async_api.search_category(categories, near_query, **params))