1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for specialised logging with HTML output.
10 from typing import Any, Iterator, Optional, List, Tuple, cast, Union, Mapping, Sequence
11 from contextvars import ContextVar
18 import sqlalchemy as sa
19 from sqlalchemy.ext.asyncio import AsyncConnection
22 from pygments import highlight
23 from pygments.lexers import PythonLexer, PostgresLexer
24 from pygments.formatters import HtmlFormatter
26 except ModuleNotFoundError:
27 CODE_HIGHLIGHT = False
30 def _debug_name(res: Any) -> str:
32 return cast(str, res.names.get('name', next(iter(res.names.values()))))
34 return f"Hnr {res.housenumber}" if res.housenumber is not None else '[NONE]'
38 """ Interface for logging function.
40 The base implementation does nothing. Overwrite the functions
41 in derived classes which implement logging functionality.
43 def get_buffer(self) -> str:
44 """ Return the current content of the log buffer.
48 def function(self, func: str, **kwargs: Any) -> None:
49 """ Start a new debug chapter for the given function and its parameters.
53 def section(self, heading: str) -> None:
54 """ Start a new section with the given title.
58 def comment(self, text: str) -> None:
59 """ Add a simple comment to the debug output.
63 def var_dump(self, heading: str, var: Any) -> None:
64 """ Print the content of the variable to the debug output prefixed by
69 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
70 """ Print the table generated by the generator function.
74 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
75 """ Print a list of search results generated by the generator function.
79 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
80 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
81 """ Print the SQL for the given statement.
84 def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable',
85 extra_params: Union[Mapping[str, Any],
86 Sequence[Mapping[str, Any]], None]) -> str:
87 """ Return the compiled version of the statement.
89 compiled = cast('sa.ClauseElement', statement).compile(conn.sync_engine)
91 params = dict(compiled.params)
92 if isinstance(extra_params, Mapping):
93 for k, v in extra_params.items():
94 if hasattr(v, 'to_wkt'):
95 params[k] = v.to_wkt()
96 elif isinstance(v, (int, float)):
100 elif isinstance(extra_params, Sequence) and extra_params:
101 for k in extra_params[0]:
104 sqlstr = str(compiled)
106 if conn.dialect.name == 'postgresql':
107 if sa.__version__.startswith('1'):
109 sqlstr = re.sub(r'__\[POSTCOMPILE_[^]]*\]', '%s', sqlstr)
110 return sqlstr % tuple((repr(params.get(name, None))
111 for name in compiled.positiontup)) # type: ignore
115 # Fixes an odd issue with Python 3.7 where percentages are not
117 sqlstr = re.sub(r'%(?!\()', '%%', sqlstr)
118 sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', r'%(\1)s', sqlstr)
119 return sqlstr % params
121 assert conn.dialect.name == 'sqlite'
123 # params in positional order
124 pparams = (repr(params.get(name, None)) for name in compiled.positiontup) # type: ignore
126 sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', '?', sqlstr)
127 sqlstr = re.sub(r"\?", lambda m: next(pparams), sqlstr)
131 class HTMLLogger(BaseLogger):
132 """ Logger that formats messages in HTML.
134 def __init__(self) -> None:
135 self.buffer = io.StringIO()
138 def _timestamp(self) -> None:
139 self._write(f'<p class="timestamp">[{dt.datetime.now()}]</p>')
142 def get_buffer(self) -> str:
143 return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER
146 def function(self, func: str, **kwargs: Any) -> None:
148 self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
149 for name, value in kwargs.items():
150 self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
151 self._write('</dl></p>')
154 def section(self, heading: str) -> None:
156 self._write(f"<h2>{heading}</h2>")
159 def comment(self, text: str) -> None:
161 self._write(f"<p>{text}</p>")
164 def var_dump(self, heading: str, var: Any) -> None:
169 self._write(f'<h5>{heading}</h5>{self._python_var(var)}')
172 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
176 self._write(f'<table><thead><tr><th colspan="{len(head)}">{heading}</th></tr><tr>')
178 self._write(f'<th>{cell}</th>')
179 self._write('</tr></thead><tbody>')
184 self._write(f'<td>{cell}</td>')
186 self._write('</tbody></table>')
189 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
190 """ Print a list of search results generated by the generator function.
193 def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
207 return f'<a href="https://www.openstreetmap.org/{fullt}/{i}">{t}{i}</a>'
209 self._write(f'<h5>{heading}</h5><p><dl>')
211 for rank, res in results:
212 self._write(f'<dt>[{rank:.3f}]</dt> <dd>{res.source_table.name}(')
213 self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
214 self._write(f"rank={res.rank_address}, ")
215 self._write(f"osm={format_osm(res.osm_object)}, ")
216 self._write(f'cc={res.country_code}, ')
217 self._write(f'importance={res.importance or float("nan"):.5f})</dd>')
219 self._write(f'</dl><b>TOTAL:</b> {total}</p>')
222 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
223 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
225 sqlstr = self.format_sql(conn, statement, params)
227 sqlstr = highlight(sqlstr, PostgresLexer(),
228 HtmlFormatter(nowrap=True, lineseparator='<br />'))
229 self._write(f'<div class="highlight"><code class="lang-sql">{sqlstr}</code></div>')
231 self._write(f'<code class="lang-sql">{html.escape(sqlstr)}</code>')
234 def _python_var(self, var: Any) -> str:
236 fmt = highlight(str(var), PythonLexer(), HtmlFormatter(nowrap=True))
237 return f'<div class="highlight"><code class="lang-python">{fmt}</code></div>'
239 return f'<code class="lang-python">{html.escape(str(var))}</code>'
242 def _write(self, text: str) -> None:
243 """ Add the raw text to the debug output.
245 self.buffer.write(text)
248 class TextLogger(BaseLogger):
249 """ Logger creating output suitable for the console.
251 def __init__(self) -> None:
252 self.buffer = io.StringIO()
255 def _timestamp(self) -> None:
256 self._write(f'[{dt.datetime.now()}]\n')
259 def get_buffer(self) -> str:
260 return self.buffer.getvalue()
263 def function(self, func: str, **kwargs: Any) -> None:
264 self._write(f"#### Debug output for {func}()\n\nParameters:\n")
265 for name, value in kwargs.items():
266 self._write(f' {name}: {self._python_var(value)}\n')
270 def section(self, heading: str) -> None:
272 self._write(f"\n# {heading}\n\n")
275 def comment(self, text: str) -> None:
276 self._write(f"{text}\n")
279 def var_dump(self, heading: str, var: Any) -> None:
283 self._write(f'{heading}:\n {self._python_var(var)}\n\n')
286 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
287 self._write(f'{heading}:\n')
288 data = [list(map(self._python_var, row)) if row else None for row in rows]
289 assert data[0] is not None
290 num_cols = len(data[0])
292 maxlens = [max(len(d[i]) for d in data if d) for i in range(num_cols)]
293 tablewidth = sum(maxlens) + 3 * num_cols + 1
294 row_format = '| ' +' | '.join(f'{{:<{l}}}' for l in maxlens) + ' |\n'
295 self._write('-'*tablewidth + '\n')
296 self._write(row_format.format(*data[0]))
297 self._write('-'*tablewidth + '\n')
300 self._write(row_format.format(*row))
302 self._write('-'*tablewidth + '\n')
304 self._write('-'*tablewidth + '\n')
307 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
309 self._write(f'{heading}:\n')
311 for rank, res in results:
312 self._write(f'[{rank:.3f}] {res.source_table.name}(')
313 self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
314 self._write(f"rank={res.rank_address}, ")
315 self._write(f"osm={''.join(map(str, res.osm_object or []))}, ")
316 self._write(f'cc={res.country_code}, ')
317 self._write(f'importance={res.importance or -1:.5f})\n')
319 self._write(f'TOTAL: {total}\n\n')
322 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
323 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
325 sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement, params), width=78))
326 self._write(f"| {sqlstr}\n\n")
329 def _python_var(self, var: Any) -> str:
333 def _write(self, text: str) -> None:
334 self.buffer.write(text)
337 logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())
340 def set_log_output(fmt: str) -> None:
341 """ Enable collecting debug information.
344 logger.set(HTMLLogger())
346 logger.set(TextLogger())
348 logger.set(BaseLogger())
351 def log() -> BaseLogger:
352 """ Return the logger for the current context.
357 def get_and_disable() -> str:
358 """ Return the current content of the debug buffer and disable logging.
360 buf = logger.get().get_buffer()
361 logger.set(BaseLogger())
365 HTML_HEADER: str = """<!DOCTYPE html>
368 <title>Nominatim - Debug</title>
371 (HtmlFormatter(nobackground=True).get_style_defs('.highlight') if CODE_HIGHLIGHT else '') +\
373 h2 { font-size: x-large }
377 font-family: monospace
386 dt::after { content: ": "; }
399 border: solid lightgrey 0.1pt;
401 background-color: #f7f7f7
406 border: solid lightgrey 0.1pt
411 border-collapse: collapse;
414 border-right: thin solid;
422 width: calc(100% - 5pt);
433 HTML_FOOTER: str = "</body></html>"