]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17 import asyncpg
18
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.config import Configuration
21 from nominatim.api.connection import SearchConnection
22 from nominatim.api.status import get_status, StatusResult
23 from nominatim.api.lookup import get_detailed_place, get_simple_place
24 from nominatim.api.reverse import ReverseGeocoder
25 from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
26 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
27
28
29 class NominatimAPIAsync:
30     """ API loader asynchornous version.
31     """
32     def __init__(self, project_dir: Path,
33                  environ: Optional[Mapping[str, str]] = None) -> None:
34         self.config = Configuration(project_dir, environ)
35         self.server_version = 0
36
37         self._engine_lock = asyncio.Lock()
38         self._engine: Optional[sa_asyncio.AsyncEngine] = None
39         self._tables: Optional[SearchTables] = None
40         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
41
42
43     async def setup_database(self) -> None:
44         """ Set up the engine and connection parameters.
45
46             This function will be implicitly called when the database is
47             accessed for the first time. You may also call it explicitly to
48             avoid that the first call is delayed by the setup.
49         """
50         async with self._engine_lock:
51             if self._engine:
52                 return
53
54             dsn = self.config.get_database_params()
55
56             query = {k: v for k, v in dsn.items()
57                       if k not in ('user', 'password', 'dbname', 'host', 'port')}
58             query['prepared_statement_cache_size'] = '0'
59
60             dburl = sa.engine.URL.create(
61                        'postgresql+asyncpg',
62                        database=dsn.get('dbname'),
63                        username=dsn.get('user'), password=dsn.get('password'),
64                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
65                        query=query)
66             engine = sa_asyncio.create_async_engine(
67                              dburl, future=True,
68                              connect_args={'server_settings': {
69                                 'DateStyle': 'sql,european',
70                                 'max_parallel_workers_per_gather': '0'
71                              }})
72
73             try:
74                 async with engine.begin() as conn:
75                     result = await conn.scalar(sa.text('SHOW server_version_num'))
76                     server_version = int(result)
77             except asyncpg.PostgresError:
78                 server_version = 0
79
80             if server_version >= 110000:
81                 @sa.event.listens_for(engine.sync_engine, "connect")
82                 def _on_connect(dbapi_con: Any, _: Any) -> None:
83                     cursor = dbapi_con.cursor()
84                     cursor.execute("SET jit_above_cost TO '-1'")
85                 # Make sure that all connections get the new settings
86                 await self.close()
87
88             self._property_cache['DB:server_version'] = server_version
89
90             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
91             self._engine = engine
92
93
94     async def close(self) -> None:
95         """ Close all active connections to the database. The NominatimAPIAsync
96             object remains usable after closing. If a new API functions is
97             called, new connections are created.
98         """
99         if self._engine is not None:
100             await self._engine.dispose()
101
102
103     @contextlib.asynccontextmanager
104     async def begin(self) -> AsyncIterator[SearchConnection]:
105         """ Create a new connection with automatic transaction handling.
106
107             This function may be used to get low-level access to the database.
108             Refer to the documentation of SQLAlchemy for details how to use
109             the connection object.
110         """
111         if self._engine is None:
112             await self.setup_database()
113
114         assert self._engine is not None
115         assert self._tables is not None
116
117         async with self._engine.begin() as conn:
118             yield SearchConnection(conn, self._tables, self._property_cache)
119
120
121     async def status(self) -> StatusResult:
122         """ Return the status of the database.
123         """
124         try:
125             async with self.begin() as conn:
126                 status = await get_status(conn)
127         except asyncpg.PostgresError:
128             return StatusResult(700, 'Database connection failed')
129
130         return status
131
132
133     async def details(self, place: PlaceRef,
134                       details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
135         """ Get detailed information about a place in the database.
136
137             Returns None if there is no entry under the given ID.
138         """
139         async with self.begin() as conn:
140             return await get_detailed_place(conn, place, details or LookupDetails())
141
142
143     async def lookup(self, places: Sequence[PlaceRef],
144                       details: Optional[LookupDetails] = None) -> SearchResults:
145         """ Get simple information about a list of places.
146
147             Returns a list of place information for all IDs that were found.
148         """
149         if details is None:
150             details = LookupDetails()
151         async with self.begin() as conn:
152             return SearchResults(filter(None,
153                                         [await get_simple_place(conn, p, details) for p in places]))
154
155
156     async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
157                       layer: Optional[DataLayer] = None,
158                       details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
159         """ Find a place by its coordinates. Also known as reverse geocoding.
160
161             Returns the closest result that can be found or None if
162             no place matches the given criteria.
163         """
164         # The following negation handles NaN correctly. Don't change.
165         if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
166             # There are no results to be expected outside valid coordinates.
167             return None
168
169         if layer is None:
170             layer = DataLayer.ADDRESS | DataLayer.POI
171
172         max_rank = max(0, min(max_rank or 30, 30))
173
174         async with self.begin() as conn:
175             geocoder = ReverseGeocoder(conn, max_rank, layer,
176                                        details or LookupDetails())
177             return await geocoder.lookup(coord)
178
179
180 class NominatimAPI:
181     """ API loader, synchronous version.
182     """
183
184     def __init__(self, project_dir: Path,
185                  environ: Optional[Mapping[str, str]] = None) -> None:
186         self._loop = asyncio.new_event_loop()
187         self._async_api = NominatimAPIAsync(project_dir, environ)
188
189
190     def close(self) -> None:
191         """ Close all active connections to the database. The NominatimAPIAsync
192             object remains usable after closing. If a new API functions is
193             called, new connections are created.
194         """
195         self._loop.run_until_complete(self._async_api.close())
196         self._loop.close()
197
198
199     @property
200     def config(self) -> Configuration:
201         """ Return the configuration used by the API.
202         """
203         return self._async_api.config
204
205     def status(self) -> StatusResult:
206         """ Return the status of the database.
207         """
208         return self._loop.run_until_complete(self._async_api.status())
209
210
211     def details(self, place: PlaceRef,
212                 details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
213         """ Get detailed information about a place in the database.
214         """
215         return self._loop.run_until_complete(self._async_api.details(place, details))
216
217
218     def lookup(self, places: Sequence[PlaceRef],
219                details: Optional[LookupDetails] = None) -> SearchResults:
220         """ Get simple information about a list of places.
221
222             Returns a list of place information for all IDs that were found.
223         """
224         return self._loop.run_until_complete(self._async_api.lookup(places, details))
225
226
227     def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
228                 layer: Optional[DataLayer] = None,
229                 details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
230         """ Find a place by its coordinates. Also known as reverse geocoding.
231
232             Returns the closest result that can be found or None if
233             no place matches the given criteria.
234         """
235         return self._loop.run_until_complete(
236                    self._async_api.reverse(coord, max_rank, layer, details))