1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for specialised logging with HTML output.
10 from typing import Any, Iterator, Optional, List, Tuple, cast, Union, Mapping, Sequence
11 from contextvars import ContextVar
16 import sqlalchemy as sa
17 from sqlalchemy.ext.asyncio import AsyncConnection
20 from pygments import highlight
21 from pygments.lexers import PythonLexer, PostgresLexer
22 from pygments.formatters import HtmlFormatter
24 except ModuleNotFoundError:
25 CODE_HIGHLIGHT = False
28 def _debug_name(res: Any) -> str:
30 return cast(str, res.names.get('name', next(iter(res.names.values()))))
32 return f"Hnr {res.housenumber}" if res.housenumber is not None else '[NONE]'
36 """ Interface for logging function.
38 The base implementation does nothing. Overwrite the functions
39 in derived classes which implement logging functionality.
41 def get_buffer(self) -> str:
42 """ Return the current content of the log buffer.
46 def function(self, func: str, **kwargs: Any) -> None:
47 """ Start a new debug chapter for the given function and its parameters.
51 def section(self, heading: str) -> None:
52 """ Start a new section with the given title.
56 def comment(self, text: str) -> None:
57 """ Add a simple comment to the debug output.
61 def var_dump(self, heading: str, var: Any) -> None:
62 """ Print the content of the variable to the debug output prefixed by
67 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
68 """ Print the table generated by the generator function.
72 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
73 """ Print a list of search results generated by the generator function.
77 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
78 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
79 """ Print the SQL for the given statement.
82 def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable',
83 extra_params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> str:
84 """ Return the comiled version of the statement.
86 compiled = cast('sa.ClauseElement', statement).compile(conn.sync_engine)
88 params = dict(compiled.params)
89 if isinstance(extra_params, Mapping):
90 for k, v in extra_params.items():
92 elif isinstance(extra_params, Sequence) and extra_params:
93 for k in extra_params[0]:
96 sqlstr = str(compiled)
100 return sqlstr % tuple((repr(compiled.params[name]) for name in compiled.positiontup))
104 return str(compiled) % params
107 class HTMLLogger(BaseLogger):
108 """ Logger that formats messages in HTML.
110 def __init__(self) -> None:
111 self.buffer = io.StringIO()
114 def _timestamp(self) -> None:
115 self._write(f'<p class="timestamp">[{dt.datetime.now()}]</p>')
118 def get_buffer(self) -> str:
119 return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER
122 def function(self, func: str, **kwargs: Any) -> None:
124 self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
125 for name, value in kwargs.items():
126 self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
127 self._write('</dl></p>')
130 def section(self, heading: str) -> None:
132 self._write(f"<h2>{heading}</h2>")
135 def comment(self, text: str) -> None:
137 self._write(f"<p>{text}</p>")
140 def var_dump(self, heading: str, var: Any) -> None:
145 self._write(f'<h5>{heading}</h5>{self._python_var(var)}')
148 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
152 self._write(f'<table><thead><tr><th colspan="{len(head)}">{heading}</th></tr><tr>')
154 self._write(f'<th>{cell}</th>')
155 self._write('</tr></thead><tbody>')
160 self._write(f'<td>{cell}</td>')
162 self._write('</tbody></table>')
165 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
166 """ Print a list of search results generated by the generator function.
169 def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
183 return f'<a href="https://www.openstreetmap.org/{fullt}/{i}">{t}{i}</a>'
185 self._write(f'<h5>{heading}</h5><p><dl>')
187 for rank, res in results:
188 self._write(f'<dt>[{rank:.3f}]</dt> <dd>{res.source_table.name}(')
189 self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
190 self._write(f"rank={res.rank_address}, ")
191 self._write(f"osm={format_osm(res.osm_object)}, ")
192 self._write(f'cc={res.country_code}, ')
193 self._write(f'importance={res.importance or float("nan"):.5f})</dd>')
195 self._write(f'</dl><b>TOTAL:</b> {total}</p>')
198 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
199 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
201 sqlstr = self.format_sql(conn, statement, params)
203 sqlstr = highlight(sqlstr, PostgresLexer(),
204 HtmlFormatter(nowrap=True, lineseparator='<br />'))
205 self._write(f'<div class="highlight"><code class="lang-sql">{sqlstr}</code></div>')
207 self._write(f'<code class="lang-sql">{sqlstr}</code>')
210 def _python_var(self, var: Any) -> str:
212 fmt = highlight(str(var), PythonLexer(), HtmlFormatter(nowrap=True))
213 return f'<div class="highlight"><code class="lang-python">{fmt}</code></div>'
215 return f'<code class="lang-python">{str(var)}</code>'
218 def _write(self, text: str) -> None:
219 """ Add the raw text to the debug output.
221 self.buffer.write(text)
224 class TextLogger(BaseLogger):
225 """ Logger creating output suitable for the console.
227 def __init__(self) -> None:
228 self.buffer = io.StringIO()
231 def get_buffer(self) -> str:
232 return self.buffer.getvalue()
235 def function(self, func: str, **kwargs: Any) -> None:
236 self._write(f"#### Debug output for {func}()\n\nParameters:\n")
237 for name, value in kwargs.items():
238 self._write(f' {name}: {self._python_var(value)}\n')
242 def section(self, heading: str) -> None:
243 self._write(f"\n# {heading}\n\n")
246 def comment(self, text: str) -> None:
247 self._write(f"{text}\n")
250 def var_dump(self, heading: str, var: Any) -> None:
254 self._write(f'{heading}:\n {self._python_var(var)}\n\n')
257 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
258 self._write(f'{heading}:\n')
259 data = [list(map(self._python_var, row)) if row else None for row in rows]
260 assert data[0] is not None
261 num_cols = len(data[0])
263 maxlens = [max(len(d[i]) for d in data if d) for i in range(num_cols)]
264 tablewidth = sum(maxlens) + 3 * num_cols + 1
265 row_format = '| ' +' | '.join(f'{{:<{l}}}' for l in maxlens) + ' |\n'
266 self._write('-'*tablewidth + '\n')
267 self._write(row_format.format(*data[0]))
268 self._write('-'*tablewidth + '\n')
271 self._write(row_format.format(*row))
273 self._write('-'*tablewidth + '\n')
275 self._write('-'*tablewidth + '\n')
278 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
279 self._write(f'{heading}:\n')
281 for rank, res in results:
282 self._write(f'[{rank:.3f}] {res.source_table.name}(')
283 self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
284 self._write(f"rank={res.rank_address}, ")
285 self._write(f"osm={''.join(map(str, res.osm_object or []))}, ")
286 self._write(f'cc={res.country_code}, ')
287 self._write(f'importance={res.importance or -1:.5f})\n')
289 self._write(f'TOTAL: {total}\n\n')
292 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
293 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
294 sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement, params), width=78))
295 self._write(f"| {sqlstr}\n\n")
298 def _python_var(self, var: Any) -> str:
302 def _write(self, text: str) -> None:
303 self.buffer.write(text)
306 logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())
309 def set_log_output(fmt: str) -> None:
310 """ Enable collecting debug information.
313 logger.set(HTMLLogger())
315 logger.set(TextLogger())
317 logger.set(BaseLogger())
320 def log() -> BaseLogger:
321 """ Return the logger for the current context.
326 def get_and_disable() -> str:
327 """ Return the current content of the debug buffer and disable logging.
329 buf = logger.get().get_buffer()
330 logger.set(BaseLogger())
334 HTML_HEADER: str = """<!DOCTYPE html>
337 <title>Nominatim - Debug</title>
340 (HtmlFormatter(nobackground=True).get_style_defs('.highlight') if CODE_HIGHLIGHT else '') +\
342 h2 { font-size: x-large }
346 font-family: monospace
355 dt::after { content: ": "; }
368 border: solid lightgrey 0.1pt;
370 background-color: #f7f7f7
375 border: solid lightgrey 0.1pt
380 border-collapse: collapse;
383 border-right: thin solid;
391 width: calc(100% - 5pt);
402 HTML_FOOTER: str = "</body></html>"