1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
14 from pathlib import Path
16 import sqlalchemy as sa
17 import sqlalchemy.ext.asyncio as sa_asyncio
19 from nominatim.errors import UsageError
20 from nominatim.db.sqlalchemy_schema import SearchTables
21 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
22 from nominatim.config import Configuration
23 from nominatim.api.connection import SearchConnection
24 from nominatim.api.status import get_status, StatusResult
25 from nominatim.api.lookup import get_detailed_place, get_simple_place
26 from nominatim.api.reverse import ReverseGeocoder
27 from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
28 import nominatim.api.types as ntyp
29 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
32 class NominatimAPIAsync:
33 """ API loader asynchornous version.
35 def __init__(self, project_dir: Path,
36 environ: Optional[Mapping[str, str]] = None,
37 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
38 self.config = Configuration(project_dir, environ)
39 self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
40 if self.config.QUERY_TIMEOUT else None
41 self.server_version = 0
43 if sys.version_info >= (3, 10):
44 self._engine_lock = asyncio.Lock()
46 self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
47 self._engine: Optional[sa_asyncio.AsyncEngine] = None
48 self._tables: Optional[SearchTables] = None
49 self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
52 async def setup_database(self) -> None:
53 """ Set up the engine and connection parameters.
55 This function will be implicitly called when the database is
56 accessed for the first time. You may also call it explicitly to
57 avoid that the first call is delayed by the setup.
59 async with self._engine_lock:
63 dsn = self.config.get_database_params()
64 pool_size = self.config.get_int('API_POOL_SIZE')
66 query = {k: v for k, v in dsn.items()
67 if k not in ('user', 'password', 'dbname', 'host', 'port')}
69 dburl = sa.engine.URL.create(
70 f'postgresql+{PGCORE_LIB}',
71 database=dsn.get('dbname'),
72 username=dsn.get('user'), password=dsn.get('password'),
73 host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
75 engine = sa_asyncio.create_async_engine(dburl, future=True,
76 max_overflow=0, pool_size=pool_size,
77 echo=self.config.get_bool('DEBUG_SQL'))
80 async with engine.begin() as conn:
81 result = await conn.scalar(sa.text('SHOW server_version_num'))
82 server_version = int(result)
83 except (PGCORE_ERROR, sa.exc.OperationalError):
86 if server_version >= 110000:
87 @sa.event.listens_for(engine.sync_engine, "connect")
88 def _on_connect(dbapi_con: Any, _: Any) -> None:
89 cursor = dbapi_con.cursor()
90 cursor.execute("SET jit_above_cost TO '-1'")
91 cursor.execute("SET max_parallel_workers_per_gather TO '0'")
92 # Make sure that all connections get the new settings
95 self._property_cache['DB:server_version'] = server_version
97 self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
101 async def close(self) -> None:
102 """ Close all active connections to the database. The NominatimAPIAsync
103 object remains usable after closing. If a new API functions is
104 called, new connections are created.
106 if self._engine is not None:
107 await self._engine.dispose()
110 @contextlib.asynccontextmanager
111 async def begin(self) -> AsyncIterator[SearchConnection]:
112 """ Create a new connection with automatic transaction handling.
114 This function may be used to get low-level access to the database.
115 Refer to the documentation of SQLAlchemy for details how to use
116 the connection object.
118 if self._engine is None:
119 await self.setup_database()
121 assert self._engine is not None
122 assert self._tables is not None
124 async with self._engine.begin() as conn:
125 yield SearchConnection(conn, self._tables, self._property_cache)
128 async def status(self) -> StatusResult:
129 """ Return the status of the database.
132 async with self.begin() as conn:
133 conn.set_query_timeout(self.query_timeout)
134 status = await get_status(conn)
135 except (PGCORE_ERROR, sa.exc.OperationalError):
136 return StatusResult(700, 'Database connection failed')
141 async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
142 """ Get detailed information about a place in the database.
144 Returns None if there is no entry under the given ID.
146 details = ntyp.LookupDetails.from_kwargs(params)
147 async with self.begin() as conn:
148 conn.set_query_timeout(self.query_timeout)
150 await make_query_analyzer(conn)
151 return await get_detailed_place(conn, place, details)
154 async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
155 """ Get simple information about a list of places.
157 Returns a list of place information for all IDs that were found.
159 details = ntyp.LookupDetails.from_kwargs(params)
160 async with self.begin() as conn:
161 conn.set_query_timeout(self.query_timeout)
163 await make_query_analyzer(conn)
164 return SearchResults(filter(None,
165 [await get_simple_place(conn, p, details) for p in places]))
168 async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
169 """ Find a place by its coordinates. Also known as reverse geocoding.
171 Returns the closest result that can be found or None if
172 no place matches the given criteria.
174 # The following negation handles NaN correctly. Don't change.
175 if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
176 # There are no results to be expected outside valid coordinates.
179 details = ntyp.ReverseDetails.from_kwargs(params)
180 async with self.begin() as conn:
181 conn.set_query_timeout(self.query_timeout)
183 await make_query_analyzer(conn)
184 geocoder = ReverseGeocoder(conn, details)
185 return await geocoder.lookup(coord)
188 async def search(self, query: str, **params: Any) -> SearchResults:
189 """ Find a place by free-text search. Also known as forward geocoding.
191 query = query.strip()
193 raise UsageError('Nothing to search for.')
195 async with self.begin() as conn:
196 conn.set_query_timeout(self.query_timeout)
197 geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
198 phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
199 return await geocoder.lookup(phrases)
202 # pylint: disable=too-many-arguments,too-many-branches
203 async def search_address(self, amenity: Optional[str] = None,
204 street: Optional[str] = None,
205 city: Optional[str] = None,
206 county: Optional[str] = None,
207 state: Optional[str] = None,
208 country: Optional[str] = None,
209 postalcode: Optional[str] = None,
210 **params: Any) -> SearchResults:
211 """ Find an address using structured search.
213 async with self.begin() as conn:
214 conn.set_query_timeout(self.query_timeout)
215 details = ntyp.SearchDetails.from_kwargs(params)
217 phrases: List[Phrase] = []
220 phrases.append(Phrase(PhraseType.AMENITY, amenity))
222 phrases.append(Phrase(PhraseType.STREET, street))
224 phrases.append(Phrase(PhraseType.CITY, city))
226 phrases.append(Phrase(PhraseType.COUNTY, county))
228 phrases.append(Phrase(PhraseType.STATE, state))
230 phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
232 phrases.append(Phrase(PhraseType.COUNTRY, country))
235 raise UsageError('Nothing to search for.')
237 if amenity or street:
238 details.restrict_min_max_rank(26, 30)
240 details.restrict_min_max_rank(13, 25)
242 details.restrict_min_max_rank(10, 12)
244 details.restrict_min_max_rank(5, 9)
246 details.restrict_min_max_rank(5, 11)
248 details.restrict_min_max_rank(4, 4)
250 if 'layers' not in params:
251 details.layers = ntyp.DataLayer.ADDRESS
253 details.layers |= ntyp.DataLayer.POI
255 geocoder = ForwardGeocoder(conn, details)
256 return await geocoder.lookup(phrases)
259 async def search_category(self, categories: List[Tuple[str, str]],
260 near_query: Optional[str] = None,
261 **params: Any) -> SearchResults:
262 """ Find an object of a certain category near another place.
263 The near place may either be given as an unstructured search
264 query in itself or as coordinates.
267 return SearchResults()
269 details = ntyp.SearchDetails.from_kwargs(params)
270 async with self.begin() as conn:
271 conn.set_query_timeout(self.query_timeout)
273 phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
277 await make_query_analyzer(conn)
279 geocoder = ForwardGeocoder(conn, details)
280 return await geocoder.lookup_pois(categories, phrases)
285 """ API loader, synchronous version.
288 def __init__(self, project_dir: Path,
289 environ: Optional[Mapping[str, str]] = None) -> None:
290 self._loop = asyncio.new_event_loop()
291 self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
294 def close(self) -> None:
295 """ Close all active connections to the database. The NominatimAPIAsync
296 object remains usable after closing. If a new API functions is
297 called, new connections are created.
299 self._loop.run_until_complete(self._async_api.close())
304 def config(self) -> Configuration:
305 """ Return the configuration used by the API.
307 return self._async_api.config
309 def status(self) -> StatusResult:
310 """ Return the status of the database.
312 return self._loop.run_until_complete(self._async_api.status())
315 def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
316 """ Get detailed information about a place in the database.
318 return self._loop.run_until_complete(self._async_api.details(place, **params))
321 def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
322 """ Get simple information about a list of places.
324 Returns a list of place information for all IDs that were found.
326 return self._loop.run_until_complete(self._async_api.lookup(places, **params))
329 def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
330 """ Find a place by its coordinates. Also known as reverse geocoding.
332 Returns the closest result that can be found or None if
333 no place matches the given criteria.
335 return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
338 def search(self, query: str, **params: Any) -> SearchResults:
339 """ Find a place by free-text search. Also known as forward geocoding.
341 return self._loop.run_until_complete(
342 self._async_api.search(query, **params))
345 # pylint: disable=too-many-arguments
346 def search_address(self, amenity: Optional[str] = None,
347 street: Optional[str] = None,
348 city: Optional[str] = None,
349 county: Optional[str] = None,
350 state: Optional[str] = None,
351 country: Optional[str] = None,
352 postalcode: Optional[str] = None,
353 **params: Any) -> SearchResults:
354 """ Find an address using structured search.
356 return self._loop.run_until_complete(
357 self._async_api.search_address(amenity, street, city, county,
358 state, country, postalcode, **params))
361 def search_category(self, categories: List[Tuple[str, str]],
362 near_query: Optional[str] = None,
363 **params: Any) -> SearchResults:
364 """ Find an object of a certain category near another place.
365 The near place may either be given as an unstructured search
366 query in itself or as a geographic area through the
367 viewbox or near parameters.
369 return self._loop.run_until_complete(
370 self._async_api.search_category(categories, near_query, **params))