]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
Merge pull request #3064 from lonvia/clicmd-debug-output
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17
18
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
21 from nominatim.config import Configuration
22 from nominatim.api.connection import SearchConnection
23 from nominatim.api.status import get_status, StatusResult
24 from nominatim.api.lookup import get_detailed_place, get_simple_place
25 from nominatim.api.reverse import ReverseGeocoder
26 import nominatim.api.types as ntyp
27 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
28
29
30 class NominatimAPIAsync:
31     """ API loader asynchornous version.
32     """
33     def __init__(self, project_dir: Path,
34                  environ: Optional[Mapping[str, str]] = None) -> None:
35         self.config = Configuration(project_dir, environ)
36         self.server_version = 0
37
38         self._engine_lock = asyncio.Lock()
39         self._engine: Optional[sa_asyncio.AsyncEngine] = None
40         self._tables: Optional[SearchTables] = None
41         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
42
43
44     async def setup_database(self) -> None:
45         """ Set up the engine and connection parameters.
46
47             This function will be implicitly called when the database is
48             accessed for the first time. You may also call it explicitly to
49             avoid that the first call is delayed by the setup.
50         """
51         async with self._engine_lock:
52             if self._engine:
53                 return
54
55             dsn = self.config.get_database_params()
56
57             query = {k: v for k, v in dsn.items()
58                       if k not in ('user', 'password', 'dbname', 'host', 'port')}
59             if PGCORE_LIB == 'asyncpg':
60                 query['prepared_statement_cache_size'] = '0'
61
62             dburl = sa.engine.URL.create(
63                        f'postgresql+{PGCORE_LIB}',
64                        database=dsn.get('dbname'),
65                        username=dsn.get('user'), password=dsn.get('password'),
66                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
67                        query=query)
68             engine = sa_asyncio.create_async_engine(dburl, future=True)
69
70             try:
71                 async with engine.begin() as conn:
72                     result = await conn.scalar(sa.text('SHOW server_version_num'))
73                     server_version = int(result)
74             except (PGCORE_ERROR, sa.exc.OperationalError):
75                 server_version = 0
76
77             if server_version >= 110000:
78                 @sa.event.listens_for(engine.sync_engine, "connect")
79                 def _on_connect(dbapi_con: Any, _: Any) -> None:
80                     cursor = dbapi_con.cursor()
81                     cursor.execute("SET jit_above_cost TO '-1'")
82                     cursor.execute("SET max_parallel_workers_per_gather TO '0'")
83                 # Make sure that all connections get the new settings
84                 await self.close()
85
86             self._property_cache['DB:server_version'] = server_version
87
88             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
89             self._engine = engine
90
91
92     async def close(self) -> None:
93         """ Close all active connections to the database. The NominatimAPIAsync
94             object remains usable after closing. If a new API functions is
95             called, new connections are created.
96         """
97         if self._engine is not None:
98             await self._engine.dispose()
99
100
101     @contextlib.asynccontextmanager
102     async def begin(self) -> AsyncIterator[SearchConnection]:
103         """ Create a new connection with automatic transaction handling.
104
105             This function may be used to get low-level access to the database.
106             Refer to the documentation of SQLAlchemy for details how to use
107             the connection object.
108         """
109         if self._engine is None:
110             await self.setup_database()
111
112         assert self._engine is not None
113         assert self._tables is not None
114
115         async with self._engine.begin() as conn:
116             yield SearchConnection(conn, self._tables, self._property_cache)
117
118
119     async def status(self) -> StatusResult:
120         """ Return the status of the database.
121         """
122         try:
123             async with self.begin() as conn:
124                 status = await get_status(conn)
125         except (PGCORE_ERROR, sa.exc.OperationalError):
126             return StatusResult(700, 'Database connection failed')
127
128         return status
129
130
131     async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
132         """ Get detailed information about a place in the database.
133
134             Returns None if there is no entry under the given ID.
135         """
136         async with self.begin() as conn:
137             return await get_detailed_place(conn, place,
138                                             ntyp.LookupDetails.from_kwargs(params))
139
140
141     async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
142         """ Get simple information about a list of places.
143
144             Returns a list of place information for all IDs that were found.
145         """
146         details = ntyp.LookupDetails.from_kwargs(params)
147         async with self.begin() as conn:
148             return SearchResults(filter(None,
149                                         [await get_simple_place(conn, p, details) for p in places]))
150
151
152     async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
153         """ Find a place by its coordinates. Also known as reverse geocoding.
154
155             Returns the closest result that can be found or None if
156             no place matches the given criteria.
157         """
158         # The following negation handles NaN correctly. Don't change.
159         if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
160             # There are no results to be expected outside valid coordinates.
161             return None
162
163         async with self.begin() as conn:
164             geocoder = ReverseGeocoder(conn, ntyp.ReverseDetails.from_kwargs(params))
165             return await geocoder.lookup(coord)
166
167
168 class NominatimAPI:
169     """ API loader, synchronous version.
170     """
171
172     def __init__(self, project_dir: Path,
173                  environ: Optional[Mapping[str, str]] = None) -> None:
174         self._loop = asyncio.new_event_loop()
175         self._async_api = NominatimAPIAsync(project_dir, environ)
176
177
178     def close(self) -> None:
179         """ Close all active connections to the database. The NominatimAPIAsync
180             object remains usable after closing. If a new API functions is
181             called, new connections are created.
182         """
183         self._loop.run_until_complete(self._async_api.close())
184         self._loop.close()
185
186
187     @property
188     def config(self) -> Configuration:
189         """ Return the configuration used by the API.
190         """
191         return self._async_api.config
192
193     def status(self) -> StatusResult:
194         """ Return the status of the database.
195         """
196         return self._loop.run_until_complete(self._async_api.status())
197
198
199     def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
200         """ Get detailed information about a place in the database.
201         """
202         return self._loop.run_until_complete(self._async_api.details(place, **params))
203
204
205     def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
206         """ Get simple information about a list of places.
207
208             Returns a list of place information for all IDs that were found.
209         """
210         return self._loop.run_until_complete(self._async_api.lookup(places, **params))
211
212
213     def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
214         """ Find a place by its coordinates. Also known as reverse geocoding.
215
216             Returns the closest result that can be found or None if
217             no place matches the given criteria.
218         """
219         return self._loop.run_until_complete(self._async_api.reverse(coord, **params))