"""
Functions for specialised logging with HTML output.
"""
-from typing import Any, Optional, cast
+from typing import Any, Iterator, Optional, List, Tuple, cast
from contextvars import ContextVar
+import textwrap
import io
import sqlalchemy as sa
CODE_HIGHLIGHT = False
+def _debug_name(res: Any) -> str:
+ if res.names:
+ return cast(str, res.names.get('name', next(iter(res.names.values()))))
+
+ return f"Hnr {res.housenumber}" if res.housenumber is not None else '[NONE]'
+
+
class BaseLogger:
""" Interface for logging function.
in derived classes which implement logging functionality.
"""
def get_buffer(self) -> str:
+ """ Return the current content of the log buffer.
+ """
return ''
def function(self, func: str, **kwargs: Any) -> None:
"""
+ def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
+ """ Print the table generated by the generator function.
+ """
+
+
+ def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
+ """ Print a list of search results generated by the generator function.
+ """
+
+
def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
""" Print the SQL for the given statement.
"""
+ def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> str:
+ """ Return the comiled version of the statement.
+ """
+ try:
+ return str(cast('sa.ClauseElement', statement)
+ .compile(conn.sync_engine, compile_kwargs={"literal_binds": True}))
+ except sa.exc.CompileError:
+ pass
+ except NotImplementedError:
+ pass
+
+ return str(cast('sa.ClauseElement', statement).compile(conn.sync_engine))
+
class HTMLLogger(BaseLogger):
""" Logger that formats messages in HTML.
"""
- def __init__(self):
+ def __init__(self) -> None:
self.buffer = io.StringIO()
def get_buffer(self) -> str:
return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER
+
def function(self, func: str, **kwargs: Any) -> None:
- """ Start a new debug chapter for the given function and its parameters.
- """
self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
for name, value in kwargs.items():
self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
def section(self, heading: str) -> None:
- """ Start a new section with the given title.
- """
self._write(f"<h2>{heading}</h2>")
+
def comment(self, text: str) -> None:
- """ Add a simple comment to the debug output.
- """
self._write(f"<p>{text}</p>")
+
def var_dump(self, heading: str, var: Any) -> None:
- """ Print the content of the variable to the debug output prefixed by
- the given heading.
- """
+ if callable(var):
+ var = var()
+
self._write(f'<h5>{heading}</h5>{self._python_var(var)}')
- def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
- """ Dump the SQL statement to debug output.
+ def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
+ head = next(rows)
+ assert head
+ self._write(f'<table><thead><tr><th colspan="{len(head)}">{heading}</th></tr><tr>')
+ for cell in head:
+ self._write(f'<th>{cell}</th>')
+ self._write('</tr></thead><tbody>')
+ for row in rows:
+ if row is not None:
+ self._write('<tr>')
+ for cell in row:
+ self._write(f'<td>{cell}</td>')
+ self._write('</tr>')
+ self._write('</tbody></table>')
+
+
+ def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
+ """ Print a list of search results generated by the generator function.
"""
- sqlstr = str(cast('sa.ClauseElement', statement)
- .compile(conn.sync_engine, compile_kwargs={"literal_binds": True}))
+ def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
+ if not osm_object:
+ return '-'
+
+ t, i = osm_object
+ if t == 'N':
+ fullt = 'node'
+ elif t == 'W':
+ fullt = 'way'
+ elif t == 'R':
+ fullt = 'relation'
+ else:
+ return f'{t}{i}'
+
+ return f'<a href="https://www.openstreetmap.org/{fullt}/{i}">{t}{i}</a>'
+
+ self._write(f'<h5>{heading}</h5><p><dl>')
+ total = 0
+ for rank, res in results:
+ self._write(f'<dt>[{rank:.3f}]</dt> <dd>{res.source_table.name}(')
+ self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
+ self._write(f"rank={res.rank_address}, ")
+ self._write(f"osm={format_osm(res.osm_object)}, ")
+ self._write(f'cc={res.country_code}, ')
+ self._write(f'importance={res.importance or -1:.5f})</dd>')
+ total += 1
+ self._write(f'</dl><b>TOTAL:</b> {total}</p>')
+
+
+ def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
+ sqlstr = self.format_sql(conn, statement)
if CODE_HIGHLIGHT:
sqlstr = highlight(sqlstr, PostgresLexer(),
- HtmlFormatter(nowrap=True, lineseparator='<br>'))
+ HtmlFormatter(nowrap=True, lineseparator='<br />'))
self._write(f'<div class="highlight"><code class="lang-sql">{sqlstr}</code></div>')
else:
self._write(f'<code class="lang-sql">{sqlstr}</code>')
self.buffer.write(text)
+class TextLogger(BaseLogger):
+ """ Logger creating output suitable for the console.
+ """
+ def __init__(self) -> None:
+ self.buffer = io.StringIO()
+
+
+ def get_buffer(self) -> str:
+ return self.buffer.getvalue()
+
+
+ def function(self, func: str, **kwargs: Any) -> None:
+ self._write(f"#### Debug output for {func}()\n\nParameters:\n")
+ for name, value in kwargs.items():
+ self._write(f' {name}: {self._python_var(value)}\n')
+ self._write('\n')
+
+
+ def section(self, heading: str) -> None:
+ self._write(f"\n# {heading}\n\n")
+
+
+ def comment(self, text: str) -> None:
+ self._write(f"{text}\n")
+
+
+ def var_dump(self, heading: str, var: Any) -> None:
+ if callable(var):
+ var = var()
+
+ self._write(f'{heading}:\n {self._python_var(var)}\n\n')
+
+
+ def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
+ self._write(f'{heading}:\n')
+ data = [list(map(self._python_var, row)) if row else None for row in rows]
+ assert data[0] is not None
+ num_cols = len(data[0])
+
+ maxlens = [max(len(d[i]) for d in data if d) for i in range(num_cols)]
+ tablewidth = sum(maxlens) + 3 * num_cols + 1
+ row_format = '| ' +' | '.join(f'{{:<{l}}}' for l in maxlens) + ' |\n'
+ self._write('-'*tablewidth + '\n')
+ self._write(row_format.format(*data[0]))
+ self._write('-'*tablewidth + '\n')
+ for row in data[1:]:
+ if row:
+ self._write(row_format.format(*row))
+ else:
+ self._write('-'*tablewidth + '\n')
+ if data[-1]:
+ self._write('-'*tablewidth + '\n')
+
+
+ def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
+ self._write(f'{heading}:\n')
+ total = 0
+ for rank, res in results:
+ self._write(f'[{rank:.3f}] {res.source_table.name}(')
+ self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
+ self._write(f"rank={res.rank_address}, ")
+ self._write(f"osm={''.join(map(str, res.osm_object or []))}, ")
+ self._write(f'cc={res.country_code}, ')
+ self._write(f'importance={res.importance or -1:.5f})\n')
+ total += 1
+ self._write(f'TOTAL: {total}\n\n')
+
+
+ def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
+ sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement), width=78))
+ self._write(f"| {sqlstr}\n\n")
+
+
+ def _python_var(self, var: Any) -> str:
+ return str(var)
+
+
+ def _write(self, text: str) -> None:
+ self.buffer.write(text)
+
+
logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())