]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
54f02a938e77517b2a0e23fa5819911fd0d2d434
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17 import asyncpg
18
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.config import Configuration
21 from nominatim.api.status import get_status, StatusResult
22 from nominatim.api.connection import SearchConnection
23
24 class NominatimAPIAsync:
25     """ API loader asynchornous version.
26     """
27     def __init__(self, project_dir: Path,
28                  environ: Optional[Mapping[str, str]] = None) -> None:
29         self.config = Configuration(project_dir, environ)
30         self.server_version = 0
31
32         self._engine_lock = asyncio.Lock()
33         self._engine: Optional[sa_asyncio.AsyncEngine] = None
34         self._tables: Optional[SearchTables] = None
35         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
36
37
38     async def setup_database(self) -> None:
39         """ Set up the engine and connection parameters.
40
41             This function will be implicitly called when the database is
42             accessed for the first time. You may also call it explicitly to
43             avoid that the first call is delayed by the setup.
44         """
45         async with self._engine_lock:
46             if self._engine:
47                 return
48
49             dsn = self.config.get_database_params()
50
51             dburl = sa.engine.URL.create(
52                        'postgresql+asyncpg',
53                        database=dsn.get('dbname'),
54                        username=dsn.get('user'), password=dsn.get('password'),
55                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
56                        query={k: v for k, v in dsn.items()
57                               if k not in ('user', 'password', 'dbname', 'host', 'port')})
58             engine = sa_asyncio.create_async_engine(
59                              dburl, future=True,
60                              connect_args={'server_settings': {
61                                 'DateStyle': 'sql,european',
62                                 'max_parallel_workers_per_gather': '0'
63                              }})
64
65             try:
66                 async with engine.begin() as conn:
67                     result = await conn.scalar(sa.text('SHOW server_version_num'))
68                     server_version = int(result)
69             except asyncpg.PostgresError:
70                 server_version = 0
71
72             if server_version >= 110000:
73                 @sa.event.listens_for(engine.sync_engine, "connect")
74                 def _on_connect(dbapi_con: Any, _: Any) -> None:
75                     cursor = dbapi_con.cursor()
76                     cursor.execute("SET jit_above_cost TO '-1'")
77                 # Make sure that all connections get the new settings
78                 await self.close()
79
80             self._property_cache['DB:server_version'] = server_version
81
82             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
83             self._engine = engine
84
85
86     async def close(self) -> None:
87         """ Close all active connections to the database. The NominatimAPIAsync
88             object remains usable after closing. If a new API functions is
89             called, new connections are created.
90         """
91         if self._engine is not None:
92             await self._engine.dispose()
93
94
95     @contextlib.asynccontextmanager
96     async def begin(self) -> AsyncIterator[SearchConnection]:
97         """ Create a new connection with automatic transaction handling.
98
99             This function may be used to get low-level access to the database.
100             Refer to the documentation of SQLAlchemy for details how to use
101             the connection object.
102         """
103         if self._engine is None:
104             await self.setup_database()
105
106         assert self._engine is not None
107         assert self._tables is not None
108
109         async with self._engine.begin() as conn:
110             yield SearchConnection(conn, self._tables, self._property_cache)
111
112
113     async def status(self) -> StatusResult:
114         """ Return the status of the database.
115         """
116         try:
117             async with self.begin() as conn:
118                 status = await get_status(conn)
119         except asyncpg.PostgresError:
120             return StatusResult(700, 'Database connection failed')
121
122         return status
123
124
125 class NominatimAPI:
126     """ API loader, synchronous version.
127     """
128
129     def __init__(self, project_dir: Path,
130                  environ: Optional[Mapping[str, str]] = None) -> None:
131         self._loop = asyncio.new_event_loop()
132         self._async_api = NominatimAPIAsync(project_dir, environ)
133
134
135     def close(self) -> None:
136         """ Close all active connections to the database. The NominatimAPIAsync
137             object remains usable after closing. If a new API functions is
138             called, new connections are created.
139         """
140         self._loop.run_until_complete(self._async_api.close())
141         self._loop.close()
142
143
144     def status(self) -> StatusResult:
145         """ Return the status of the database.
146         """
147         return self._loop.run_until_complete(self._async_api.status())