git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
add SQLAlchemy table schema for Nominatim tables
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17 import asyncpg
18
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.config import Configuration
21 from nominatim.api.status import get_status, StatusResult
22 from nominatim.api.connection import SearchConnection
23
class NominatimAPIAsync:
    """ API loader, asynchronous version.

        The database engine is created lazily, either by an explicit call
        to setup_database() or implicitly by the first call to begin().
    """
    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        self.config = Configuration(project_dir, environ)
        # Result of 'SHOW server_version_num'. Remains 0 until
        # setup_database() has successfully queried the server.
        self.server_version = 0

        # Serialises setup_database() so that only one engine is created.
        self._engine_lock = asyncio.Lock()
        self._engine: Optional[sa_asyncio.AsyncEngine] = None
        self._tables: Optional[SearchTables] = None


    async def setup_database(self) -> None:
        """ Set up the engine and connection parameters.

            This function will be implicitly called when the database is
            accessed for the first time. You may also call it explicitly to
            avoid that the first call is delayed by the setup.

            Calling the function again after the engine has been created
            is a no-op.
        """
        async with self._engine_lock:
            if self._engine is not None:
                return

            dsn = self.config.get_database_params()

            # The well-known DSN keys map onto dedicated URL fields, all
            # remaining parameters are handed through as URL query options.
            dburl = sa.engine.URL.create(
                       'postgresql+asyncpg',
                       database=dsn.get('dbname'),
                       username=dsn.get('user'), password=dsn.get('password'),
                       host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
                       query={k: v for k, v in dsn.items()
                              if k not in ('user', 'password', 'dbname', 'host', 'port')})
            engine = sa_asyncio.create_async_engine(
                             dburl, future=True,
                             connect_args={'server_settings': {
                                'DateStyle': 'sql,european',
                                'max_parallel_workers_per_gather': '0'
                             }})

            try:
                async with engine.begin() as conn:
                    result = await conn.scalar(sa.text('SHOW server_version_num'))
                    self.server_version = int(result)
            except asyncpg.PostgresError:
                # Best effort only: continue with an unknown server version.
                self.server_version = 0

            # 110000 corresponds to PostgreSQL 11.
            if self.server_version >= 110000:
                @sa.event.listens_for(engine.sync_engine, "connect") # type: ignore[misc]
                def _on_connect(dbapi_con: Any, _: Any) -> None:
                    cursor = dbapi_con.cursor()
                    cursor.execute("SET jit_above_cost TO '-1'")
                # Make sure that all connections get the new settings.
                # The version check above has already opened a connection
                # without them. self._engine is not yet assigned here, so
                # self.close() would do nothing; dispose of the pool of the
                # freshly created engine directly instead.
                await engine.dispose()

            self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
            self._engine = engine


    async def close(self) -> None:
        """ Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
            called, new connections are created.

            Does nothing when the engine has not been set up yet.
        """
        if self._engine is not None:
            await self._engine.dispose()


    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[SearchConnection]:
        """ Create a new connection with automatic transaction handling.

            This function may be used to get low-level access to the database.
            Refer to the documentation of SQLAlchemy for details how to use
            the connection object.

            The engine is set up first if that has not happened yet.
        """
        if self._engine is None:
            await self.setup_database()

        # setup_database() either assigns both members or raises.
        assert self._engine is not None
        assert self._tables is not None

        async with self._engine.begin() as conn:
            yield SearchConnection(conn, self._tables)


    async def status(self) -> StatusResult:
        """ Return the status of the database.

            When the database access fails with an asyncpg.PostgresError,
            a StatusResult with code 700 is returned instead of raising.
        """
        try:
            async with self.begin() as conn:
                status = await get_status(conn)
        except asyncpg.PostgresError:
            return StatusResult(700, 'Database connection failed')

        return status
120
121
class NominatimAPI:
    """ API loader, synchronous version.

        Wraps a NominatimAPIAsync object and runs its coroutines to
        completion on a private event loop.
    """

    def __init__(self, project_dir: Path,
                 environ: Optional[Mapping[str, str]] = None) -> None:
        # Private loop on which all coroutines of the async API are executed.
        self._loop = asyncio.new_event_loop()
        self._async_api = NominatimAPIAsync(project_dir, environ)


    def close(self) -> None:
        """ Close all active connections to the database.

            This function also closes the private event loop, so the
            NominatimAPI object can no longer be used afterwards.
            Calling close() more than once is safe; later calls do nothing.
        """
        if self._loop.is_closed():
            return

        try:
            self._loop.run_until_complete(self._async_api.close())
        finally:
            # Release the loop even when closing the connections failed.
            self._loop.close()


    def status(self) -> StatusResult:
        """ Return the status of the database.
        """
        return self._loop.run_until_complete(self._async_api.status())