]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/connection.py
replace CASE construct with plpgsql function
[nominatim.git] / nominatim / api / connection.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Extended SQLAlchemy connection class that also includes access to the schema.
9 """
10 from typing import cast, Any, Mapping, Sequence, Union, Dict, Optional, Set
11
12 import sqlalchemy as sa
13 from sqlalchemy.ext.asyncio import AsyncConnection
14
15 from nominatim.typing import SaFromClause
16 from nominatim.db.sqlalchemy_schema import SearchTables
17 from nominatim.db.sqlalchemy_types import Geometry
18 from nominatim.api.logging import log
19
20 class SearchConnection:
21     """ An extended SQLAlchemy connection class, that also contains
22         then table definitions. The underlying asynchronous SQLAlchemy
23         connection can be accessed with the 'connection' property.
24         The 't' property is the collection of Nominatim tables.
25     """
26
27     def __init__(self, conn: AsyncConnection,
28                  tables: SearchTables,
29                  properties: Dict[str, Any]) -> None:
30         self.connection = conn
31         self.t = tables # pylint: disable=invalid-name
32         self._property_cache = properties
33         self._classtables: Optional[Set[str]] = None
34
35
36     async def scalar(self, sql: sa.sql.base.Executable,
37                      params: Union[Mapping[str, Any], None] = None
38                     ) -> Any:
39         """ Execute a 'scalar()' query on the connection.
40         """
41         log().sql(self.connection, sql, params)
42         return await self.connection.scalar(sql, params)
43
44
45     async def execute(self, sql: 'sa.Executable',
46                       params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None] = None
47                      ) -> 'sa.Result[Any]':
48         """ Execute a 'execute()' query on the connection.
49         """
50         log().sql(self.connection, sql, params)
51         return await self.connection.execute(sql, params)
52
53
54     async def get_property(self, name: str, cached: bool = True) -> str:
55         """ Get a property from Nominatim's property table.
56
57             Property values are normally cached so that they are only
58             retrieved from the database when they are queried for the
59             first time with this function. Set 'cached' to False to force
60             reading the property from the database.
61
62             Raises a ValueError if the property does not exist.
63         """
64         if name.startswith('DB:'):
65             raise ValueError(f"Illegal property value '{name}'.")
66
67         if cached and name in self._property_cache:
68             return cast(str, self._property_cache[name])
69
70         sql = sa.select(self.t.properties.c.value)\
71             .where(self.t.properties.c.property == name)
72         value = await self.connection.scalar(sql)
73
74         if value is None:
75             raise ValueError(f"Property '{name}' not found in database.")
76
77         self._property_cache[name] = cast(str, value)
78
79         return cast(str, value)
80
81
82     async def get_db_property(self, name: str) -> Any:
83         """ Get a setting from the database. At the moment, only
84             'server_version', the version of the database software, can
85             be retrieved with this function.
86
87             Raises a ValueError if the property does not exist.
88         """
89         if name != 'server_version':
90             raise ValueError(f"DB setting '{name}' not found in database.")
91
92         return self._property_cache['DB:server_version']
93
94
95     async def get_class_table(self, cls: str, typ: str) -> Optional[SaFromClause]:
96         """ Lookup up if there is a classtype table for the given category
97             and return a SQLAlchemy table for it, if it exists.
98         """
99         if self._classtables is None:
100             res = await self.execute(sa.text("""SELECT tablename FROM pg_tables
101                                                 WHERE tablename LIKE 'place_classtype_%'
102                                              """))
103             self._classtables = {r[0] for r in res}
104
105         tablename = f"place_classtype_{cls}_{typ}"
106
107         if tablename not in self._classtables:
108             return None
109
110         if tablename in self.t.meta.tables:
111             return self.t.meta.tables[tablename]
112
113         return sa.Table(tablename, self.t.meta,
114                         sa.Column('place_id', sa.BigInteger),
115                         sa.Column('centroid', Geometry))