1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict
13 from pathlib import Path
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.config import Configuration
21 from nominatim.api.status import get_status, StatusResult
22 from nominatim.api.connection import SearchConnection
24 class NominatimAPIAsync:
25 """ API loader asynchornous version.
27 def __init__(self, project_dir: Path,
28 environ: Optional[Mapping[str, str]] = None) -> None:
29 self.config = Configuration(project_dir, environ)
30 self.server_version = 0
32 self._engine_lock = asyncio.Lock()
33 self._engine: Optional[sa_asyncio.AsyncEngine] = None
34 self._tables: Optional[SearchTables] = None
35 self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
38 async def setup_database(self) -> None:
39 """ Set up the engine and connection parameters.
41 This function will be implicitly called when the database is
42 accessed for the first time. You may also call it explicitly to
43 avoid that the first call is delayed by the setup.
45 async with self._engine_lock:
49 dsn = self.config.get_database_params()
51 dburl = sa.engine.URL.create(
53 database=dsn.get('dbname'),
54 username=dsn.get('user'), password=dsn.get('password'),
55 host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
56 query={k: v for k, v in dsn.items()
57 if k not in ('user', 'password', 'dbname', 'host', 'port')})
58 engine = sa_asyncio.create_async_engine(
60 connect_args={'server_settings': {
61 'DateStyle': 'sql,european',
62 'max_parallel_workers_per_gather': '0'
66 async with engine.begin() as conn:
67 result = await conn.scalar(sa.text('SHOW server_version_num'))
68 server_version = int(result)
69 except asyncpg.PostgresError:
72 if server_version >= 110000:
73 @sa.event.listens_for(engine.sync_engine, "connect")
74 def _on_connect(dbapi_con: Any, _: Any) -> None:
75 cursor = dbapi_con.cursor()
76 cursor.execute("SET jit_above_cost TO '-1'")
77 # Make sure that all connections get the new settings
80 self._property_cache['DB:server_version'] = server_version
82 self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
86 async def close(self) -> None:
87 """ Close all active connections to the database. The NominatimAPIAsync
88 object remains usable after closing. If a new API functions is
89 called, new connections are created.
91 if self._engine is not None:
92 await self._engine.dispose()
95 @contextlib.asynccontextmanager
96 async def begin(self) -> AsyncIterator[SearchConnection]:
97 """ Create a new connection with automatic transaction handling.
99 This function may be used to get low-level access to the database.
100 Refer to the documentation of SQLAlchemy for details how to use
101 the connection object.
103 if self._engine is None:
104 await self.setup_database()
106 assert self._engine is not None
107 assert self._tables is not None
109 async with self._engine.begin() as conn:
110 yield SearchConnection(conn, self._tables, self._property_cache)
113 async def status(self) -> StatusResult:
114 """ Return the status of the database.
117 async with self.begin() as conn:
118 status = await get_status(conn)
119 except asyncpg.PostgresError:
120 return StatusResult(700, 'Database connection failed')
126 """ API loader, synchronous version.
129 def __init__(self, project_dir: Path,
130 environ: Optional[Mapping[str, str]] = None) -> None:
131 self._loop = asyncio.new_event_loop()
132 self._async_api = NominatimAPIAsync(project_dir, environ)
135 def close(self) -> None:
136 """ Close all active connections to the database. The NominatimAPIAsync
137 object remains usable after closing. If a new API functions is
138 called, new connections are created.
140 self._loop.run_until_complete(self._async_api.close())
144 def status(self) -> StatusResult:
145 """ Return the status of the database.
147 return self._loop.run_until_complete(self._async_api.status())