1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Tests for migration functions
11 import psycopg2.extras
13 from nominatim_db.tools import migration
14 from nominatim_db.errors import UsageError
15 import nominatim_db.version
17 from mock_legacy_word_table import MockLegacyWordTable
21 def update_sql_functions(self, config):
26 def postprocess_mock(monkeypatch):
27 monkeypatch.setattr(migration.refresh, 'create_functions', lambda *args: args)
28 monkeypatch.setattr(migration.tokenizer_factory, 'get_tokenizer_for_db',
29 lambda *args: DummyTokenizer())
32 def legacy_word_table(temp_db_conn):
33 return MockLegacyWordTable(temp_db_conn)
36 def test_no_migration_old_versions(temp_db_with_extensions, table_factory, def_config):
37 table_factory('country_name', 'name HSTORE, country_code TEXT')
39 with pytest.raises(UsageError, match='Migration not possible'):
40 migration.migrate(def_config, {})
43 def test_set_up_migration_for_36(temp_db_with_extensions, temp_db_cursor,
44 table_factory, def_config, monkeypatch,
46 psycopg2.extras.register_hstore(temp_db_cursor)
47 # don't actually run any migration, except the property table creation
48 monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS',
49 [((3, 5, 0, 99), migration.add_nominatim_property_table)])
50 # Use a r/o user name that always exists
51 monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')
53 table_factory('country_name', 'name HSTORE, country_code TEXT',
54 (({str(x): 'a' for x in range(200)}, 'gb'),))
56 assert not temp_db_cursor.table_exists('nominatim_properties')
58 assert migration.migrate(def_config, {}) == 0
60 assert temp_db_cursor.table_exists('nominatim_properties')
62 assert 1 == temp_db_cursor.scalar(""" SELECT count(*) FROM nominatim_properties
63 WHERE property = 'database_version'""")
66 def test_already_at_version(def_config, property_table):
68 property_table.set('database_version',
69 str(nominatim_db.version.NOMINATIM_VERSION))
71 assert migration.migrate(def_config, {}) == 0
74 def test_run_single_migration(def_config, temp_db_cursor, property_table,
75 monkeypatch, postprocess_mock):
76 oldversion = [x for x in nominatim_db.version.NOMINATIM_VERSION]
78 property_table.set('database_version',
79 str(nominatim_db.version.NominatimVersion(*oldversion)))
81 done = {'old': False, 'new': False}
83 """ Dummy migration"""
86 def _old_migration(**_):
87 """ Dummy migration"""
91 monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS',
92 [(tuple(oldversion), _old_migration),
93 (nominatim_db.version.NOMINATIM_VERSION, _migration)])
95 assert migration.migrate(def_config, {}) == 0
98 assert not done['old']
99 assert property_table.get('database_version') == str(nominatim_db.version.NOMINATIM_VERSION)
102 ###### Tests for specific migrations
104 # Each migration should come with two tests:
105 # 1. Test that migration from old to new state works as expected.
106 # 2. Test that the migration can be rerun on the new state without side effects.
109 @pytest.mark.parametrize('in_attr', ('', 'with time zone'))
110 def test_import_status_timestamp_change(temp_db_conn, temp_db_cursor,
111 table_factory, in_attr):
112 table_factory('import_status',
113 f"""lastimportdate timestamp {in_attr},
117 migration.import_status_timestamp_change(temp_db_conn)
118 temp_db_conn.commit()
120 assert temp_db_cursor.scalar("""SELECT data_type FROM information_schema.columns
121 WHERE table_name = 'import_status'
122 and column_name = 'lastimportdate'""")\
123 == 'timestamp with time zone'
126 def test_add_nominatim_property_table(temp_db_conn, temp_db_cursor,
127 def_config, monkeypatch):
128 # Use a r/o user name that always exists
129 monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')
131 assert not temp_db_cursor.table_exists('nominatim_properties')
133 migration.add_nominatim_property_table(temp_db_conn, def_config)
134 temp_db_conn.commit()
136 assert temp_db_cursor.table_exists('nominatim_properties')
139 def test_add_nominatim_property_table_repeat(temp_db_conn, temp_db_cursor,
140 def_config, property_table):
141 assert temp_db_cursor.table_exists('nominatim_properties')
143 migration.add_nominatim_property_table(temp_db_conn, def_config)
144 temp_db_conn.commit()
146 assert temp_db_cursor.table_exists('nominatim_properties')
149 def test_change_housenumber_transliteration(temp_db_conn, temp_db_cursor,
150 legacy_word_table, placex_table):
151 placex_table.add(housenumber='3A')
153 temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
154 RETURNS TEXT AS $$ SELECT lower(name) $$ LANGUAGE SQL """)
155 temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
156 RETURNS INTEGER AS $$ SELECT 4325 $$ LANGUAGE SQL """)
158 migration.change_housenumber_transliteration(temp_db_conn)
159 temp_db_conn.commit()
161 assert temp_db_cursor.scalar('SELECT housenumber from placex') == '3a'
163 migration.change_housenumber_transliteration(temp_db_conn)
164 temp_db_conn.commit()
166 assert temp_db_cursor.scalar('SELECT housenumber from placex') == '3a'
169 def test_switch_placenode_geometry_index(temp_db_conn, temp_db_cursor, placex_table):
170 temp_db_cursor.execute("""CREATE INDEX idx_placex_adminname
171 ON placex (place_id)""")
173 migration.switch_placenode_geometry_index(temp_db_conn)
174 temp_db_conn.commit()
176 assert temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
177 assert not temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
180 def test_switch_placenode_geometry_index_repeat(temp_db_conn, temp_db_cursor, placex_table):
181 temp_db_cursor.execute("""CREATE INDEX idx_placex_geometry_placenode
182 ON placex (place_id)""")
184 migration.switch_placenode_geometry_index(temp_db_conn)
185 temp_db_conn.commit()
187 assert temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
188 assert not temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
189 assert temp_db_cursor.scalar("""SELECT indexdef from pg_indexes
190 WHERE tablename = 'placex'
191 and indexname = 'idx_placex_geometry_placenode'
192 """).endswith('(place_id)')
195 def test_install_legacy_tokenizer(temp_db_conn, temp_db_cursor, project_env,
196 property_table, table_factory, monkeypatch,
198 table_factory('placex', 'place_id BIGINT')
199 table_factory('location_property_osmline', 'place_id BIGINT')
201 # Setting up the tokenizer is problematic
203 def migrate_database(self, config):
206 monkeypatch.setattr(migration.tokenizer_factory, 'create_tokenizer',
207 lambda cfg, **kwargs: MiniTokenizer())
209 migration.install_legacy_tokenizer(temp_db_conn, project_env)
210 temp_db_conn.commit()
214 def test_install_legacy_tokenizer_repeat(temp_db_conn, temp_db_cursor,
215 def_config, property_table):
217 property_table.set('tokenizer', 'dummy')
218 migration.install_legacy_tokenizer(temp_db_conn, def_config)
219 temp_db_conn.commit()
222 def test_create_tiger_housenumber_index(temp_db_conn, temp_db_cursor, table_factory):
223 table_factory('location_property_tiger',
224 'parent_place_id BIGINT, startnumber INT, endnumber INT')
226 migration.create_tiger_housenumber_index(temp_db_conn)
227 temp_db_conn.commit()
229 if temp_db_conn.server_version_tuple() >= (11, 0, 0):
230 assert temp_db_cursor.index_exists('location_property_tiger',
231 'idx_location_property_tiger_housenumber_migrated')
233 migration.create_tiger_housenumber_index(temp_db_conn)
234 temp_db_conn.commit()