"""
Extended SQLAlchemy connection class that also includes access to the schema.
"""
-from typing import Any, Mapping, Sequence, Union, Dict, cast
+from typing import cast, Any, Mapping, Sequence, Union, Dict, Optional, Set
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncConnection
+from nominatim.typing import SaFromClause
from nominatim.db.sqlalchemy_schema import SearchTables
+from nominatim.db.sqlalchemy_types import Geometry
from nominatim.api.logging import log
class SearchConnection:
self.connection = conn
self.t = tables # pylint: disable=invalid-name
self._property_cache = properties
+ self._classtables: Optional[Set[str]] = None
async def scalar(self, sql: sa.sql.base.Executable,
) -> Any:
""" Execute a 'scalar()' query on the connection.
"""
- log().sql(self.connection, sql)
+ log().sql(self.connection, sql, params)
return await self.connection.scalar(sql, params)
) -> 'sa.Result[Any]':
""" Execute a 'execute()' query on the connection.
"""
- log().sql(self.connection, sql)
+ log().sql(self.connection, sql, params)
return await self.connection.execute(sql, params)
raise ValueError(f"DB setting '{name}' not found in database.")
return self._property_cache['DB:server_version']
+
+
+ async def get_class_table(self, cls: str, typ: str) -> Optional[SaFromClause]:
+ """ Lookup up if there is a classtype table for the given category
+ and return a SQLAlchemy table for it, if it exists.
+ """
+ if self._classtables is None:
+ res = await self.execute(sa.text("""SELECT tablename FROM pg_tables
+ WHERE tablename LIKE 'place_classtype_%'
+ """))
+ self._classtables = {r[0] for r in res}
+
+ tablename = f"place_classtype_{cls}_{typ}"
+
+ if tablename not in self._classtables:
+ return None
+
+ if tablename in self.t.meta.tables:
+ return self.t.meta.tables[tablename]
+
+ return sa.Table(tablename, self.t.meta,
+ sa.Column('place_id', sa.BigInteger),
+ sa.Column('centroid', Geometry))