"""
Functions for specialised logging with HTML output.
"""
-from typing import Any, Iterator, Optional, List, Tuple, cast
+from typing import Any, Iterator, Optional, List, Tuple, cast, Union, Mapping, Sequence
from contextvars import ContextVar
+import datetime as dt
import textwrap
import io
+import re
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncConnection
"""
- def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
+ def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
+ params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
""" Print the SQL for the given statement.
"""
- def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> str:
+ def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable',
+ extra_params: Union[Mapping[str, Any],
+ Sequence[Mapping[str, Any]], None]) -> str:
- """ Return the comiled version of the statement.
+ """ Return the compiled version of the statement with the bind
+ parameters filled in, for display in the debug log.
+
+ `extra_params` are merged over the parameters of the compiled
+ statement: the values of a single mapping are inserted as strings,
+ for a non-empty sequence of mappings only a ':name' placeholder is
+ shown for each key of the first entry.
+
+ With SQLAlchemy 1.x the plain compiled SQL is returned when the
+ positional substitution raises a TypeError.
"""
- try:
- return str(cast('sa.ClauseElement', statement)
- .compile(conn.sync_engine, compile_kwargs={"literal_binds": True}))
- except sa.exc.CompileError:
- pass
- except NotImplementedError:
- pass
-
- return str(cast('sa.ClauseElement', statement).compile(conn.sync_engine))
-
+ compiled = cast('sa.ClauseElement', statement).compile(conn.sync_engine)
+
+ params = dict(compiled.params)
+ if isinstance(extra_params, Mapping):
+ for k, v in extra_params.items():
+ params[k] = str(v)
+ elif isinstance(extra_params, Sequence) and extra_params:
+ for k in extra_params[0]:
+ params[k] = f':{k}'
+
+ sqlstr = str(compiled)
+
+ if sa.__version__.startswith('1'):
+ try:
+ # Substitute by position, in the order given by compiled.positiontup.
+ sqlstr = re.sub(r'__\[POSTCOMPILE_[^]]*\]', '%s', sqlstr)
+ return sqlstr % tuple((repr(params.get(name, None))
+ for name in compiled.positiontup)) # type: ignore
+ except TypeError:
+ return sqlstr
+
+ # Fixes an odd issue with Python 3.7 where percentages are not
+ # quoted correctly.
+ sqlstr = re.sub(r'%(?!\()', '%%', sqlstr)
+ # Turn post-compile markers into named placeholders for the dict below.
+ sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', r'%(\1)s', sqlstr)
+ return sqlstr % params
class HTMLLogger(BaseLogger):
""" Logger that formats messages in HTML.
self.buffer = io.StringIO()
+ def _timestamp(self) -> None:
+ """ Write a paragraph of class 'timestamp' with the current local time.
+ """
+ self._write(f'<p class="timestamp">[{dt.datetime.now()}]</p>')
+
def get_buffer(self) -> str:
return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER
def function(self, func: str, **kwargs: Any) -> None:
+ self._timestamp()
self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
for name, value in kwargs.items():
self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
def section(self, heading: str) -> None:
+ self._timestamp()
self._write(f"<h2>{heading}</h2>")
def comment(self, text: str) -> None:
+ self._timestamp()
self._write(f"<p>{text}</p>")
def var_dump(self, heading: str, var: Any) -> None:
+ self._timestamp()
if callable(var):
var = var()
def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
+ self._timestamp()
head = next(rows)
assert head
self._write(f'<table><thead><tr><th colspan="{len(head)}">{heading}</th></tr><tr>')
def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
""" Print a list of search results generated by the generator function.
"""
+ self._timestamp()
def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
if not osm_object:
return '-'
self._write(f"rank={res.rank_address}, ")
self._write(f"osm={format_osm(res.osm_object)}, ")
self._write(f'cc={res.country_code}, ')
- self._write(f'importance={res.importance or -1:.5f})</dd>')
+ self._write(f'importance={res.importance or float("nan"):.5f})</dd>')
total += 1
self._write(f'</dl><b>TOTAL:</b> {total}</p>')
- def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
- sqlstr = self.format_sql(conn, statement)
+ def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
+ params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
+ self._timestamp()
+ sqlstr = self.format_sql(conn, statement, params)
if CODE_HIGHLIGHT:
sqlstr = highlight(sqlstr, PostgresLexer(),
HtmlFormatter(nowrap=True, lineseparator='<br />'))
def _python_var(self, var: Any) -> str:
if CODE_HIGHLIGHT:
- fmt = highlight(repr(var), PythonLexer(), HtmlFormatter(nowrap=True))
+ fmt = highlight(str(var), PythonLexer(), HtmlFormatter(nowrap=True))
return f'<div class="highlight"><code class="lang-python">{fmt}</code></div>'
return f'<code class="lang-python">{str(var)}</code>'
self._write(f'TOTAL: {total}\n\n')
- def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
- sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement), width=78))
+ def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
+ params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
+ sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement, params), width=78))
self._write(f"| {sqlstr}\n\n")
padding: 3pt;
border: solid lightgrey 0.1pt
}
+
+ table, th, tbody {
+ border: thin solid;
+ border-collapse: collapse;
+ }
+ td {
+ border-right: thin solid;
+ padding-left: 3pt;
+ padding-right: 3pt;
+ }
+
+ .timestamp {
+ font-size: 0.8em;
+ color: darkblue;
+ width: calc(100% - 5pt);
+ text-align: right;
+ position: absolute;
+ left: 0;
+ margin-top: -5px;
+ }
</style>
</head>
<body>