# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of classes for API access via libraries.
"""
from typing import Mapping, Optional, Any, AsyncIterator, Dict
import asyncio
import contextlib
from pathlib import Path

import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_asyncio
import asyncpg

from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
from nominatim.api.lookup import get_place_by_id
from nominatim.api.types import PlaceRef, LookupDetails
from nominatim.api.results import SearchResult
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily: either by the first call
        that needs a connection or by an explicit call to
        `setup_database()`.
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create the API object. `project_dir` and `environ` are handed
            on unchanged to `Configuration`. No database connection is
            opened here.
        """
        self.config = Configuration(project_dir, environ)
        self.server_version = 0

        # Serialises the engine setup between concurrently running tasks.
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None
        # Passed on to every SearchConnection created by begin().
        self._property_cache: Dict[str, Any] = {'DB:server_version': 0}


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.

            Calling it again while an engine exists does nothing.
        """
        async with self._engine_lock:
            # Another task may have completed the setup while this one
            # was waiting for the lock.
            if self._engine is not None:
                return

            dsn = self.config.get_database_params()
            url_fields = ('user', 'password', 'dbname', 'host', 'port')

            # Everything in the DSN that is not an explicit URL field is
            # passed through as a query parameter.
            dburl = sa.engine.URL.create(
                'postgresql+asyncpg',
                database=dsn.get('dbname'),
                username=dsn.get('user'), password=dsn.get('password'),
                host=dsn.get('host'),
                port=int(dsn['port']) if 'port' in dsn else None,
                query={k: v for k, v in dsn.items() if k not in url_fields})
            engine = sa_asyncio.create_async_engine(
                dburl,
                connect_args={'server_settings': {
                    'DateStyle': 'sql,european',
                    'max_parallel_workers_per_gather': '0'
                }})

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    server_version = int(result)
            except asyncpg.PostgresError:
                # Fall back to the value the property cache starts with.
                server_version = 0

            if server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect")
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                # Make sure that all connections get the new settings:
                # the connection used for the version query above was
                # opened before the listener existed, so drop the pool.
                await engine.dispose()

            self._property_cache['DB:server_version'] = server_version

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            # Publish the engine last, so that begin() never sees an
            # engine without its table definitions.
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.
        """
        if self._engine is None:
            await self.setup_database()

        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables, self._property_cache)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When asyncpg reports a PostgresError, a StatusResult with
            code 700 and the message 'Database connection failed' is
            returned instead of raising.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except asyncpg.PostgresError:
            return StatusResult(700, 'Database connection failed')

        return status


    async def lookup(self, place: PlaceRef,
                     details: LookupDetails) -> Optional[SearchResult]:
        """ Get detailed information about a place in the database.

            Returns None if there is no entry under the given ID.
        """
        async with self.begin() as db:
            return await get_place_by_id(db, place, details)
# NOTE(review): the class statement itself was missing from the reviewed
# text. The name NominatimAPI was chosen as the synchronous counterpart of
# NominatimAPIAsync - confirm against the callers.
class NominatimAPI:
    """ API loader, synchronous version.

        Thin wrapper around NominatimAPIAsync which runs every call to
        completion on a private event loop.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        """ Create the private event loop and the wrapped asynchronous
            API object. Both parameters are handed on unchanged to
            NominatimAPIAsync.
        """
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ)


    def close(self) -> None:
        """ Close all active connections to the database. The object
            remains usable after closing. If a new API function is
            called, new connections are created.
        """
        self._loop.run_until_complete(self._async_api.close())


    # Exposed as a property so that `config` is read the same way as the
    # attribute of the same name on NominatimAPIAsync.
    @property
    def config(self) -> Configuration:
        """ Return the configuration used by the API.
        """
        return self._async_api.config

    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())


    def lookup(self, place: PlaceRef,
               details: LookupDetails) -> Optional[SearchResult]:
        """ Get detailed information about a place in the database.

            The result is whatever NominatimAPIAsync.lookup() returns
            for the same arguments.
        """
        return self._loop.run_until_complete(self._async_api.lookup(place, details))