1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict
13 from pathlib import Path
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.config import Configuration
21 from nominatim.api.connection import SearchConnection
22 from nominatim.api.status import get_status, StatusResult
23 from nominatim.api.lookup import get_place_by_id
24 from nominatim.api.reverse import ReverseGeocoder
25 from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
26 from nominatim.api.results import DetailedResult, ReverseResult
29 class NominatimAPIAsync:
30 """ API loader asynchornous version.
32 def __init__(self, project_dir: Path,
33 environ: Optional[Mapping[str, str]] = None) -> None:
34 self.config = Configuration(project_dir, environ)
35 self.server_version = 0
37 self._engine_lock = asyncio.Lock()
38 self._engine: Optional[sa_asyncio.AsyncEngine] = None
39 self._tables: Optional[SearchTables] = None
40 self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
43 async def setup_database(self) -> None:
44 """ Set up the engine and connection parameters.
46 This function will be implicitly called when the database is
47 accessed for the first time. You may also call it explicitly to
48 avoid that the first call is delayed by the setup.
50 async with self._engine_lock:
54 dsn = self.config.get_database_params()
56 dburl = sa.engine.URL.create(
58 database=dsn.get('dbname'),
59 username=dsn.get('user'), password=dsn.get('password'),
60 host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
61 query={k: v for k, v in dsn.items()
62 if k not in ('user', 'password', 'dbname', 'host', 'port')})
63 engine = sa_asyncio.create_async_engine(
65 connect_args={'server_settings': {
66 'DateStyle': 'sql,european',
67 'max_parallel_workers_per_gather': '0'
71 async with engine.begin() as conn:
72 result = await conn.scalar(sa.text('SHOW server_version_num'))
73 server_version = int(result)
74 except asyncpg.PostgresError:
77 if server_version >= 110000:
78 @sa.event.listens_for(engine.sync_engine, "connect")
79 def _on_connect(dbapi_con: Any, _: Any) -> None:
80 cursor = dbapi_con.cursor()
81 cursor.execute("SET jit_above_cost TO '-1'")
82 # Make sure that all connections get the new settings
85 self._property_cache['DB:server_version'] = server_version
87 self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
91 async def close(self) -> None:
92 """ Close all active connections to the database. The NominatimAPIAsync
93 object remains usable after closing. If a new API functions is
94 called, new connections are created.
96 if self._engine is not None:
97 await self._engine.dispose()
100 @contextlib.asynccontextmanager
101 async def begin(self) -> AsyncIterator[SearchConnection]:
102 """ Create a new connection with automatic transaction handling.
104 This function may be used to get low-level access to the database.
105 Refer to the documentation of SQLAlchemy for details how to use
106 the connection object.
108 if self._engine is None:
109 await self.setup_database()
111 assert self._engine is not None
112 assert self._tables is not None
114 async with self._engine.begin() as conn:
115 yield SearchConnection(conn, self._tables, self._property_cache)
118 async def status(self) -> StatusResult:
119 """ Return the status of the database.
122 async with self.begin() as conn:
123 status = await get_status(conn)
124 except asyncpg.PostgresError:
125 return StatusResult(700, 'Database connection failed')
130 async def lookup(self, place: PlaceRef,
131 details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
132 """ Get detailed information about a place in the database.
134 Returns None if there is no entry under the given ID.
136 async with self.begin() as conn:
137 return await get_place_by_id(conn, place, details or LookupDetails())
140 async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
141 layer: Optional[DataLayer] = None,
142 details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
143 """ Find a place by its coordinates. Also known as reverse geocoding.
145 Returns the closest result that can be found or None if
146 no place matches the given criteria.
148 # The following negation handles NaN correctly. Don't change.
149 if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
150 # There are no results to be expected outside valid coordinates.
154 layer = DataLayer.ADDRESS | DataLayer.POI
156 max_rank = max(0, min(max_rank or 30, 30))
158 async with self.begin() as conn:
159 geocoder = ReverseGeocoder(conn, max_rank, layer,
160 details or LookupDetails())
161 return await geocoder.lookup(coord)
165 """ API loader, synchronous version.
168 def __init__(self, project_dir: Path,
169 environ: Optional[Mapping[str, str]] = None) -> None:
170 self._loop = asyncio.new_event_loop()
171 self._async_api = NominatimAPIAsync(project_dir, environ)
174 def close(self) -> None:
175 """ Close all active connections to the database. The NominatimAPIAsync
176 object remains usable after closing. If a new API functions is
177 called, new connections are created.
179 self._loop.run_until_complete(self._async_api.close())
184 def config(self) -> Configuration:
185 """ Return the configuration used by the API.
187 return self._async_api.config
189 def status(self) -> StatusResult:
190 """ Return the status of the database.
192 return self._loop.run_until_complete(self._async_api.status())
195 def lookup(self, place: PlaceRef,
196 details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
197 """ Get detailed information about a place in the database.
199 return self._loop.run_until_complete(self._async_api.lookup(place, details))
202 def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
203 layer: Optional[DataLayer] = None,
204 details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
205 """ Find a place by its coordinates. Also known as reverse geocoding.
207 Returns the closest result that can be found or None if
208 no place matches the given criteria.
210 return self._loop.run_until_complete(
211 self._async_api.reverse(coord, max_rank, layer, details))