X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/32c1e59622a8e9f76c351160e7150e625e3238a3..3bf489cd7c5eec14e56ea6e95156f2209762828a:/nominatim/api/core.py diff --git a/nominatim/api/core.py b/nominatim/api/core.py index 159229dd..f1a656da 100644 --- a/nominatim/api/core.py +++ b/nominatim/api/core.py @@ -7,17 +7,25 @@ """ Implementation of classes for API access via libraries. """ -from typing import Mapping, Optional, Any, AsyncIterator +from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence import asyncio import contextlib from pathlib import Path import sqlalchemy as sa import sqlalchemy.ext.asyncio as sa_asyncio -import asyncpg + +from nominatim.db.sqlalchemy_schema import SearchTables +from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR from nominatim.config import Configuration +from nominatim.api.connection import SearchConnection from nominatim.api.status import get_status, StatusResult +from nominatim.api.lookup import get_detailed_place, get_simple_place +from nominatim.api.reverse import ReverseGeocoder +import nominatim.api.types as ntyp +from nominatim.api.results import DetailedResult, ReverseResult, SearchResults + class NominatimAPIAsync: """ API loader asynchornous version. @@ -29,6 +37,8 @@ class NominatimAPIAsync: self._engine_lock = asyncio.Lock() self._engine: Optional[sa_asyncio.AsyncEngine] = None + self._tables: Optional[SearchTables] = None + self._property_cache: Dict[str, Any] = {'DB:server_version': 0} async def setup_database(self) -> None: @@ -44,35 +54,38 @@ class NominatimAPIAsync: dsn = self.config.get_database_params() + query = {k: v for k, v in dsn.items() + if k not in ('user', 'password', 'dbname', 'host', 'port')} + if PGCORE_LIB == 'asyncpg': + query['prepared_statement_cache_size'] = '0' + dburl = sa.engine.URL.create( - 'postgresql+asyncpg', + f'postgresql+{PGCORE_LIB}', database=dsn.get('dbname'), username=dsn.get('user'), password=dsn.get('password'), host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None, - query={k: v for k, v in dsn.items() - if k not in ('user', 'password', 'dbname', 'host', 'port')}) - engine = sa_asyncio.create_async_engine( - dburl, future=True, - connect_args={'server_settings': { - 'DateStyle': 'sql,european', - 'max_parallel_workers_per_gather': '0' - }}) + query=query) + engine = sa_asyncio.create_async_engine(dburl, future=True) try: async with engine.begin() as conn: result = await conn.scalar(sa.text('SHOW server_version_num')) - self.server_version = int(result) - except asyncpg.PostgresError: - self.server_version = 0 + server_version = int(result) + except (PGCORE_ERROR, sa.exc.OperationalError): + server_version = 0 - if self.server_version >= 110000: - @sa.event.listens_for(engine.sync_engine, "connect") # type: ignore[misc] + if server_version >= 110000: + @sa.event.listens_for(engine.sync_engine, "connect") def _on_connect(dbapi_con: Any, _: Any) -> None: cursor = dbapi_con.cursor() cursor.execute("SET jit_above_cost TO '-1'") + cursor.execute("SET max_parallel_workers_per_gather TO '0'") # Make sure that all connections get the new settings await self.close() + self._property_cache['DB:server_version'] = server_version + + self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member self._engine = engine @@ -86,7 +99,7 @@ class NominatimAPIAsync: @contextlib.asynccontextmanager - async def begin(self) -> AsyncIterator[sa_asyncio.AsyncConnection]: + async def begin(self) -> AsyncIterator[SearchConnection]: """ Create a new connection with automatic transaction handling. This function may be used to get low-level access to the database. @@ -97,9 +110,10 @@ class NominatimAPIAsync: await self.setup_database() assert self._engine is not None + assert self._tables is not None async with self._engine.begin() as conn: - yield conn + yield SearchConnection(conn, self._tables, self._property_cache) async def status(self) -> StatusResult: @@ -108,12 +122,49 @@ class NominatimAPIAsync: try: async with self.begin() as conn: status = await get_status(conn) - except asyncpg.PostgresError: + except (PGCORE_ERROR, sa.exc.OperationalError): return StatusResult(700, 'Database connection failed') return status + async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]: + """ Get detailed information about a place in the database. + + Returns None if there is no entry under the given ID. + """ + async with self.begin() as conn: + return await get_detailed_place(conn, place, + ntyp.LookupDetails.from_kwargs(params)) + + + async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults: + """ Get simple information about a list of places. + + Returns a list of place information for all IDs that were found. + """ + details = ntyp.LookupDetails.from_kwargs(params) + async with self.begin() as conn: + return SearchResults(filter(None, + [await get_simple_place(conn, p, details) for p in places])) + + + async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]: + """ Find a place by its coordinates. Also known as reverse geocoding. + + Returns the closest result that can be found or None if + no place matches the given criteria. + """ + # The following negation handles NaN correctly. Don't change. + if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90: + # There are no results to be expected outside valid coordinates. + return None + + async with self.begin() as conn: + geocoder = ReverseGeocoder(conn, ntyp.ReverseDetails.from_kwargs(params)) + return await geocoder.lookup(coord) + + class NominatimAPI: """ API loader, synchronous version. """ @@ -133,7 +184,36 @@ class NominatimAPI: self._loop.close() + @property + def config(self) -> Configuration: + """ Return the configuration used by the API. + """ + return self._async_api.config + def status(self) -> StatusResult: """ Return the status of the database. """ return self._loop.run_until_complete(self._async_api.status()) + + + def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]: + """ Get detailed information about a place in the database. + """ + return self._loop.run_until_complete(self._async_api.details(place, **params)) + + + def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults: + """ Get simple information about a list of places. + + Returns a list of place information for all IDs that were found. + """ + return self._loop.run_until_complete(self._async_api.lookup(places, **params)) + + + def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]: + """ Find a place by its coordinates. Also known as reverse geocoding. + + Returns the closest result that can be found or None if + no place matches the given criteria. + """ + return self._loop.run_until_complete(self._async_api.reverse(coord, **params))