"""
Implementation of classes for API access via libraries.
"""
-from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
+from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, \
+ Union, Tuple, cast
import asyncio
import sys
import contextlib
import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_asyncio
-from nominatim_core.errors import UsageError
-from nominatim_core.db.sqlalchemy_schema import SearchTables
-from nominatim_core.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
-from nominatim_core.config import Configuration
-from .sql import sqlite_functions, sqlalchemy_functions #pylint: disable=unused-import
+from .errors import UsageError
+from .sql.sqlalchemy_schema import SearchTables
+from .sql.async_core_library import PGCORE_LIB, PGCORE_ERROR
+from .config import Configuration
+from .sql import sqlite_functions, sqlalchemy_functions # noqa
from .connection import SearchConnection
from .status import get_status, StatusResult
-from .lookup import get_detailed_place, get_simple_place
+from .lookup import get_places, get_detailed_place
from .reverse import ReverseGeocoder
from .search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
from . import types as ntyp
from .results import DetailedResult, ReverseResult, SearchResults
-class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
+class NominatimAPIAsync:
""" The main frontend to the Nominatim database implements the
functions for lookup, forward and reverse geocoding using
asynchronous functions.
This class shares most of the functions with its synchronous
version. There are some additional functions or parameters,
which are documented below.
+
+ This class should usually be used as a context manager in 'async with' context.
"""
- def __init__(self, project_dir: Path,
+ def __init__(self, project_dir: Optional[Union[str, Path]] = None,
environ: Optional[Mapping[str, str]] = None,
loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
""" Initiate a new frontend object with synchronous API functions.
"""
self.config = Configuration(project_dir, environ)
self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
- if self.config.QUERY_TIMEOUT else None
+ if self.config.QUERY_TIMEOUT else None
self.reverse_restrict_to_country_area = self.config.get_bool('SEARCH_WITHIN_COUNTRIES')
self.server_version = 0
if sys.version_info >= (3, 10):
self._engine_lock = asyncio.Lock()
else:
- self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
+ self._engine_lock = asyncio.Lock(loop=loop)
self._engine: Optional[sa_asyncio.AsyncEngine] = None
self._tables: Optional[SearchTables] = None
self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
-
async def setup_database(self) -> None:
""" Set up the SQL engine and connections.
extra_args['max_overflow'] = 0
extra_args['pool_size'] = self.config.get_int('API_POOL_SIZE')
-
is_sqlite = self.config.DATABASE_DSN.startswith('sqlite:')
if is_sqlite:
raise UsageError(f"SQlite database '{params.get('dbname')}' does not exist.")
else:
dsn = self.config.get_database_params()
- query = {k: v for k, v in dsn.items()
+ query = {k: str(v) for k, v in dsn.items()
if k not in ('user', 'password', 'dbname', 'host', 'port')}
dburl = sa.engine.URL.create(
f'postgresql+{PGCORE_LIB}',
- database=dsn.get('dbname'),
- username=dsn.get('user'),
- password=dsn.get('password'),
- host=dsn.get('host'),
- port=int(dsn['port']) if 'port' in dsn else None,
+ database=cast(str, dsn.get('dbname')),
+ username=cast(str, dsn.get('user')),
+ password=cast(str, dsn.get('password')),
+ host=cast(str, dsn.get('host')),
+ port=int(cast(str, dsn['port'])) if 'port' in dsn else None,
query=query)
engine = sa_asyncio.create_async_engine(dburl, **extra_args)
async with engine.begin() as conn:
result = await conn.scalar(sa.text('SHOW server_version_num'))
server_version = int(result)
- if server_version >= 110000:
- await conn.execute(sa.text("SET jit_above_cost TO '-1'"))
- await conn.execute(sa.text(
- "SET max_parallel_workers_per_gather TO '0'"))
+ await conn.execute(sa.text("SET jit_above_cost TO '-1'"))
+ await conn.execute(sa.text(
+ "SET max_parallel_workers_per_gather TO '0'"))
except (PGCORE_ERROR, sa.exc.OperationalError):
server_version = 0
- if server_version >= 110000:
- @sa.event.listens_for(engine.sync_engine, "connect")
- def _on_connect(dbapi_con: Any, _: Any) -> None:
- cursor = dbapi_con.cursor()
- cursor.execute("SET jit_above_cost TO '-1'")
- cursor.execute("SET max_parallel_workers_per_gather TO '0'")
+ @sa.event.listens_for(engine.sync_engine, "connect")
+ def _on_connect(dbapi_con: Any, _: Any) -> None:
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET jit_above_cost TO '-1'")
+ cursor.execute("SET max_parallel_workers_per_gather TO '0'")
self._property_cache['DB:server_version'] = server_version
- self._tables = SearchTables(sa.MetaData()) # pylint: disable=no-member
+ self._tables = SearchTables(sa.MetaData())
self._engine = engine
-
async def close(self) -> None:
""" Close all active connections to the database. The NominatimAPIAsync
object remains usable after closing. If a new API function is
if self._engine is not None:
await self._engine.dispose()
+ async def __aenter__(self) -> 'NominatimAPIAsync':
+ return self
+
+ async def __aexit__(self, *_: Any) -> None:
+ await self.close()
@contextlib.asynccontextmanager
async def begin(self) -> AsyncIterator[SearchConnection]:
async with self._engine.begin() as conn:
yield SearchConnection(conn, self._tables, self._property_cache)
-
async def status(self) -> StatusResult:
""" Return the status of the database.
"""
return status
-
async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
await make_query_analyzer(conn)
return await get_detailed_place(conn, place, details)
-
async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
""" Get simple information about a list of places.
conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
- return SearchResults(filter(None,
- [await get_simple_place(conn, p, details) for p in places]))
-
+ return await get_places(conn, places, details)
async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
self.reverse_restrict_to_country_area)
return await geocoder.lookup(coord)
-
async def search(self, query: str, **params: Any) -> SearchResults:
""" Find a place by free-text search. Also known as forward geocoding.
"""
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
return await geocoder.lookup(phrases)
-
- # pylint: disable=too-many-arguments,too-many-branches
async def search_address(self, amenity: Optional[str] = None,
street: Optional[str] = None,
city: Optional[str] = None,
details.layers |= ntyp.DataLayer.POI
geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup(phrases)
-
async def search_category(self, categories: List[Tuple[str, str]],
near_query: Optional[str] = None,
**params: Any) -> SearchResults:
await make_query_analyzer(conn)
geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup_pois(categories, phrases)
-
class NominatimAPI:
""" This class provides a thin synchronous wrapper around the asynchronous
Nominatim functions. It creates its own event loop and runs each
synchronous function call to completion using that loop.
+
+ This class should usually be used as a context manager in 'with' context.
"""
- def __init__(self, project_dir: Path,
+ def __init__(self, project_dir: Optional[Union[str, Path]] = None,
environ: Optional[Mapping[str, str]] = None) -> None:
""" Initiate a new frontend object with synchronous API functions.
self._loop = asyncio.new_event_loop()
self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
-
def close(self) -> None:
""" Close all active connections to the database.
This function also closes the asynchronous worker loop making
the NominatimAPI object unusable.
"""
- self._loop.run_until_complete(self._async_api.close())
- self._loop.close()
+ if not self._loop.is_closed():
+ self._loop.run_until_complete(self._async_api.close())
+ self._loop.close()
+
+ def __enter__(self) -> 'NominatimAPI':
+ return self
+ def __exit__(self, *_: Any) -> None:
+ self.close()
@property
def config(self) -> Configuration:
- """ Provide read-only access to the [configuration](#Configuration)
+ """ Provide read-only access to the [configuration](Configuration.md)
used by the API.
"""
return self._async_api.config
"""
return self._loop.run_until_complete(self._async_api.status())
-
def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
"""
return self._loop.run_until_complete(self._async_api.details(place, **params))
-
def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
""" Get simple information about a list of places.
"""
return self._loop.run_until_complete(self._async_api.lookup(places, **params))
-
def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
"""
return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
-
def search(self, query: str, **params: Any) -> SearchResults:
""" Find a place by free-text search. Also known as forward geocoding.
return self._loop.run_until_complete(
self._async_api.search(query, **params))
-
- # pylint: disable=too-many-arguments
def search_address(self, amenity: Optional[str] = None,
street: Optional[str] = None,
city: Optional[str] = None,
self._async_api.search_address(amenity, street, city, county,
state, country, postalcode, **params))
-
def search_category(self, categories: List[Tuple[str, str]],
near_query: Optional[str] = None,
**params: Any) -> SearchResults: