import os
import re
import subprocess
-import sys
import json
from os.path import isfile
-from icu import Transliterator # pylint: disable-msg=no-name-in-module
+from icu import Transliterator
from psycopg2.sql import Identifier, Literal, SQL
from nominatim.tools.exec_utils import get_url
languages = _get_languages(args.config) if not languages else languages
#array for pairs of class/type
- pairs = dict()
+ class_type_pairs = set()
transliterator = Transliterator.createFromRules("special-phrases normalizer",
args.config.TERM_NORMALIZATION)
continue
#add class/type to the pairs dict
- pairs[f'{phrase_class}|{phrase_type}'] = (phrase_class, phrase_type)
+ class_type_pairs.add((phrase_class, phrase_type))
_process_amenity(
db_connection, phrase_label, normalized_label,
phrase_class, phrase_type, phrase_operator
)
- _create_place_classtype_table_and_indexes(db_connection, args.config, pairs)
+ _create_place_classtype_table_and_indexes(db_connection, args.config, class_type_pairs)
db_connection.commit()
LOG.warning('Import done.')
Check sanity of given inputs in case somebody added garbage in the wiki.
If a bad class/type is detected the system will exit with an error.
"""
- try:
- if len(pattern.findall(phrase_class)) < 1 or len(pattern.findall(phrase_type)) < 1:
- sys.exit()
- except SystemExit:
+ if len(pattern.findall(phrase_class)) < 1 or len(pattern.findall(phrase_type)) < 1:
LOG.error("Bad class/type for language %s: %s=%s", lang, phrase_class, phrase_type)
- raise
def _process_amenity(db_connection, phrase_label, normalized_label,
(phrase_label, normalized_label, phrase_class, phrase_type))
-def _create_place_classtype_table_and_indexes(db_connection, config, pairs):
+def _create_place_classtype_table_and_indexes(db_connection, config, class_type_pairs):
"""
Create table place_classtype for each given pair.
Also create indexes on place_id and centroid.
with db_connection.cursor() as db_cursor:
db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
- for _, pair in pairs.items():
+ for pair in class_type_pairs.items():
phrase_class = pair[0]
phrase_type = pair[1]
"""
Create table place_classtype of the given phrase_class/phrase_type if doesn't exit.
"""
+ table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
with db_connection.cursor() as db_cursor:
- db_cursor.execute(SQL(f"""
- CREATE TABLE IF NOT EXISTS {{}} {sql_tablespace}
+ db_cursor.execute(SQL("""
+ CREATE TABLE IF NOT EXISTS {{}} {}
AS SELECT place_id AS place_id,st_centroid(geometry) AS centroid FROM placex
- WHERE class = {{}} AND type = {{}}""")
- .format(Identifier(f'place_classtype_{phrase_class}_{phrase_type}'),
- Literal(phrase_class), Literal(phrase_type)))
+ WHERE class = {{}} AND type = {{}}""".format(sql_tablespace))
+ .format(Identifier(table_name), Literal(phrase_class),
+ Literal(phrase_type)))
def _create_place_classtype_indexes(db_connection, sql_tablespace, phrase_class, phrase_type):
"""
Create indexes on centroid and place_id for the place_classtype table.
"""
+ index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
+ base_table = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
#Index on centroid
- if not db_connection.index_exists(f'idx_place_classtype_{phrase_class}_{phrase_type}_centroid'):
+ if not db_connection.index_exists(index_prefix + 'centroid'):
with db_connection.cursor() as db_cursor:
- db_cursor.execute(SQL(f"""
- CREATE INDEX {{}} ON {{}} USING GIST (centroid) {sql_tablespace}""")
- .format(Identifier(
- f"""idx_place_classtype_{phrase_class}_{phrase_type}_centroid"""),
- Identifier(f'place_classtype_{phrase_class}_{phrase_type}')))
+ db_cursor.execute(SQL("""
+ CREATE INDEX {{}} ON {{}} USING GIST (centroid) {}""".format(sql_tablespace))
+ .format(Identifier(index_prefix + 'centroid'),
+ Identifier(base_table)), sql_tablespace)
#Index on place_id
- if not db_connection.index_exists(f'idx_place_classtype_{phrase_class}_{phrase_type}_place_id'):
+ if not db_connection.index_exists(index_prefix + 'place_id'):
with db_connection.cursor() as db_cursor:
- db_cursor.execute(SQL(f"""
- CREATE INDEX {{}} ON {{}} USING btree(place_id) {sql_tablespace}""")
- .format(Identifier(
- f"""idx_place_classtype_{phrase_class}_{phrase_type}_place_id"""),
- Identifier(f'place_classtype_{phrase_class}_{phrase_type}')))
+ db_cursor.execute(SQL(
+ """CREATE INDEX {{}} ON {{}} USING btree(place_id) {}""".format(sql_tablespace))
+ .format(Identifier(index_prefix + 'place_id'),
+ Identifier(base_table)))
def _grant_access_to_webuser(db_connection, config, phrase_class, phrase_type):
"""
Grant access on read to the table place_classtype for the webuser.
"""
+ table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
with db_connection.cursor() as db_cursor:
db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
- .format(Identifier(f'place_classtype_{phrase_class}_{phrase_type}'),
- Identifier(config.DATABASE_WEBUSER)))
+ .format(Identifier(table_name), Identifier(config.DATABASE_WEBUSER)))
def _convert_php_settings_if_needed(args, file_path):
"""
Convert php settings file of special phrases to json file if it is still in php format.
"""
file, extension = os.path.splitext(file_path)
- json_file_path = f'{file}.json'
+ json_file_path = file + '.json'
if extension == '.php' and not isfile(json_file_path):
try:
subprocess.run(['/usr/bin/env', 'php', '-Cq',
import pytest
from nominatim.tools.special_phrases import _create_place_classtype_indexes, _create_place_classtype_table, _get_wiki_content, _grant_access_to_webuser, _process_amenity
-def test_get_wiki_content():
- assert _get_wiki_content('fr')
+def test_process_amenity_with_operator(temp_db_conn, getorcreate_amenityoperator_funcs):
+ _process_amenity(temp_db_conn, '', '', '', '', 'near')
+ _process_amenity(temp_db_conn, '', '', '', '', 'in')
-def execute_and_verify_add_word(temp_db_conn, phrase_label, normalized_label,
- phrase_class, phrase_type):
- _process_amenity(temp_db_conn, phrase_label, normalized_label,
- phrase_class, phrase_type, '')
-
- with temp_db_conn.cursor() as temp_db_cursor:
- temp_db_cursor.execute(f"""
- SELECT * FROM word
- WHERE word_token=' {normalized_label}'
- AND word='{normalized_label}'
- AND class='{phrase_class}'
- AND type='{phrase_type}'
- AND type='{phrase_type}'""")
- return temp_db_cursor.fetchone()
-
-def execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label,
- phrase_class, phrase_type, phrase_operator):
- _process_amenity(temp_db_conn, phrase_label, normalized_label,
- phrase_class, phrase_type, phrase_operator)
-
- with temp_db_conn.cursor() as temp_db_cursor:
- temp_db_cursor.execute(f"""
- SELECT * FROM word
- WHERE word_token=' {normalized_label}'
- AND word='{normalized_label}'
- AND class='{phrase_class}'
- AND type='{phrase_type}'
- AND operator='{phrase_operator}'""")
- return temp_db_cursor.fetchone()
-
-def test_process_amenity_with_near_operator(temp_db_conn, word_table, amenity_operator_funcs):
- phrase_label = ' label '
- normalized_label = 'label'
- phrase_class = 'class'
- phrase_type = 'type'
-
- assert execute_and_verify_add_word(temp_db_conn, phrase_label, normalized_label,
- phrase_class, phrase_type)
- assert execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label,
- phrase_class, phrase_type, 'near')
- assert execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label,
- phrase_class, phrase_type, 'in')
-
-def index_exists(db_connect, index):
- """ Check that an index with the given name exists in the database.
- """
- with db_connect.cursor() as cur:
- cur.execute("""SELECT tablename FROM pg_indexes
- WHERE indexname = %s and schemaname = 'public'""", (index, ))
- if cur.rowcount == 0:
- return False
- return True
+def test_process_amenity_without_operator(temp_db_conn, getorcreate_amenity_funcs):
+ _process_amenity(temp_db_conn, '', '', '', '', '')
def test_create_place_classtype_indexes(temp_db_conn):
phrase_class = 'class'
phrase_type = 'type'
- table_name = f'place_classtype_{phrase_class}_{phrase_type}'
+ table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
+ index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
with temp_db_conn.cursor() as temp_db_cursor:
temp_db_cursor.execute("CREATE EXTENSION postgis;")
- temp_db_cursor.execute(f'CREATE TABLE {table_name}(place_id BIGINT, centroid GEOMETRY)')
+ temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))
_create_place_classtype_indexes(temp_db_conn, '', phrase_class, phrase_type)
- centroid_index_exists = index_exists(temp_db_conn, f'idx_place_classtype_{phrase_class}_{phrase_type}_centroid')
- place_id_index_exists = index_exists(temp_db_conn, f'idx_place_classtype_{phrase_class}_{phrase_type}_place_id')
+ centroid_index_exists = temp_db_conn.index_exists(index_prefix + 'centroid')
+ place_id_index_exists = temp_db_conn.index_exists(index_prefix + 'place_id')
assert centroid_index_exists and place_id_index_exists
def test_grant_access_to_web_user(temp_db_conn, def_config):
phrase_class = 'class'
phrase_type = 'type'
- table_name = f'place_classtype_{phrase_class}_{phrase_type}'
+ table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
with temp_db_conn.cursor() as temp_db_cursor:
- temp_db_cursor.execute(f'CREATE TABLE {table_name}()')
+ temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))
_grant_access_to_webuser(temp_db_conn, def_config, phrase_class, phrase_type)
assert result
@pytest.fixture
-def amenity_operator_funcs(temp_db_cursor):
+def make_strandard_name_func(temp_db_cursor):
temp_db_cursor.execute(f"""
- CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT
- AS $$
- DECLARE
- o TEXT;
+ CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT AS $$
BEGIN
RETURN trim(name); --Basically return only the trimed name for the tests
END;
- $$
- LANGUAGE plpgsql IMMUTABLE;
-
- CREATE SEQUENCE seq_word start 1;
-
+ $$ LANGUAGE plpgsql IMMUTABLE;""")
+
+@pytest.fixture
+def getorcreate_amenity_funcs(temp_db_cursor, make_strandard_name_func):
+ temp_db_cursor.execute(f"""
CREATE OR REPLACE FUNCTION getorcreate_amenity(lookup_word TEXT, normalized_word TEXT,
lookup_class text, lookup_type text)
- RETURNS INTEGER
- AS $$
- DECLARE
- lookup_token TEXT;
- return_word_id INTEGER;
- BEGIN
- lookup_token := ' '||trim(lookup_word);
- SELECT min(word_id) FROM word
- WHERE word_token = lookup_token and word = normalized_word
- and class = lookup_class and type = lookup_type
- INTO return_word_id;
- IF return_word_id IS NULL THEN
- return_word_id := nextval('seq_word');
- INSERT INTO word VALUES (return_word_id, lookup_token, normalized_word,
- lookup_class, lookup_type, null, 0);
- END IF;
- RETURN return_word_id;
- END;
- $$
- LANGUAGE plpgsql;
+ RETURNS void as $$
+ BEGIN END;
+ $$ LANGUAGE plpgsql""")
- CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT,
- normalized_word TEXT,
- lookup_class text,
- lookup_type text,
- op text)
- RETURNS INTEGER
- AS $$
- DECLARE
- lookup_token TEXT;
- return_word_id INTEGER;
- BEGIN
- lookup_token := ' '||trim(lookup_word);
- SELECT min(word_id) FROM word
- WHERE word_token = lookup_token and word = normalized_word
- and class = lookup_class and type = lookup_type and operator = op
- INTO return_word_id;
- IF return_word_id IS NULL THEN
- return_word_id := nextval('seq_word');
- INSERT INTO word VALUES (return_word_id, lookup_token, normalized_word,
- lookup_class, lookup_type, null, 0, op);
- END IF;
- RETURN return_word_id;
- END;
- $$
- LANGUAGE plpgsql;""")
+@pytest.fixture
+def getorcreate_amenityoperator_funcs(temp_db_cursor, make_strandard_name_func):
+ temp_db_cursor.execute(f"""
+ CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT, normalized_word TEXT,
+ lookup_class text, lookup_type text, op text)
+ RETURNS void as $$
+ BEGIN END;
+ $$ LANGUAGE plpgsql""")
\ No newline at end of file