# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
-Classes and function releated to status call.
+Classes and functions related to the status call.
"""
-from typing import Optional, cast
+from typing import Optional
import datetime as dt
+import dataclasses
import sqlalchemy as sa
-from sqlalchemy.ext.asyncio.engine import AsyncConnection
-import asyncpg
+from nominatim.api.connection import SearchConnection
from nominatim import version
+@dataclasses.dataclass
class StatusResult:
""" Result of a call to the status API.
"""
+ status: int
+ message: str
+ software_version = version.NOMINATIM_VERSION
+ data_updated: Optional[dt.datetime] = None
+ database_version: Optional[version.NominatimVersion] = None
- def __init__(self, status: int, msg: str):
- self.status = status
- self.message = msg
- self.software_version = version.NOMINATIM_VERSION
- self.data_updated: Optional[dt.datetime] = None
- self.database_version: Optional[version.NominatimVersion] = None
-
-async def _get_database_date(conn: AsyncConnection) -> Optional[dt.datetime]:
- """ Query the database date.
+async def get_status(conn: SearchConnection) -> StatusResult:
+ """ Execute a status API call.
"""
- sql = sa.text('SELECT lastimportdate FROM import_status LIMIT 1')
- result = await conn.execute(sql)
-
- for row in result:
- return cast(dt.datetime, row[0])
-
- return None
-
-
-async def _get_database_version(conn: AsyncConnection) -> Optional[version.NominatimVersion]:
- sql = sa.text("""SELECT value FROM nominatim_properties
- WHERE property = 'database_version'""")
- result = await conn.execute(sql)
-
- for row in result:
- return version.parse_version(cast(str, row[0]))
+ status = StatusResult(0, 'OK')
- return None
+ # Last update date, normalised to a timezone-aware UTC datetime
+ sql = sa.select(conn.t.import_status.c.lastimportdate).limit(1)
+ status.data_updated = await conn.scalar(sql)
+ if status.data_updated is not None:
+ if status.data_updated.tzinfo is None:
+ status.data_updated = status.data_updated.replace(tzinfo=dt.timezone.utc)
+ else:
+ status.data_updated = status.data_updated.astimezone(dt.timezone.utc)
-async def get_status(conn: AsyncConnection) -> StatusResult:
- """ Execute a status API call.
- """
- status = StatusResult(0, 'OK')
+ # Database version; stays None when reading or parsing it raises ValueError
try:
- status.data_updated = await _get_database_date(conn)
- status.database_version = await _get_database_version(conn)
- except asyncpg.PostgresError:
- return StatusResult(700, 'Database connection failed')
+ verstr = await conn.get_property('database_version')
+ status.database_version = version.parse_version(verstr)
+ except ValueError:
+ pass
return status