Requires TypedDict, which is only available from Python 3.8 onwards.
Therefore, require typing_extensions to make this functionality
available for earlier Python versions.
else
sudo apt-get install -y -qq python3-icu python3-datrie python3-pyosmium python3-jinja2 python3-psutil python3-psycopg2 python3-dotenv python3-yaml
fi
else
sudo apt-get install -y -qq python3-icu python3-datrie python3-pyosmium python3-jinja2 python3-psutil python3-psycopg2 python3-dotenv python3-yaml
fi
+ sudo pip3 install -U typing-extensions
shell: bash
env:
UBUNTUVER: ${{ inputs.ubuntu }}
shell: bash
env:
UBUNTUVER: ${{ inputs.ubuntu }}
* [PostgreSQL](https://www.postgresql.org) (9.6+ will work, 11+ strongly recommended)
* [PostGIS](https://postgis.net) (2.2+ will work, 3.0+ strongly recommended)
* [Python 3](https://www.python.org/) (3.6+)
* [PostgreSQL](https://www.postgresql.org) (9.6+ will work, 11+ strongly recommended)
* [PostGIS](https://postgis.net) (2.2+ will work, 3.0+ strongly recommended)
* [Python 3](https://www.python.org/) (3.6+)
+ * [Python Typing Extensions](https://github.com/python/typing_extensions)
* [Psycopg2](https://www.psycopg.org) (2.7+)
* [Python Dotenv](https://github.com/theskumar/python-dotenv)
* [psutil](https://github.com/giampaolo/psutil)
* [Psycopg2](https://www.psycopg.org) (2.7+)
* [Python Dotenv](https://github.com/theskumar/python-dotenv)
* [psutil](https://github.com/giampaolo/psutil)
""" Query execution that logs the SQL query when debugging is enabled.
"""
if LOG.isEnabledFor(logging.DEBUG):
""" Query execution that logs the SQL query when debugging is enabled.
"""
if LOG.isEnabledFor(logging.DEBUG):
- LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore
+ LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore[no-untyped-call]
super().execute(query, args)
super().execute(query, args)
if self.rowcount != 1:
raise RuntimeError("Query did not return a single row.")
if self.rowcount != 1:
raise RuntimeError("Query did not return a single row.")
- result = self.fetchone() # type: ignore
+ result = self.fetchone() # type: ignore[no-untyped-call]
assert result is not None
return result[0]
assert result is not None
return result[0]
if cascade:
sql += ' CASCADE'
if cascade:
sql += ' CASCADE'
- self.execute(pysql.SQL(sql).format(pysql.Identifier(name))) # type: ignore
+ self.execute(pysql.SQL(sql).format(pysql.Identifier(name))) # type: ignore[no-untyped-call]
class Connection(psycopg2.extensions.connection):
class Connection(psycopg2.extensions.connection):
return False
if table is not None:
return False
if table is not None:
- row = cur.fetchone() # type: ignore
+ row = cur.fetchone() # type: ignore[no-untyped-call]
if row is None or not isinstance(row[0], str):
return False
return row[0] == table
if row is None or not isinstance(row[0], str):
return False
return row[0] == table
"""
Access and helper functions for the status and status log table.
"""
"""
Access and helper functions for the status and status log table.
"""
+from typing import Optional, Tuple, cast
import datetime as dt
import logging
import re
import datetime as dt
import logging
import re
+from typing_extensions import TypedDict
+
+from nominatim.db.connection import Connection
from nominatim.tools.exec_utils import get_url
from nominatim.errors import UsageError
LOG = logging.getLogger()
ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
from nominatim.tools.exec_utils import get_url
from nominatim.errors import UsageError
LOG = logging.getLogger()
ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
-def compute_database_date(conn):
+
+class StatusRow(TypedDict):
+ """ Dictionary of columns of the import_status table.
+ """
+ lastimportdate: dt.datetime  # returned as the date element of the get_status() triple
+ sequence_id: Optional[int]  # returned as the sequence element by get_status(); may be None
+ indexed: Optional[bool]  # returned as the indexed element by get_status(); may be None
+
+
+def compute_database_date(conn: Connection) -> dt.datetime:
""" Determine the date of the database from the newest object in the
data base.
"""
""" Determine the date of the database from the newest object in the
data base.
"""
return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
-def set_status(conn, date, seq=None, indexed=True):
+def set_status(conn: Connection, date: Optional[dt.datetime],
+ seq: Optional[int] = None, indexed: bool = True) -> None:
""" Replace the current status with the given status. If date is `None`
then only sequence and indexed will be updated as given. Otherwise
the whole status is replaced.
""" Replace the current status with the given status. If date is `None`
then only sequence and indexed will be updated as given. Otherwise
the whole status is replaced.
+ The change will be committed to the database.
"""
assert date is None or date.tzinfo == dt.timezone.utc
with conn.cursor() as cur:
"""
assert date is None or date.tzinfo == dt.timezone.utc
with conn.cursor() as cur:
+def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], Optional[bool]]:
""" Return the current status as a triple of (date, sequence, indexed).
If status has not been set up yet, a triple of None is returned.
"""
""" Return the current status as a triple of (date, sequence, indexed).
If status has not been set up yet, a triple of None is returned.
"""
if cur.rowcount < 1:
return None, None, None
if cur.rowcount < 1:
return None, None, None
+ row = cast(StatusRow, cur.fetchone()) # type: ignore[no-untyped-call]
return row['lastimportdate'], row['sequence_id'], row['indexed']
return row['lastimportdate'], row['sequence_id'], row['indexed']
-def set_indexed(conn, state):
+def set_indexed(conn: Connection, state: bool) -> None:
""" Set the indexed flag in the status table to the given state.
"""
with conn.cursor() as cur:
""" Set the indexed flag in the status table to the given state.
"""
with conn.cursor() as cur:
-def log_status(conn, start, event, batchsize=None):
+def log_status(conn: Connection, start: dt.datetime,
+ event: str, batchsize: Optional[int] = None) -> None:
""" Write a new status line to the `import_osmosis_log` table.
"""
with conn.cursor() as cur:
""" Write a new status line to the `import_osmosis_log` table.
"""
with conn.cursor() as cur:
(batchend, batchseq, batchsize, starttime, endtime, event)
SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
(batchsize, start, event))
(batchend, batchseq, batchsize, starttime, endtime, event)
SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
(batchsize, start, event))
# Some of the Python packages that come with Ubuntu 18.04 are too old, so
# install the latest version from pip:
# Some of the Python packages that come with Ubuntu 18.04 are too old, so
# install the latest version from pip:
- pip3 install --user python-dotenv datrie pyyaml psycopg2-binary
+ pip3 install --user python-dotenv datrie pyyaml psycopg2-binary typing-extensions
postgresql-contrib-12 postgresql-12-postgis-3-scripts \
php php-pgsql php-intl libicu-dev python3-dotenv \
python3-psycopg2 python3-psutil python3-jinja2 \
postgresql-contrib-12 postgresql-12-postgis-3-scripts \
php php-pgsql php-intl libicu-dev python3-dotenv \
python3-psycopg2 python3-psutil python3-jinja2 \
- python3-icu python3-datrie python3-yaml git
+ python3-icu python3-datrie python3-yaml python3-pip git
+
+# Nominatim uses some typing features only available in later Python versions.
+# Install the latest version of the backport package:
+
+ pip3 install --user typing-extensions