from sqlalchemy.ext.asyncio import AsyncConnection
from nominatim.db.sqlalchemy_schema import SearchTables
+from nominatim.api.logging import log
class SearchConnection:
""" An extended SQLAlchemy connection class, that also contains
) -> Any:
""" Execute a 'scalar()' query on the connection.
"""
+ log().sql(self.connection, sql)
return await self.connection.scalar(sql, params)
- async def execute(self, sql: sa.sql.base.Executable,
+ async def execute(self, sql: 'sa.Executable',
params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None] = None
- ) -> 'sa.engine.Result[Any]':
+ ) -> 'sa.Result[Any]':
""" Execute a 'execute()' query on the connection.
"""
+ log().sql(self.connection, sql)
return await self.connection.execute(sql, params)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Functions for specialised logging with HTML output.
+"""
+from typing import Any, Optional, cast
+from contextvars import ContextVar
+import io
+
+import sqlalchemy as sa
+from sqlalchemy.ext.asyncio import AsyncConnection
+
+try:
+ from pygments import highlight
+ from pygments.lexers import PythonLexer, PostgresLexer
+ from pygments.formatters import HtmlFormatter
+ CODE_HIGHLIGHT = True
+except ModuleNotFoundError:
+ CODE_HIGHLIGHT = False
+
+
+class BaseLogger:
+ """ Interface for logging function.
+
+ The base implementation does nothing. Overwrite the functions
+ in derived classes which implement logging functionality.
+ """
+ def get_buffer(self) -> str:
+ return ''
+
+ def function(self, func: str, **kwargs: Any) -> None:
+ """ Start a new debug chapter for the given function and its parameters.
+ """
+
+
+ def section(self, heading: str) -> None:
+ """ Start a new section with the given title.
+ """
+
+
+ def comment(self, text: str) -> None:
+ """ Add a simple comment to the debug output.
+ """
+
+
+ def var_dump(self, heading: str, var: Any) -> None:
+ """ Print the content of the variable to the debug output prefixed by
+ the given heading.
+ """
+
+
+ def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
+ """ Print the SQL for the given statement.
+ """
+
+
+class HTMLLogger(BaseLogger):
+ """ Logger that formats messages in HTML.
+ """
+ def __init__(self):
+ self.buffer = io.StringIO()
+
+
+ def get_buffer(self) -> str:
+ return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER
+
+ def function(self, func: str, **kwargs: Any) -> None:
+ """ Start a new debug chapter for the given function and its parameters.
+ """
+ self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
+ for name, value in kwargs.items():
+ self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
+ self._write('</dl></p>')
+
+
+ def section(self, heading: str) -> None:
+ """ Start a new section with the given title.
+ """
+ self._write(f"<h2>{heading}</h2>")
+
+ def comment(self, text: str) -> None:
+ """ Add a simple comment to the debug output.
+ """
+ self._write(f"<p>{text}</p>")
+
+ def var_dump(self, heading: str, var: Any) -> None:
+ """ Print the content of the variable to the debug output prefixed by
+ the given heading.
+ """
+ self._write(f'<h5>{heading}</h5>{self._python_var(var)}')
+
+
+ def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
+ """ Dump the SQL statement to debug output.
+ """
+ sqlstr = str(cast('sa.ClauseElement', statement)
+ .compile(conn.sync_engine, compile_kwargs={"literal_binds": True}))
+ if CODE_HIGHLIGHT:
+ sqlstr = highlight(sqlstr, PostgresLexer(),
+ HtmlFormatter(nowrap=True, lineseparator='<br>'))
+ self._write(f'<div class="highlight"><code class="lang-sql">{sqlstr}</code></div>')
+ else:
+ self._write(f'<code class="lang-sql">{sqlstr}</code>')
+
+
+ def _python_var(self, var: Any) -> str:
+ if CODE_HIGHLIGHT:
+ fmt = highlight(repr(var), PythonLexer(), HtmlFormatter(nowrap=True))
+ return f'<div class="highlight"><code class="lang-python">{fmt}</code></div>'
+
+ return f'<code class="lang-python">{str(var)}</code>'
+
+
+ def _write(self, text: str) -> None:
+ """ Add the raw text to the debug output.
+ """
+ self.buffer.write(text)
+
+
+logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())
+
+
+def set_log_output(fmt: str) -> None:
+ """ Enable collecting debug information.
+ """
+ if fmt == 'html':
+ logger.set(HTMLLogger())
+ elif fmt == 'text':
+ logger.set(TextLogger())
+ else:
+ logger.set(BaseLogger())
+
+
+def log() -> BaseLogger:
+ """ Return the logger for the current context.
+ """
+ return logger.get()
+
+
+def get_and_disable() -> str:
+ """ Return the current content of the debug buffer and disable logging.
+ """
+ buf = logger.get().get_buffer()
+ logger.set(BaseLogger())
+ return buf
+
+
+HTML_HEADER: str = """<!DOCTYPE html>
+<html>
+<head>
+ <title>Nominatim - Debug</title>
+ <style>
+""" + \
+(HtmlFormatter(nobackground=True).get_style_defs('.highlight') if CODE_HIGHLIGHT else '') +\
+"""
+ h2 { font-size: x-large }
+
+ dl {
+ padding-left: 10pt;
+ font-family: monospace
+ }
+
+ dt {
+ float: left;
+ font-weight: bold;
+ margin-right: 0.5em
+ }
+
+ dt::after { content: ": "; }
+
+ dd::after {
+ clear: left;
+ display: block
+ }
+
+ .lang-sql {
+ color: #555;
+ font-size: small
+ }
+
+ h5 {
+ border: solid lightgrey 0.1pt;
+ margin-bottom: 0;
+ background-color: #f7f7f7
+ }
+
+ h5 + .highlight {
+ padding: 3pt;
+ border: solid lightgrey 0.1pt
+ }
+ </style>
+</head>
+<body>
+"""
+
+HTML_FOOTER: str = "</body></html>"
from nominatim.api.connection import SearchConnection
import nominatim.api.types as ntyp
import nominatim.api.results as nres
+from nominatim.api.logging import log
def _select_column_geometry(column: SaColumn,
geometry_output: ntyp.GeometryFormat) -> SaLabel:
""" Search for the given place in the placex table and return the
base information.
"""
+ log().section("Find in placex table")
t = conn.t.placex
sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type, t.c.admin_level,
""" Search for the given place in the osmline table and return the
base information.
"""
+ log().section("Find in interpolation table")
t = conn.t.osmline
sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
t.c.indexed_date, t.c.startnumber, t.c.endnumber,
""" Search for the given place in the table of Tiger addresses and return
the base information. Only lookup by place ID is supported.
"""
+ log().section("Find in TIGER table")
t = conn.t.tiger
sql = sa.select(t.c.place_id, t.c.parent_place_id,
t.c.startnumber, t.c.endnumber, t.c.step,
""" Search for the given place in the postcode table and return the
base information. Only lookup by place ID is supported.
"""
+ log().section("Find in postcode table")
t = conn.t.postcode
sql = sa.select(t.c.place_id, t.c.parent_place_id,
t.c.rank_search, t.c.rank_address,
details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
""" Retrieve a place with additional details from the database.
"""
+ log().function('get_place_by_id', place=place, details=details)
+
if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
raise ValueError("lookup only supports geojosn polygon output.")
row = await find_in_placex(conn, place, details)
if row is not None:
result = nres.create_from_placex_row(row)
+ log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
row = await find_in_osmline(conn, place, details)
if row is not None:
result = nres.create_from_osmline_row(row)
+ log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
row = await find_in_postcode(conn, place, details)
if row is not None:
result = nres.create_from_postcode_row(row)
+ log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
row = await find_in_tiger(conn, place, details)
if row is not None:
result = nres.create_from_tiger_row(row)
+ log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
from nominatim.typing import SaSelect, SaRow
from nominatim.api.types import Point, LookupDetails
from nominatim.api.connection import SearchConnection
+from nominatim.api.logging import log
# This file defines complex result data classes.
# pylint: disable=too-many-instance-attributes
""" Retrieve more details from the database according to the
parameters specified in 'details'.
"""
+ log().section('Query details for result')
if details.address_details:
+ log().comment('Query address details')
await complete_address_details(conn, result)
if details.linked_places:
+ log().comment('Query linked places')
await complete_linked_places(conn, result)
if details.parented_places:
+ log().comment('Query parent places')
await complete_parented_places(conn, result)
if details.keywords:
+ log().comment('Query keywords')
await complete_keywords(conn, result)
from nominatim.config import Configuration
import nominatim.api as napi
+import nominatim.api.logging as loglib
from nominatim.api.v1.format import dispatch as formatting
CONTENT_TYPE = {
'text': 'text/plain; charset=utf-8',
'xml': 'text/xml; charset=utf-8',
- 'jsonp': 'application/javascript'
+ 'jsonp': 'application/javascript',
+ 'debug': 'text/html; charset=utf-8'
}
or self.config().DEFAULT_LANGUAGE
+ def setup_debugging(self) -> bool:
+ """ Set up collection of debug information if requested.
+
+ Return True when debugging was requested.
+ """
+ if self.get_bool('debug', False):
+ loglib.set_log_output('html')
+ return True
+
+ return False
+
+
def parse_format(params: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
""" Get and check the 'format' parameter and prepare the formatter.
`fmtter` is a formatter and `default` the
raise params.error("Missing ID parameter 'place_id' or 'osmtype'.")
place = napi.OsmID(osmtype, params.get_int('osmid'), params.get('class'))
+ debug = params.setup_debugging()
+
details = napi.LookupDetails(address_details=params.get_bool('addressdetails', False),
linked_places=params.get_bool('linkedplaces', False),
parented_places=params.get_bool('hierarchy', False),
details.geometry_output = napi.GeometryFormat.GEOJSON
locales = napi.Locales.from_accept_languages(params.get_accepted_languages())
- print(locales.languages)
result = await api.lookup(place, details)
+ if debug:
+ return params.build_response(loglib.get_and_disable(), 'debug')
+
if result is None:
raise params.error('No place with that OSM ID found.', status=404)