"""
from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
import asyncio
+import sys
import contextlib
from pathlib import Path
""" API loader asynchornous version.
"""
def __init__(self, project_dir: Path,
- environ: Optional[Mapping[str, str]] = None) -> None:
+ environ: Optional[Mapping[str, str]] = None,
+ loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
self.config = Configuration(project_dir, environ)
+ self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
+ if self.config.QUERY_TIMEOUT else None
self.server_version = 0
- self._engine_lock = asyncio.Lock()
+ if sys.version_info >= (3, 10):
+ self._engine_lock = asyncio.Lock()
+ else:
+ self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
self._engine: Optional[sa_asyncio.AsyncEngine] = None
self._tables: Optional[SearchTables] = None
self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
return
dsn = self.config.get_database_params()
+ pool_size = self.config.get_int('API_POOL_SIZE')
query = {k: v for k, v in dsn.items()
if k not in ('user', 'password', 'dbname', 'host', 'port')}
- if PGCORE_LIB == 'asyncpg':
- query['prepared_statement_cache_size'] = '0'
dburl = sa.engine.URL.create(
f'postgresql+{PGCORE_LIB}',
username=dsn.get('user'), password=dsn.get('password'),
host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
query=query)
- engine = sa_asyncio.create_async_engine(dburl, future=True)
+ engine = sa_asyncio.create_async_engine(dburl, future=True,
+ max_overflow=0, pool_size=pool_size,
+ echo=self.config.get_bool('DEBUG_SQL'))
try:
async with engine.begin() as conn:
"""
try:
async with self.begin() as conn:
+ conn.set_query_timeout(self.query_timeout)
status = await get_status(conn)
except (PGCORE_ERROR, sa.exc.OperationalError):
return StatusResult(700, 'Database connection failed')
"""
details = ntyp.LookupDetails.from_kwargs(params)
async with self.begin() as conn:
+ conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
return await get_detailed_place(conn, place, details)
"""
details = ntyp.LookupDetails.from_kwargs(params)
async with self.begin() as conn:
+ conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
return SearchResults(filter(None,
details = ntyp.ReverseDetails.from_kwargs(params)
async with self.begin() as conn:
+ conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
geocoder = ReverseGeocoder(conn, details)
raise UsageError('Nothing to search for.')
async with self.begin() as conn:
- geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
+ conn.set_query_timeout(self.query_timeout)
+ geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
+ self.config.get_int('REQUEST_TIMEOUT') \
+ if self.config.REQUEST_TIMEOUT else None)
phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
return await geocoder.lookup(phrases)
""" Find an address using structured search.
"""
async with self.begin() as conn:
+ conn.set_query_timeout(self.query_timeout)
details = ntyp.SearchDetails.from_kwargs(params)
phrases: List[Phrase] = []
if amenity:
details.layers |= ntyp.DataLayer.POI
- geocoder = ForwardGeocoder(conn, details)
+ geocoder = ForwardGeocoder(conn, details,
+ self.config.get_int('REQUEST_TIMEOUT') \
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup(phrases)
details = ntyp.SearchDetails.from_kwargs(params)
async with self.begin() as conn:
+ conn.set_query_timeout(self.query_timeout)
if near_query:
phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
else:
if details.keywords:
await make_query_analyzer(conn)
- geocoder = ForwardGeocoder(conn, details)
+ geocoder = ForwardGeocoder(conn, details,
+ self.config.get_int('REQUEST_TIMEOUT') \
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup_pois(categories, phrases)
def __init__(self, project_dir: Path,
environ: Optional[Mapping[str, str]] = None) -> None:
self._loop = asyncio.new_event_loop()
- self._async_api = NominatimAPIAsync(project_dir, environ)
+ self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
def close(self) -> None: