# Do not repeat documentation of subcommand classes.
# pylint: disable=C0111
# Using non-top-level imports to make pyosmium optional for replication only.
-# pylint: disable=E0012,C0415
+# pylint: disable=C0415
class UpdateReplication:
"""\
end = dt.datetime.now(dt.timezone.utc)
LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
round_time((start_index or end) - start_import),
- "Indexing: {} ".format(round_time(end - start_index))
- if start_index else '',
+ f"Indexing: {round_time(end - start_index)} " if start_index else '',
round_time(end - start_import),
round_time(end - batchdate))
# Do not repeat documentation of subcommand classes.
# pylint: disable=C0111
# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
+# pylint: disable=C0415
LOG = logging.getLogger()
LOG.warning('Creating support index')
if tablespace:
tablespace = 'TABLESPACE ' + tablespace
- cur.execute("""CREATE INDEX idx_placex_pendingsector
- ON placex USING BTREE (rank_address,geometry_sector)
- {} WHERE indexed_status > 0
- """.format(tablespace))
+ cur.execute(f"""CREATE INDEX idx_placex_pendingsector
+ ON placex USING BTREE (rank_address,geometry_sector)
+ {tablespace} WHERE indexed_status > 0
+ """)
conn.commit()
style = self.__getattr__('IMPORT_STYLE')
if style in ('admin', 'street', 'address', 'full', 'extratags'):
- return self.config_dir / 'import-{}.style'.format(style)
+ return self.config_dir / f'import-{style}.style'
return self.find_config_file('', 'IMPORT_STYLE')
ctxmgr.connection = conn
return ctxmgr
except psycopg2.OperationalError as err:
- raise UsageError("Cannot connect to database: {}".format(err)) from err
+ raise UsageError(f"Cannot connect to database: {err}") from err
# Translation from PG connection string parameters to PG environment variables.
out = {}
for subset in ('ADDRESS', 'SEARCH', 'AUX'):
for kind in ('DATA', 'INDEX'):
- tspace = getattr(config, 'TABLESPACE_{}_{}'.format(subset, kind))
+ tspace = getattr(config, f'TABLESPACE_{subset}_{kind}')
if tspace:
- tspace = 'TABLESPACE "{}"'.format(tspace)
- out['{}_{}'.format(subset.lower(), kind.lower())] = tspace
+ tspace = f'TABLESPACE "{tspace}"'
+ out[f'{subset.lower()}_{kind.lower()}'] = tspace
return out
LOG.info("Using node id %d for timestamp lookup", osmid)
# Get the node from the API to find the timestamp when it was created.
- node_url = 'https://www.openstreetmap.org/api/0.6/node/{}/1'.format(osmid)
+ node_url = f'https://www.openstreetmap.org/api/0.6/node/{osmid}/1'
data = get_url(node_url)
match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
"""
def name(self):
- return "rank {}".format(self.rank)
+ return f"rank {self.rank}"
def sql_count_objects(self):
return pysql.SQL("""SELECT count(*) FROM placex
"""
def name(self):
- return "boundaries rank {}".format(self.rank)
+ return f"boundaries rank {self.rank}"
def sql_count_objects(self):
return pysql.SQL("""SELECT count(*) FROM placex
with conn.cursor() as cur:
try:
cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
- RETURNS text AS '{}/nominatim.so', 'transliteration'
+ RETURNS text AS %s, 'transliteration'
LANGUAGE c IMMUTABLE STRICT;
DROP FUNCTION nominatim_test_import_func(text)
- """.format(module_dir))
+ """, (f'{module_dir}/nominatim.so', ))
except psycopg2.DatabaseError as err:
LOG.fatal("Error accessing database module: %s", err)
raise UsageError("Database module cannot be accessed.") from err
php_file = self.data_dir / "tokenizer.php"
if not php_file.exists() or overwrite:
- php_file.write_text(dedent("""\
+ php_file.write_text(dedent(f"""\
<?php
- @define('CONST_Max_Word_Frequency', {0.MAX_WORD_FREQUENCY});
- @define('CONST_Term_Normalization_Rules', "{0.TERM_NORMALIZATION}");
- require_once('{0.lib_dir.php}/tokenizer/legacy_tokenizer.php');
- """.format(config)), encoding='utf-8')
+ @define('CONST_Max_Word_Frequency', {config.MAX_WORD_FREQUENCY});
+ @define('CONST_Term_Normalization_Rules', "{config.TERM_NORMALIZATION}");
+ require_once('{config.lib_dir.php}/tokenizer/legacy_tokenizer.php');
+ """), encoding='utf-8')
def _init_db_tables(self, config):
"""
parts = re.split(r'(\|)?([=-])>', rule)
if len(parts) != 4:
- raise UsageError("Syntax error in variant rule: " + rule)
+ raise UsageError(f"Syntax error in variant rule: {rule}")
decompose = parts[1] is None
src_terms = [self._parse_variant_word(t) for t in parts[0].split(',')]
name = name.strip()
match = re.fullmatch(r'([~^]?)([^~$^]*)([~$]?)', name)
if match is None or (match.group(1) == '~' and match.group(3) == '~'):
- raise UsageError("Invalid variant word descriptor '{}'".format(name))
+ raise UsageError(f"Invalid variant word descriptor '{name}'")
norm_name = self.norm.transliterate(match.group(2)).strip()
if not norm_name:
return None
bad_indices = [row[0] for row in list(cur)]
for idx in bad_indices:
LOG.info("Drop invalid index %s.", idx)
- cur.execute('DROP INDEX "{}"'.format(idx))
+ cur.execute(pysql.SQL('DROP INDEX {}').format(pysql.Identifier(idx)))
conn.commit()
sql = SQLPreprocessor(conn, config)
query_string = urlencode(params or {})
env = dict(QUERY_STRING=query_string,
- SCRIPT_NAME='/{}.php'.format(endpoint),
- REQUEST_URI='/{}.php?{}'.format(endpoint, query_string),
+ SCRIPT_NAME=f'/{endpoint}.php',
+ REQUEST_URI=f'/{endpoint}.php?{query_string}',
CONTEXT_DOCUMENT_ROOT=webdir,
- SCRIPT_FILENAME='{}/{}.php'.format(webdir, endpoint),
+ SCRIPT_FILENAME=f'{webdir}/{endpoint}.php',
HTTP_HOST='localhost',
HTTP_USER_AGENT='nominatim-tool',
REMOTE_ADDR='0.0.0.0',
"""
import logging
+from psycopg2 import sql as pysql
+
from nominatim.db import properties
from nominatim.db.connection import connect
from nominatim.version import NOMINATIM_VERSION, version_str
for version, func in _MIGRATION_FUNCTIONS:
if db_version <= version:
LOG.warning("Runnning: %s (%s)", func.__doc__.split('\n', 1)[0],
- '{}.{}.{}-{}'.format(*version))
+ version_str(version))
kwargs = dict(conn=conn, config=config, paths=paths)
func(**kwargs)
conn.commit()
"""
if not conn.table_exists('nominatim_properties'):
with conn.cursor() as cur:
- cur.execute("""CREATE TABLE nominatim_properties (
- property TEXT,
- value TEXT);
- GRANT SELECT ON TABLE nominatim_properties TO "{}";
- """.format(config.DATABASE_WEBUSER))
+ cur.execute(pysql.SQL("""CREATE TABLE nominatim_properties (
+ property TEXT,
+ value TEXT);
+ GRANT SELECT ON TABLE nominatim_properties TO {};
+ """)
+ .format(pysql.Identifier(config.DATABASE_WEBUSER)))
@_migration(3, 6, 0, 0)
def change_housenumber_transliteration(conn, **_):
and column_name = 'token_info'""",
(table, ))
if has_column == 0:
- cur.execute('ALTER TABLE {} ADD COLUMN token_info JSONB'.format(table))
+ cur.execute(pysql.SQL('ALTER TABLE {} ADD COLUMN token_info JSONB')
+ .format(pysql.Identifier(table)))
tokenizer = tokenizer_factory.create_tokenizer(config, init_db=False,
module_name='legacy')
def _open_external(self, project_dir):
- fname = project_dir / '{}_postcodes.csv'.format(self.country)
+ fname = project_dir / f'{self.country}_postcodes.csv'
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
return open(fname, 'r')
- fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
+ fname = project_dir / f'{self.country}_postcodes.csv.gz'
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
with conn.cursor() as cur:
cur.drop_table(table)
- cur.execute("""CREATE TABLE {} (country_code varchar(2),
+ cur.execute(pysql.SQL("""CREATE TABLE {} (
+ country_code varchar(2),
class TEXT,
type TEXT,
rank_search SMALLINT,
- rank_address SMALLINT)""".format(table))
+ rank_address SMALLINT)""")
+ .format(pysql.Identifier(table)))
cur.execute_values(pysql.SQL("INSERT INTO {} VALUES %s")
.format(pysql.Identifier(table)), rows)
- cur.execute('CREATE UNIQUE INDEX ON {} (country_code, class, type)'.format(table))
+ cur.execute(pysql.SQL('CREATE UNIQUE INDEX ON {} (country_code, class, type)')
+ .format(pysql.Identifier(table)))
conn.commit()
_, extension = os.path.splitext(self.csv_path)
if extension != '.csv':
- raise UsageError('The file {} is not a csv file.'.format(self.csv_path))
+ raise UsageError(f'The file {self.csv_path} is not a csv file.')
import logging
import re
-from psycopg2.sql import Identifier, Literal, SQL
+from psycopg2.sql import Identifier, SQL
from nominatim.tools.special_phrases.importer_statistics import SpecialPhrasesImporterStatistics
LOG = logging.getLogger()
"""
table_name = _classtype_table(phrase_class, phrase_type)
with self.db_connection.cursor() as db_cursor:
- db_cursor.execute(SQL("""
- CREATE TABLE IF NOT EXISTS {{}} {}
- AS SELECT place_id AS place_id,st_centroid(geometry) AS centroid FROM placex
- WHERE class = {{}} AND type = {{}}""".format(sql_tablespace))
- .format(Identifier(table_name), Literal(phrase_class),
- Literal(phrase_type)))
+ db_cursor.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
+ SELECT place_id AS place_id,
+ st_centroid(geometry) AS centroid
+ FROM placex
+ WHERE class = %s AND type = %s""")
+ .format(Identifier(table_name), SQL(sql_tablespace)),
+ (phrase_class, phrase_type))
def _create_place_classtype_indexes(self, sql_tablespace, phrase_class, phrase_type):
"""
Create indexes on centroid and place_id for the place_classtype table.
"""
- index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
+ index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
base_table = _classtype_table(phrase_class, phrase_type)
# Index on centroid
if not self.db_connection.index_exists(index_prefix + 'centroid'):
with self.db_connection.cursor() as db_cursor:
- db_cursor.execute(SQL("""
- CREATE INDEX {{}} ON {{}} USING GIST (centroid) {}""".format(sql_tablespace))
- .format(Identifier(index_prefix + 'centroid'),
- Identifier(base_table)), sql_tablespace)
+ db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
+ .format(Identifier(index_prefix + 'centroid'),
+ Identifier(base_table),
+ SQL(sql_tablespace)))
# Index on place_id
if not self.db_connection.index_exists(index_prefix + 'place_id'):
with self.db_connection.cursor() as db_cursor:
- db_cursor.execute(SQL(
- """CREATE INDEX {{}} ON {{}} USING btree(place_id) {}""".format(sql_tablespace))
+ db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
.format(Identifier(index_prefix + 'place_id'),
- Identifier(base_table)))
+ Identifier(base_table),
+ SQL(sql_tablespace)))
def _grant_access_to_webuser(self, phrase_class, phrase_type):
# pylint: disable=consider-using-f-string
-def version_str():
+def version_str(version=NOMINATIM_VERSION):
"""
Return a human-readable string of the version.
"""
- return '{}.{}.{}-{}'.format(*NOMINATIM_VERSION)
+ return '{}.{}.{}-{}'.format(*version)