1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Implementation of classes for API access via libraries.
"""
from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
import asyncio
import contextlib
from pathlib import Path

import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_asyncio

from nominatim.errors import UsageError
from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
from nominatim.api.lookup import get_detailed_place, get_simple_place
from nominatim.api.reverse import ReverseGeocoder
from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
import nominatim.api.types as ntyp
from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        Holds a lazily created SQLAlchemy async engine and offers the
        status, lookup, reverse and search functions as coroutines.
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Load the configuration from `project_dir` and `environ`.

            The database engine is not created here but on first use
            or through an explicit call to setup_database().
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        # Serialises engine creation between concurrent first callers.
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup. Calling it
            again after the engine exists is a no-op.
        """
        async with self._engine_lock:
            if self._engine is not None:
                # Another caller completed the setup while we waited for
                # the lock. Do not create (and leak) a second engine.
                return

            dsn = self.config.get_database_params()

            # Everything that is not one of the basic connection parameters
            # is handed over as a query argument of the connection URL.
            query = {k: v for k, v in dsn.items()
                     if k not in ('user', 'password', 'dbname', 'host', 'port')}
            if PGCORE_LIB == 'asyncpg':
                query['prepared_statement_cache_size'] = '0'

            dburl = sa.engine.URL.create(
                f'postgresql+{PGCORE_LIB}',
                database=dsn.get('dbname'),
                username=dsn.get('user'), password=dsn.get('password'),
                host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                query=query)
            engine = sa_asyncio.create_async_engine(dburl, future=True)

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except (PGCORE_ERROR, sa.exc.OperationalError):
                # The version probe is best effort. Fall back to the same
                # 'unknown' value the property cache starts with.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                    cursor.execute("SET max_parallel_workers_per_gather TO '0'")
                # Make sure that all connections get the new settings:
                # the connection used for the version probe was opened
                # before the listener was registered, so drop the pool.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the database cannot be reached, a StatusResult with
            code 700 is returned instead of raising the error.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except (PGCORE_ERROR, sa.exc.OperationalError):
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            await make_query_analyzer(conn)
            return await get_detailed_place(conn, place, details)


    async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        details = ntyp.LookupDetails.from_kwargs(params)
        async with self.begin() as conn:
            await make_query_analyzer(conn)
            # Places that cannot be found yield None and are filtered out.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details)
                                         for p in places]))


    async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        details = ntyp.ReverseDetails.from_kwargs(params)
        async with self.begin() as conn:
            await make_query_analyzer(conn)
            geocoder = ReverseGeocoder(conn, details)
            return await geocoder.lookup(coord)


    async def search(self, query: str, **params: Any) -> SearchResults:
        """ Find a place by free-text search. Also known as forward geocoding.

            Raises a UsageError when the query is empty or consists
            only of whitespace.
        """
        query = query.strip()
        if not query:
            raise UsageError('Nothing to search for.')

        async with self.begin() as conn:
            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
            # Each comma-separated part of the query becomes its own phrase.
            phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
            return await geocoder.lookup(phrases)


    # pylint: disable=too-many-arguments,too-many-branches
    async def search_address(self, amenity: Optional[str] = None,
                             street: Optional[str] = None,
                             city: Optional[str] = None,
                             county: Optional[str] = None,
                             state: Optional[str] = None,
                             country: Optional[str] = None,
                             postalcode: Optional[str] = None,
                             **params: Any) -> SearchResults:
        """ Find an address using structured search.

            Raises a UsageError when none of the address parts is given.
        """
        async with self.begin() as conn:
            details = ntyp.SearchDetails.from_kwargs(params)

            phrases: List[Phrase] = []

            # Only address parts that were actually supplied take part
            # in the search.
            if amenity:
                phrases.append(Phrase(PhraseType.AMENITY, amenity))
            if street:
                phrases.append(Phrase(PhraseType.STREET, street))
            if city:
                phrases.append(Phrase(PhraseType.CITY, city))
            if county:
                phrases.append(Phrase(PhraseType.COUNTY, county))
            if state:
                phrases.append(Phrase(PhraseType.STATE, state))
            if postalcode:
                phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
            if country:
                phrases.append(Phrase(PhraseType.COUNTRY, country))

            if not phrases:
                raise UsageError('Nothing to search for.')

            # The first address part given, in the order checked below,
            # determines the rank range results are restricted to.
            if amenity or street:
                details.restrict_min_max_rank(26, 30)
            elif city:
                details.restrict_min_max_rank(13, 25)
            elif county:
                details.restrict_min_max_rank(10, 12)
            elif state:
                details.restrict_min_max_rank(5, 9)
            elif postalcode:
                details.restrict_min_max_rank(5, 11)
            else:
                details.restrict_min_max_rank(4, 4)

            if 'layers' not in params:
                details.layers = ntyp.DataLayer.ADDRESS
                if amenity:
                    details.layers |= ntyp.DataLayer.POI

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup(phrases)


    async def search_category(self, categories: List[Tuple[str, str]],
                              near_query: Optional[str] = None,
                              **params: Any) -> SearchResults:
        """ Find an object of a certain category near another place.
            The near place may either be given as an unstructured search
            query in itself or as coordinates.

            Returns an empty result when no categories are given.
        """
        if not categories:
            return SearchResults()

        details = ntyp.SearchDetails.from_kwargs(params)
        async with self.begin() as conn:
            if near_query:
                phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
            else:
                phrases = []

            await make_query_analyzer(conn)

            geocoder = ForwardGeocoder(conn, details)
            return await geocoder.lookup_pois(categories, phrases)
270 """ API loader, synchronous version.
def __init__(self, project_dir: Path,
             environ: Optional[Mapping[str, str]] = None) -> None:
    """ Create a private event loop and the asynchronous API object
        that all functions of this class delegate to.
    """
    loop = asyncio.new_event_loop()
    self._loop = loop
    self._async_api = NominatimAPIAsync(project_dir, environ)
def close(self) -> None:
    """ Close all active connections to the database.

        Runs the close() function of the asynchronous API to
        completion on the event loop owned by this object.
    """
    shutdown = self._async_api.close()
    self._loop.run_until_complete(shutdown)
def config(self) -> Configuration:
    """ Return the configuration used by the API.

        This is the configuration object held by the wrapped
        asynchronous API.
    """
    wrapped = self._async_api
    return wrapped.config
def status(self) -> StatusResult:
    """ Return the status of the database.

        Blocks until the status() coroutine of the asynchronous API
        has finished.
    """
    pending = self._async_api.status()
    return self._loop.run_until_complete(pending)
def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
    """ Get detailed information about a place in the database.

        `place` and all keyword parameters are forwarded unchanged
        to the details() coroutine of the asynchronous API.
    """
    pending = self._async_api.details(place, **params)
    return self._loop.run_until_complete(pending)
def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
    """ Get simple information about a list of places.

        Returns a list of place information for all IDs that were found.
    """
    pending = self._async_api.lookup(places, **params)
    return self._loop.run_until_complete(pending)
def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
    """ Find a place by its coordinates. Also known as reverse geocoding.

        Returns the closest result that can be found or None if
        no place matches the given criteria.
    """
    pending = self._async_api.reverse(coord, **params)
    return self._loop.run_until_complete(pending)
def search(self, query: str, **params: Any) -> SearchResults:
    """ Find a place by free-text search. Also known as forward geocoding.

        The query and all keyword parameters are forwarded unchanged
        to the search() coroutine of the asynchronous API.
    """
    pending = self._async_api.search(query, **params)
    return self._loop.run_until_complete(pending)
# pylint: disable=too-many-arguments
def search_address(self, amenity: Optional[str] = None,
                   street: Optional[str] = None,
                   city: Optional[str] = None,
                   county: Optional[str] = None,
                   state: Optional[str] = None,
                   country: Optional[str] = None,
                   postalcode: Optional[str] = None,
                   **params: Any) -> SearchResults:
    """ Find an address using structured search.

        The address parts are handed on positionally, in the order
        amenity, street, city, county, state, country, postalcode.
    """
    address_parts = (amenity, street, city, county, state, country, postalcode)
    pending = self._async_api.search_address(*address_parts, **params)
    return self._loop.run_until_complete(pending)
def search_category(self, categories: List[Tuple[str, str]],
                    near_query: Optional[str] = None,
                    **params: Any) -> SearchResults:
    """ Find an object of a certain category near another place.
        The near place may either be given as an unstructured search
        query in itself or as a geographic area through the
        viewbox or near parameters.
    """
    pending = self._async_api.search_category(categories, near_query, **params)
    return self._loop.run_until_complete(pending)