"""
Implementation of classes for API access via libraries.
"""
-from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List,\
+from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, \
Union, Tuple, cast
import asyncio
import sys
from .sql.sqlalchemy_schema import SearchTables
from .sql.async_core_library import PGCORE_LIB, PGCORE_ERROR
from .config import Configuration
-from .sql import sqlite_functions, sqlalchemy_functions #pylint: disable=unused-import
+from .sql import sqlite_functions, sqlalchemy_functions # noqa
from .connection import SearchConnection
from .status import get_status, StatusResult
-from .lookup import get_detailed_place, get_simple_place
+from .lookup import get_places, get_detailed_place
from .reverse import ReverseGeocoder
from .search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
from . import types as ntyp
from .results import DetailedResult, ReverseResult, SearchResults
-class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
+class NominatimAPIAsync:
""" The main frontend to the Nominatim database implements the
functions for lookup, forward and reverse geocoding using
asynchronous functions.
"""
self.config = Configuration(project_dir, environ)
self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
- if self.config.QUERY_TIMEOUT else None
+ if self.config.QUERY_TIMEOUT else None
self.reverse_restrict_to_country_area = self.config.get_bool('SEARCH_WITHIN_COUNTRIES')
self.server_version = 0
if sys.version_info >= (3, 10):
self._engine_lock = asyncio.Lock()
else:
- self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
+ self._engine_lock = asyncio.Lock(loop=loop)
self._engine: Optional[sa_asyncio.AsyncEngine] = None
self._tables: Optional[SearchTables] = None
self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
-
async def setup_database(self) -> None:
""" Set up the SQL engine and connections.
extra_args['max_overflow'] = 0
extra_args['pool_size'] = self.config.get_int('API_POOL_SIZE')
-
is_sqlite = self.config.DATABASE_DSN.startswith('sqlite:')
if is_sqlite:
async with engine.begin() as conn:
result = await conn.scalar(sa.text('SHOW server_version_num'))
server_version = int(result)
- if server_version >= 110000:
- await conn.execute(sa.text("SET jit_above_cost TO '-1'"))
- await conn.execute(sa.text(
- "SET max_parallel_workers_per_gather TO '0'"))
+ await conn.execute(sa.text("SET jit_above_cost TO '-1'"))
+ await conn.execute(sa.text(
+ "SET max_parallel_workers_per_gather TO '0'"))
except (PGCORE_ERROR, sa.exc.OperationalError):
server_version = 0
- if server_version >= 110000:
- @sa.event.listens_for(engine.sync_engine, "connect")
- def _on_connect(dbapi_con: Any, _: Any) -> None:
- cursor = dbapi_con.cursor()
- cursor.execute("SET jit_above_cost TO '-1'")
- cursor.execute("SET max_parallel_workers_per_gather TO '0'")
+ @sa.event.listens_for(engine.sync_engine, "connect")
+ def _on_connect(dbapi_con: Any, _: Any) -> None:
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET jit_above_cost TO '-1'")
+ cursor.execute("SET max_parallel_workers_per_gather TO '0'")
self._property_cache['DB:server_version'] = server_version
- self._tables = SearchTables(sa.MetaData()) # pylint: disable=no-member
+ self._tables = SearchTables(sa.MetaData())
self._engine = engine
-
async def close(self) -> None:
""" Close all active connections to the database. The NominatimAPIAsync
object remains usable after closing. If a new API functions is
if self._engine is not None:
await self._engine.dispose()
-
async def __aenter__(self) -> 'NominatimAPIAsync':
return self
-
async def __aexit__(self, *_: Any) -> None:
await self.close()
-
@contextlib.asynccontextmanager
async def begin(self) -> AsyncIterator[SearchConnection]:
""" Create a new connection with automatic transaction handling.
assert self._tables is not None
async with self._engine.begin() as conn:
- yield SearchConnection(conn, self._tables, self._property_cache)
-
+ yield SearchConnection(conn, self._tables, self._property_cache, self.config)
async def status(self) -> StatusResult:
""" Return the status of the database.
return status
-
async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
await make_query_analyzer(conn)
return await get_detailed_place(conn, place, details)
-
async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
""" Get simple information about a list of places.
conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
- return SearchResults(filter(None,
- [await get_simple_place(conn, p, details) for p in places]))
-
+ return await get_places(conn, places, details)
async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
self.reverse_restrict_to_country_area)
return await geocoder.lookup(coord)
-
async def search(self, query: str, **params: Any) -> SearchResults:
""" Find a place by free-text search. Also known as forward geocoding.
"""
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
return await geocoder.lookup(phrases)
-
- # pylint: disable=too-many-arguments,too-many-branches
async def search_address(self, amenity: Optional[str] = None,
street: Optional[str] = None,
city: Optional[str] = None,
details.layers |= ntyp.DataLayer.POI
geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup(phrases)
-
async def search_category(self, categories: List[Tuple[str, str]],
near_query: Optional[str] = None,
**params: Any) -> SearchResults:
await make_query_analyzer(conn)
geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup_pois(categories, phrases)
-
class NominatimAPI:
""" This class provides a thin synchronous wrapper around the asynchronous
Nominatim functions. It creates its own event loop and runs each
self._loop = asyncio.new_event_loop()
self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
-
def close(self) -> None:
""" Close all active connections to the database.
self._loop.run_until_complete(self._async_api.close())
self._loop.close()
-
def __enter__(self) -> 'NominatimAPI':
return self
-
def __exit__(self, *_: Any) -> None:
self.close()
-
@property
def config(self) -> Configuration:
""" Provide read-only access to the [configuration](Configuration.md)
"""
return self._loop.run_until_complete(self._async_api.status())
-
def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
"""
return self._loop.run_until_complete(self._async_api.details(place, **params))
-
def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
""" Get simple information about a list of places.
"""
return self._loop.run_until_complete(self._async_api.lookup(places, **params))
-
def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
"""
return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
-
def search(self, query: str, **params: Any) -> SearchResults:
""" Find a place by free-text search. Also known as forward geocoding.
return self._loop.run_until_complete(
self._async_api.search(query, **params))
-
- # pylint: disable=too-many-arguments
def search_address(self, amenity: Optional[str] = None,
street: Optional[str] = None,
city: Optional[str] = None,
self._async_api.search_address(amenity, street, city, county,
state, country, postalcode, **params))
-
def search_category(self, categories: List[Tuple[str, str]],
near_query: Optional[str] = None,
**params: Any) -> SearchResults: