]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/tools/test_import_special_phrases.py
port code to psycopg3
[nominatim.git] / test / python / tools / test_import_special_phrases.py
index 7c3d0646066b1abb33cfd6d3114edd39a1782679..0d33e6e0f30e4a0ab47bfb02a4cd63f2ad4c55a9 100644 (file)
@@ -1,27 +1,29 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2024 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
     Tests for import special phrases methods
     of the class SPImporter.
 """
 from shutil import copyfile
 import pytest
 """
     Tests for import special phrases methods
     of the class SPImporter.
 """
 from shutil import copyfile
 import pytest
-from nominatim.tools.special_phrases.sp_importer import SPImporter
-from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
-from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
-from nominatim.errors import UsageError
+from nominatim_db.tools.special_phrases.sp_importer import SPImporter
+from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
+from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
+from nominatim_db.errors import UsageError
 
 from cursor import CursorForTesting
 
 @pytest.fixture
 
 from cursor import CursorForTesting
 
 @pytest.fixture
-def testfile_dir(src_dir):
-    return src_dir / 'test' / 'testfiles'
-
-
-@pytest.fixture
-def sp_importer(temp_db_conn, def_config):
+def sp_importer(temp_db_conn, def_config, monkeypatch):
     """
         Return an instance of SPImporter.
     """
     """
         Return an instance of SPImporter.
     """
-    loader = SPWikiLoader(def_config, ['en'])
+    monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
+    loader = SPWikiLoader(def_config)
     return SPImporter(def_config, temp_db_conn, loader)
 
 
     return SPImporter(def_config, temp_db_conn, loader)
 
 
@@ -73,7 +75,8 @@ def test_load_white_and_black_lists(sp_importer):
     assert isinstance(black_list, dict) and isinstance(white_list, dict)
 
 
     assert isinstance(black_list, dict) and isinstance(white_list, dict)
 
 
-def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
+def test_create_place_classtype_indexes(temp_db_with_extensions,
+                                        temp_db_conn, temp_db_cursor,
                                         table_factory, sp_importer):
     """
         Test that _create_place_classtype_indexes() create the
                                         table_factory, sp_importer):
     """
         Test that _create_place_classtype_indexes() create the
@@ -86,10 +89,11 @@ def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
 
     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
 
     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
+    temp_db_conn.commit()
 
 
-    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
+    assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
 
 
-def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
+def test_create_place_classtype_table(temp_db_conn, temp_db_cursor, placex_table, sp_importer):
     """
         Test that _create_place_classtype_table() create
         the right place_classtype table.
     """
         Test that _create_place_classtype_table() create
         the right place_classtype table.
@@ -97,10 +101,12 @@ def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
     phrase_class = 'class'
     phrase_type = 'type'
     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
     phrase_class = 'class'
     phrase_type = 'type'
     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
+    temp_db_conn.commit()
 
 
-    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
+    assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
 
 
-def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
+def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
+                                  def_config, sp_importer):
     """
         Test that _grant_access_to_webuser() give
         right access to the web user.
     """
         Test that _grant_access_to_webuser() give
         right access to the web user.
@@ -112,12 +118,13 @@ def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_im
     table_factory(table_name)
 
     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
     table_factory(table_name)
 
     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
+    temp_db_conn.commit()
 
 
-    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
+    assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
 
 def test_create_place_classtype_table_and_indexes(
 
 def test_create_place_classtype_table_and_indexes(
-        temp_db_conn, def_config, placex_table,
-        sp_importer):
+        temp_db_cursor, def_config, placex_table,
+        sp_importer, temp_db_conn):
     """
         Test that _create_place_classtype_table_and_indexes()
         create the right place_classtype tables and place_id indexes
     """
         Test that _create_place_classtype_table_and_indexes()
         create the right place_classtype tables and place_id indexes
@@ -126,15 +133,16 @@ def test_create_place_classtype_table_and_indexes(
     """
     pairs = set([('class1', 'type1'), ('class2', 'type2')])
 
     """
     pairs = set([('class1', 'type1'), ('class2', 'type2')])
 
-    sp_importer._create_place_classtype_table_and_indexes(pairs)
+    sp_importer._create_classtype_table_and_indexes(pairs)
+    temp_db_conn.commit()
 
     for pair in pairs:
 
     for pair in pairs:
-        assert check_table_exist(temp_db_conn, pair[0], pair[1])
-        assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
-        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
+        assert check_table_exist(temp_db_cursor, pair[0], pair[1])
+        assert check_placeid_and_centroid_indexes(temp_db_cursor, pair[0], pair[1])
+        assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, pair[0], pair[1])
 
 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
 
 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
-                                            temp_db_conn):
+                                            temp_db_conn, temp_db_cursor):
     """
         Check for the remove_non_existent_phrases_from_db() method.
 
     """
         Check for the remove_non_existent_phrases_from_db() method.
 
@@ -157,15 +165,14 @@ def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
     """
 
     sp_importer._remove_non_existent_tables_from_db()
     """
 
     sp_importer._remove_non_existent_tables_from_db()
+    temp_db_conn.commit()
 
 
-    # Changes are not committed yet. Use temp_db_conn for checking results.
-    with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
-        assert cur.row_set(query_tables) \
+    assert temp_db_cursor.row_set(query_tables) \
                  == {('place_classtype_testclasstypetable_to_keep', )}
 
 
 @pytest.mark.parametrize("should_replace", [(True), (False)])
                  == {('place_classtype_testclasstypetable_to_keep', )}
 
 
 @pytest.mark.parametrize("should_replace", [(True), (False)])
-def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
+def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
                         placex_table, table_factory, tokenizer_mock,
                         xml_wiki_content, should_replace):
     """
                         placex_table, table_factory, tokenizer_mock,
                         xml_wiki_content, should_replace):
     """
@@ -180,8 +187,8 @@ def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
     table_factory('place_classtype_amenity_animal_shelter')
     table_factory('place_classtype_wrongclass_wrongtype')
 
     table_factory('place_classtype_amenity_animal_shelter')
     table_factory('place_classtype_wrongclass_wrongtype')
 
-    monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
-                        lambda self, lang: xml_wiki_content)
+    monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
+                        lambda lang: xml_wiki_content)
 
     tokenizer = tokenizer_mock()
     sp_importer.import_phrases(tokenizer, should_replace)
 
     tokenizer = tokenizer_mock()
     sp_importer.import_phrases(tokenizer, should_replace)
@@ -191,49 +198,49 @@ def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
     class_test = 'aerialway'
     type_test = 'zip_line'
 
     class_test = 'aerialway'
     type_test = 'zip_line'
 
-    assert check_table_exist(temp_db_conn, class_test, type_test)
-    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
-    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
-    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
+    assert check_table_exist(temp_db_cursor, class_test, type_test)
+    assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
+    assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
+    assert check_table_exist(temp_db_cursor, 'amenity', 'animal_shelter')
     if should_replace:
     if should_replace:
-        assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
+        assert not check_table_exist(temp_db_cursor, 'wrong_class', 'wrong_type')
 
 
-    assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
+    assert temp_db_cursor.table_exists('place_classtype_amenity_animal_shelter')
     if should_replace:
     if should_replace:
-        assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
+        assert not temp_db_cursor.table_exists('place_classtype_wrongclass_wrongtype')
 
 
-def check_table_exist(temp_db_conn, phrase_class, phrase_type):
+def check_table_exist(temp_db_cursor, phrase_class, phrase_type):
     """
         Verify that the place_classtype table exists for the given
         phrase_class and phrase_type.
     """
     """
         Verify that the place_classtype table exists for the given
         phrase_class and phrase_type.
     """
-    return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
+    return temp_db_cursor.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
 
 
 
 
-def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
+def check_grant_access(temp_db_cursor, user, phrase_class, phrase_type):
     """
         Check that the web user has been granted right access to the
         place_classtype table of the given phrase_class and phrase_type.
     """
     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
 
     """
         Check that the web user has been granted right access to the
         place_classtype table of the given phrase_class and phrase_type.
     """
     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
 
-    with temp_db_conn.cursor() as temp_db_cursor:
-        temp_db_cursor.execute("""
-                SELECT * FROM information_schema.role_table_grants
-                WHERE table_name='{}'
-                AND grantee='{}'
-                AND privilege_type='SELECT'""".format(table_name, user))
-        return temp_db_cursor.fetchone()
+    temp_db_cursor.execute("""
+            SELECT * FROM information_schema.role_table_grants
+            WHERE table_name='{}'
+            AND grantee='{}'
+            AND privilege_type='SELECT'""".format(table_name, user))
+    return temp_db_cursor.fetchone()
 
 
-def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
+def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type):
     """
         Check that the place_id index and centroid index exist for the
         place_classtype table of the given phrase_class and phrase_type.
     """
     """
         Check that the place_id index and centroid index exist for the
         place_classtype table of the given phrase_class and phrase_type.
     """
+    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
 
     return (
     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
 
     return (
-        temp_db_conn.index_exists(index_prefix + 'centroid')
+        temp_db_cursor.index_exists(table_name, index_prefix + 'centroid')
         and
         and
-        temp_db_conn.index_exists(index_prefix + 'place_id')
+        temp_db_cursor.index_exists(table_name, index_prefix + 'place_id')
     )
     )