git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/api/core.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / api / core.py
index 8d503fa5e836cbb9b82ab992530de5cead00bf4b..0f1dd7153af8f582410f398ab984d57ff8a18fb2 100644 (file)
@@ -9,6 +9,7 @@ Implementation of classes for API access via libraries.
 """
 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
 import asyncio
 """
 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
 import asyncio
+import sys
 import contextlib
 from pathlib import Path
 
 import contextlib
 from pathlib import Path
 
@@ -32,11 +33,17 @@ class NominatimAPIAsync:
     """ API loader asynchronous version.
     """
     def __init__(self, project_dir: Path,
     """ API loader asynchronous version.
     """
     def __init__(self, project_dir: Path,
-                 environ: Optional[Mapping[str, str]] = None) -> None:
+                 environ: Optional[Mapping[str, str]] = None,
+                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
         self.config = Configuration(project_dir, environ)
         self.config = Configuration(project_dir, environ)
+        self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
+                             if self.config.QUERY_TIMEOUT else None
         self.server_version = 0
 
         self.server_version = 0
 
-        self._engine_lock = asyncio.Lock()
+        if sys.version_info >= (3, 10):
+            self._engine_lock = asyncio.Lock()
+        else:
+            self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
         self._engine: Optional[sa_asyncio.AsyncEngine] = None
         self._tables: Optional[SearchTables] = None
         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
         self._engine: Optional[sa_asyncio.AsyncEngine] = None
         self._tables: Optional[SearchTables] = None
         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
@@ -54,11 +61,10 @@ class NominatimAPIAsync:
                 return
 
             dsn = self.config.get_database_params()
                 return
 
             dsn = self.config.get_database_params()
+            pool_size = self.config.get_int('API_POOL_SIZE')
 
             query = {k: v for k, v in dsn.items()
                       if k not in ('user', 'password', 'dbname', 'host', 'port')}
 
             query = {k: v for k, v in dsn.items()
                       if k not in ('user', 'password', 'dbname', 'host', 'port')}
-            if PGCORE_LIB == 'asyncpg':
-                query['prepared_statement_cache_size'] = '0'
 
             dburl = sa.engine.URL.create(
                        f'postgresql+{PGCORE_LIB}',
 
             dburl = sa.engine.URL.create(
                        f'postgresql+{PGCORE_LIB}',
@@ -67,6 +73,7 @@ class NominatimAPIAsync:
                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                        query=query)
             engine = sa_asyncio.create_async_engine(dburl, future=True,
                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                        query=query)
             engine = sa_asyncio.create_async_engine(dburl, future=True,
+                                                    max_overflow=0, pool_size=pool_size,
                                                     echo=self.config.get_bool('DEBUG_SQL'))
 
             try:
                                                     echo=self.config.get_bool('DEBUG_SQL'))
 
             try:
@@ -123,6 +130,7 @@ class NominatimAPIAsync:
         """
         try:
             async with self.begin() as conn:
         """
         try:
             async with self.begin() as conn:
+                conn.set_query_timeout(self.query_timeout)
                 status = await get_status(conn)
         except (PGCORE_ERROR, sa.exc.OperationalError):
             return StatusResult(700, 'Database connection failed')
                 status = await get_status(conn)
         except (PGCORE_ERROR, sa.exc.OperationalError):
             return StatusResult(700, 'Database connection failed')
@@ -137,6 +145,7 @@ class NominatimAPIAsync:
         """
         details = ntyp.LookupDetails.from_kwargs(params)
         async with self.begin() as conn:
         """
         details = ntyp.LookupDetails.from_kwargs(params)
         async with self.begin() as conn:
+            conn.set_query_timeout(self.query_timeout)
             if details.keywords:
                 await make_query_analyzer(conn)
             return await get_detailed_place(conn, place, details)
             if details.keywords:
                 await make_query_analyzer(conn)
             return await get_detailed_place(conn, place, details)
@@ -149,6 +158,7 @@ class NominatimAPIAsync:
         """
         details = ntyp.LookupDetails.from_kwargs(params)
         async with self.begin() as conn:
         """
         details = ntyp.LookupDetails.from_kwargs(params)
         async with self.begin() as conn:
+            conn.set_query_timeout(self.query_timeout)
             if details.keywords:
                 await make_query_analyzer(conn)
             return SearchResults(filter(None,
             if details.keywords:
                 await make_query_analyzer(conn)
             return SearchResults(filter(None,
@@ -168,6 +178,7 @@ class NominatimAPIAsync:
 
         details = ntyp.ReverseDetails.from_kwargs(params)
         async with self.begin() as conn:
 
         details = ntyp.ReverseDetails.from_kwargs(params)
         async with self.begin() as conn:
+            conn.set_query_timeout(self.query_timeout)
             if details.keywords:
                 await make_query_analyzer(conn)
             geocoder = ReverseGeocoder(conn, details)
             if details.keywords:
                 await make_query_analyzer(conn)
             geocoder = ReverseGeocoder(conn, details)
@@ -182,7 +193,10 @@ class NominatimAPIAsync:
             raise UsageError('Nothing to search for.')
 
         async with self.begin() as conn:
             raise UsageError('Nothing to search for.')
 
         async with self.begin() as conn:
-            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
+            conn.set_query_timeout(self.query_timeout)
+            geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
+                                       self.config.get_int('REQUEST_TIMEOUT') \
+                                         if self.config.REQUEST_TIMEOUT else None)
             phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
             return await geocoder.lookup(phrases)
 
             phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
             return await geocoder.lookup(phrases)
 
@@ -199,6 +213,7 @@ class NominatimAPIAsync:
         """ Find an address using structured search.
         """
         async with self.begin() as conn:
         """ Find an address using structured search.
         """
         async with self.begin() as conn:
+            conn.set_query_timeout(self.query_timeout)
             details = ntyp.SearchDetails.from_kwargs(params)
 
             phrases: List[Phrase] = []
             details = ntyp.SearchDetails.from_kwargs(params)
 
             phrases: List[Phrase] = []
@@ -239,7 +254,9 @@ class NominatimAPIAsync:
                 if amenity:
                     details.layers |= ntyp.DataLayer.POI
 
                 if amenity:
                     details.layers |= ntyp.DataLayer.POI
 
-            geocoder = ForwardGeocoder(conn, details)
+            geocoder = ForwardGeocoder(conn, details,
+                                       self.config.get_int('REQUEST_TIMEOUT') \
+                                         if self.config.REQUEST_TIMEOUT else None)
             return await geocoder.lookup(phrases)
 
 
             return await geocoder.lookup(phrases)
 
 
@@ -255,6 +272,7 @@ class NominatimAPIAsync:
 
         details = ntyp.SearchDetails.from_kwargs(params)
         async with self.begin() as conn:
 
         details = ntyp.SearchDetails.from_kwargs(params)
         async with self.begin() as conn:
+            conn.set_query_timeout(self.query_timeout)
             if near_query:
                 phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
             else:
             if near_query:
                 phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
             else:
@@ -262,7 +280,9 @@ class NominatimAPIAsync:
                 if details.keywords:
                     await make_query_analyzer(conn)
 
                 if details.keywords:
                     await make_query_analyzer(conn)
 
-            geocoder = ForwardGeocoder(conn, details)
+            geocoder = ForwardGeocoder(conn, details,
+                                       self.config.get_int('REQUEST_TIMEOUT') \
+                                         if self.config.REQUEST_TIMEOUT else None)
             return await geocoder.lookup_pois(categories, phrases)
 
 
             return await geocoder.lookup_pois(categories, phrases)
 
 
@@ -274,7 +294,7 @@ class NominatimAPI:
     def __init__(self, project_dir: Path,
                  environ: Optional[Mapping[str, str]] = None) -> None:
         self._loop = asyncio.new_event_loop()
     def __init__(self, project_dir: Path,
                  environ: Optional[Mapping[str, str]] = None) -> None:
         self._loop = asyncio.new_event_loop()
-        self._async_api = NominatimAPIAsync(project_dir, environ)
+        self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
 
 
     def close(self) -> None:
 
 
     def close(self) -> None: