1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict
13 from pathlib import Path
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.config import Configuration
21 from nominatim.api.connection import SearchConnection
22 from nominatim.api.status import get_status, StatusResult
23 from nominatim.api.lookup import get_place_by_id
24 from nominatim.api.reverse import ReverseGeocoder
25 from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
26 from nominatim.api.results import DetailedResult, ReverseResult
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is not created in the constructor. It is set up
        by `setup_database()`, which `begin()` calls on first use.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Load the configuration. `project_dir` and `environ` are handed
            on unchanged to `Configuration`.
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        # Serialises engine creation between concurrently running tasks.
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        # Handed to every SearchConnection created by begin().
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.
        """
        async with self._engine_lock:
            # begin() tests self._engine without holding the lock, so a
            # second task can get here after the first one has finished.
            # Without this check it would build another engine and leak
            # the one that is already in use.
            if self._engine is not None:
                return

            dsn = self.config.get_database_params()

            # Everything that is not a dedicated URL component is sent
            # along as a query parameter of the connection URL.
            query = {k: v for k, v in dsn.items()
                     if k not in ('user', 'password', 'dbname', 'host', 'port')}
            query['prepared_statement_cache_size'] = '0'

            # NOTE(review): the driver name is inferred from the asyncpg
            # exception handling below - confirm.
            dburl = sa.engine.URL.create(
                'postgresql+asyncpg',
                database=dsn.get('dbname'),
                username=dsn.get('user'), password=dsn.get('password'),
                host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                query=query)
            engine = sa_asyncio.create_async_engine(
                dburl,
                connect_args={'server_settings': {
                    'DateStyle': 'sql,european',
                    'max_parallel_workers_per_gather': '0'
                }})

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except asyncpg.PostgresError:
                # Version could not be read. Continue with 0, the same
                # value the property cache starts out with.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                # Make sure that all connections get the new settings:
                # the connection used for the version query was opened
                # before the listener existed, so drop it from the pool.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            # Publish the engine last, so that begin() never sees an engine
            # without its table definitions.
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When asyncpg reports a PostgresError, a StatusResult with
            code 700 is returned instead of raising.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except asyncpg.PostgresError:
            return StatusResult(700, 'Database connection failed')

        return status


    async def lookup(self, place: PlaceRef,
                     details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
            When `details` is omitted, a default LookupDetails() is used.
        """
        async with self.begin() as conn:
            return await get_place_by_id(conn, place, details or LookupDetails())


    async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
                      layer: Optional[DataLayer] = None,
                      details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
        """ Find a place by its coordinates. Also known as reverse geocoding.

            Returns the closest result that can be found or None if
            no place matches the given criteria.

            `coord[0]` must lie within +/-180 and `coord[1]` within +/-90,
            otherwise None is returned without querying the database.
            `max_rank` is clamped to the range 0..30; None and 0 both
            select 30. `layer` defaults to ADDRESS and POI combined.
        """
        # The following negation handles NaN correctly. Don't change.
        if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
            # There are no results to be expected outside valid coordinates.
            return None

        if layer is None:
            layer = DataLayer.ADDRESS | DataLayer.POI

        max_rank = max(0, min(max_rank or 30, 30))

        async with self.begin() as conn:
            geocoder = ReverseGeocoder(conn, max_rank, layer,
                                       details or LookupDetails())
            return await geocoder.lookup(coord)
168 """ API loader, synchronous version.
def __init__(self, project_dir: Path,
             environ: Optional[Mapping[str, str]] = None) -> None:
    """ Create a private event loop and the asynchronous API object
        that carries out the requests.

        `project_dir` and `environ` are handed on unchanged to
        NominatimAPIAsync.
    """
    self._loop = asyncio.new_event_loop()
    self._async_api = NominatimAPIAsync(project_dir, environ)
def close(self) -> None:
    """ Close all active connections to the database. The NominatimAPIAsync
        object remains usable after closing. If a new API function is
        called, new connections are created.

        Blocks until the asynchronous close() has finished.
    """
    self._loop.run_until_complete(self._async_api.close())
# NOTE(review): no decorator is visible above this definition. If the
# original file marks it as a property, that line has to stay in place.
def config(self) -> Configuration:
    """ Return the configuration used by the API.

        This is the `config` attribute of the wrapped asynchronous API
        object.
    """
    return self._async_api.config
def status(self) -> StatusResult:
    """ Return the status of the database.

        Runs the asynchronous status() call on the private event loop
        and blocks until its result is available.
    """
    return self._loop.run_until_complete(self._async_api.status())
def lookup(self, place: PlaceRef,
           details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
    """ Get detailed information about a place in the database.

        `place` and `details` are handed on unchanged to the asynchronous
        lookup(), whose result is returned once it is available.
    """
    return self._loop.run_until_complete(self._async_api.lookup(place, details))
def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
            layer: Optional[DataLayer] = None,
            details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
    """ Find a place by its coordinates. Also known as reverse geocoding.

        Returns the closest result that can be found or None if
        no place matches the given criteria.

        All arguments are handed on unchanged to the asynchronous
        reverse(); this call blocks until that one has finished.
    """
    return self._loop.run_until_complete(
               self._async_api.reverse(coord, max_rank, layer, details))