1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence
import asyncio
import contextlib
from pathlib import Path

import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_asyncio
import asyncpg

from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
from nominatim.api.lookup import get_detailed_place, get_simple_place
from nominatim.api.reverse import ReverseGeocoder
from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        Holds the project configuration and a database engine that is
        created on first use. All query functions open their own
        connection through begin().
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Load the configuration for the given project directory.

            No database connection is opened here. The engine is created
            by setup_database(), either explicitly or on first access.
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        # Serialises engine creation between concurrent first callers.
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        # Handed to every SearchConnection created by begin().
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.
        """
        async with self._engine_lock:
            # begin() tests for a missing engine outside of the lock, so
            # several callers may queue up here. Only the first one must
            # create the engine.
            if self._engine is not None:
                return

            dsn = self.config.get_database_params()

            # Everything that is not a dedicated URL component is handed
            # to the driver as a query parameter.
            query = {k: v for k, v in dsn.items()
                     if k not in ('user', 'password', 'dbname', 'host', 'port')}
            query['prepared_statement_cache_size'] = '0'

            dburl = sa.engine.URL.create(
                'postgresql+asyncpg',
                database=dsn.get('dbname'),
                username=dsn.get('user'), password=dsn.get('password'),
                host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                query=query)
            engine = sa_asyncio.create_async_engine(
                dburl,
                connect_args={'server_settings': {
                    'DateStyle': 'sql,european',
                    'max_parallel_workers_per_gather': '0'
                }})

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except asyncpg.PostgresError:
                # Keep the default of the property cache when the server
                # cannot be asked for its version.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                # Make sure that all connections get the new settings.
                # The version probe above has already opened a pooled
                # connection before the listener existed. The local engine
                # must be disposed directly: self._engine is not assigned
                # yet, so self.close() would do nothing at this point.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            # Publish the engine last, so that begin() never sees an engine
            # without its table definitions.
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        # setup_database() sets both attributes; the asserts narrow the
        # Optional types for the type checker.
        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the query fails with a Postgres error, a result with
            status code 700 is returned instead of raising.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except asyncpg.PostgresError:
            return StatusResult(700, 'Database connection failed')

        return status


    async def details(self, place: PlaceRef,
                      details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        async with self.begin() as conn:
            return await get_detailed_place(conn, place, details or LookupDetails())


    async def lookup(self, places: Sequence[PlaceRef],
                     details: Optional[LookupDetails] = None) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
            Places without a result are left out.
        """
        if details is None:
            details = LookupDetails()
        async with self.begin() as conn:
            # Look up the places one after the other on the same connection
            # and drop the empty results.
            return SearchResults(filter(None,
                                        [await get_simple_place(conn, p, details)
                                         for p in places]))


    async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
                      layer: Optional[DataLayer] = None,
                      details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria. None is also returned
            without a database query when the first coordinate is not
            within +/-180 or the second not within +/-90.

            'max_rank' is clamped to the range 0 to 30; None and 0 are
            both treated as 30. 'layer' defaults to addresses and POIs.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        if layer is None:
            layer = DataLayer.ADDRESS | DataLayer.POI

        max_rank = max(0, min(max_rank or 30, 30))

        async with self.begin() as conn:
            geocoder = ReverseGeocoder(conn, max_rank, layer,
                                       details or LookupDetails())
            return await geocoder.lookup(coord)
# NOTE(review): the class header line was absent from the reviewed text;
# the name NominatimAPI is assumed as the synchronous counterpart of
# NominatimAPIAsync - confirm against callers.
class NominatimAPI:
    """ API loader, synchronous version.

        Wraps a NominatimAPIAsync object and runs each of its coroutines
        to completion on a private event loop.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create the private event loop and the wrapped asynchronous API.
        """
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ)


    def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        self._loop.run_until_complete(self._async_api.close())


    # NOTE(review): exposed as a property to mirror the 'config' attribute
    # of NominatimAPIAsync - confirm that callers use attribute access.
    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config


    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def details(self, place: PlaceRef,
                details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.
        """
        return self._loop.run_until_complete(self._async_api.details(place, details))


    def lookup(self, places: Sequence[PlaceRef],
               details: Optional[LookupDetails] = None) -> SearchResults:
        """ Get simple information about a list of places.

            Returns a list of place information for all IDs that were found.
        """
        return self._loop.run_until_complete(self._async_api.lookup(places, details))


    def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
                layer: Optional[DataLayer] = None,
                details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.
        """
        return self._loop.run_until_complete(
            self._async_api.reverse(coord, max_rank, layer, details))