# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for migration functions
"""
import pytest

from nominatim_db.tools import migration
from nominatim_db.errors import UsageError
from nominatim_db.db.connection import server_version_tuple
import nominatim_db.version
class DummyTokenizer:
    """Stand-in tokenizer returned by the patched `get_tokenizer_for_db`."""

    # NOTE(review): the class header and the method body were missing from
    # the reviewed text. The class name is taken from the `DummyTokenizer()`
    # call in `postprocess_mock`; the body is assumed to be a no-op - confirm.
    def update_sql_functions(self, config):
        """Accept the call and do nothing."""
@pytest.fixture
def postprocess_mock(monkeypatch):
    """Replace two helpers reachable through the `migration` module.

    `migration.refresh.create_functions` becomes a lambda that simply returns
    its arguments, and `migration.tokenizer_factory.get_tokenizer_for_db`
    becomes a lambda that returns a fresh `DummyTokenizer`.
    """
    # The fixture decorator is required: tests request `postprocess_mock`
    # as a parameter, which pytest can only resolve for registered fixtures.
    monkeypatch.setattr(migration.refresh, 'create_functions', lambda *args: args)
    monkeypatch.setattr(migration.tokenizer_factory, 'get_tokenizer_for_db',
                        lambda *args: DummyTokenizer())
def test_no_migration_old_versions(temp_db_with_extensions, table_factory, def_config):
    """Expect `migrate` to raise a UsageError matching 'Migration not possible'
    when the database only contains a `country_name` table.
    """
    table_factory('country_name', 'name HSTORE, country_code TEXT')

    expected_failure = pytest.raises(UsageError, match='Migration not possible')
    with expected_failure:
        migration.migrate(def_config, {})
def test_set_up_migration_for_36(temp_db_with_extensions, temp_db_cursor,
                                 table_factory, def_config, monkeypatch,
                                 postprocess_mock):
    """Check that `migrate` creates the `nominatim_properties` table and
    stores exactly one `database_version` row when starting from a database
    that only has a populated `country_name` table.
    """
    # NOTE(review): the end of the parameter list was missing from the
    # reviewed text. `postprocess_mock` is restored on the assumption that
    # this test, like test_run_single_migration, needs it - confirm.

    # don't actually run any migration, except the property table creation
    monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS',
                        [((3, 5, 0, 99), migration.add_nominatim_property_table)])
    # Use a r/o user name that always exists
    monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')

    table_factory('country_name', 'name HSTORE, country_code TEXT',
                  (({str(x): 'a' for x in range(200)}, 'gb'),))

    assert not temp_db_cursor.table_exists('nominatim_properties')

    assert migration.migrate(def_config, {}) == 0

    assert temp_db_cursor.table_exists('nominatim_properties')

    assert 1 == temp_db_cursor.scalar(""" SELECT count(*) FROM nominatim_properties
                                          WHERE property = 'database_version'""")
def test_already_at_version(temp_db_with_extensions, def_config, property_table):
    """Expect `migrate` to return 0 when the stored `database_version`
    equals the string form of `NOMINATIM_VERSION`.
    """
    current_version = str(nominatim_db.version.NOMINATIM_VERSION)
    property_table.set('database_version', current_version)

    result = migration.migrate(def_config, {})

    assert result == 0
def test_run_single_migration(temp_db_with_extensions, def_config, temp_db_cursor,
                              property_table, monkeypatch, postprocess_mock):
    """Register one migration for an older version and one for the current
    version, then check which of them `migrate` executes and that the stored
    `database_version` ends up at `NOMINATIM_VERSION`.
    """
    # NOTE(review): several lines of this test were missing from the reviewed
    # text. Restored here: the version decrement, the `def _migration` header,
    # the two flag assignments and `assert done['new']`. They are inferred
    # from the `done` dict, the orphaned docstring, the `_migration` reference
    # below and the surviving `assert not done['old']` - confirm each one.
    oldversion = list(nominatim_db.version.NOMINATIM_VERSION)
    # NOTE(review): assumed to lower the first version component so that the
    # stored database version is older than the software version - confirm.
    oldversion[0] -= 1
    property_table.set('database_version',
                       str(nominatim_db.version.NominatimVersion(*oldversion)))

    done = {'old': False, 'new': False}

    def _migration(**_):
        """ Dummy migration"""
        done['new'] = True

    def _old_migration(**_):
        """ Dummy migration"""
        done['old'] = True

    monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS',
                        [(tuple(oldversion), _old_migration),
                         (nominatim_db.version.NOMINATIM_VERSION, _migration)])

    assert migration.migrate(def_config, {}) == 0

    assert done['new']
    assert not done['old']
    assert property_table.get('database_version') == str(nominatim_db.version.NOMINATIM_VERSION)
###### Tests for specific migrations
#
# Each migration should come with two tests:
# 1. Test that migration from old to new state works as expected.
# 2. Test that the migration can be rerun on the new state without side effects.
@pytest.mark.parametrize('in_attr', ('', 'with time zone'))
def test_import_status_timestamp_change(temp_db_conn, temp_db_cursor,
                                        table_factory, in_attr):
    """Check that `lastimportdate` is reported as 'timestamp with time zone'
    after the migration, whether the column started with or without a
    time zone.
    """
    # NOTE(review): in the reviewed text the column definition string ended
    # with a trailing comma and was never closed, so further columns were
    # probably lost. Only the column this test inspects is kept - confirm
    # against the project history whether other columns are required.
    table_factory('import_status',
                  f"""lastimportdate timestamp {in_attr}""")

    migration.import_status_timestamp_change(temp_db_conn)
    temp_db_conn.commit()

    column_type = temp_db_cursor.scalar("""SELECT data_type FROM information_schema.columns
                                           WHERE table_name = 'import_status'
                                             and column_name = 'lastimportdate'""")
    assert column_type == 'timestamp with time zone'
def test_add_nominatim_property_table(temp_db_conn, temp_db_cursor,
                                      def_config, monkeypatch):
    """Check that the migration creates `nominatim_properties` when the
    table does not exist yet.
    """
    # Use a r/o user name that always exists
    monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')

    existed_before = temp_db_cursor.table_exists('nominatim_properties')
    assert not existed_before

    migration.add_nominatim_property_table(temp_db_conn, def_config)
    temp_db_conn.commit()

    exists_after = temp_db_cursor.table_exists('nominatim_properties')
    assert exists_after
def test_add_nominatim_property_table_repeat(temp_db_conn, temp_db_cursor,
                                             def_config, property_table):
    """Run the migration while `nominatim_properties` already exists and
    check that the table is still there afterwards.
    """
    table_name = 'nominatim_properties'
    assert temp_db_cursor.table_exists(table_name)

    migration.add_nominatim_property_table(temp_db_conn, def_config)
    temp_db_conn.commit()

    assert temp_db_cursor.table_exists(table_name)
def test_switch_placenode_geometry_index(temp_db_conn, temp_db_cursor, placex_table):
    """Start with an `idx_placex_adminname` index and check that after the
    migration `idx_placex_geometry_placenode` exists and the old index is gone.
    """
    temp_db_cursor.execute("""CREATE INDEX idx_placex_adminname
                              ON placex (place_id)""")

    migration.switch_placenode_geometry_index(temp_db_conn)
    temp_db_conn.commit()

    has_new_index = temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
    has_old_index = temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
    assert has_new_index
    assert not has_old_index
def test_switch_placenode_geometry_index_repeat(temp_db_conn, temp_db_cursor, placex_table):
    """Run the migration while `idx_placex_geometry_placenode` already exists
    and check that its definition still ends with '(place_id)'.
    """
    temp_db_cursor.execute("""CREATE INDEX idx_placex_geometry_placenode
                              ON placex (place_id)""")

    migration.switch_placenode_geometry_index(temp_db_conn)
    temp_db_conn.commit()

    assert temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
    assert not temp_db_cursor.index_exists('placex', 'idx_placex_adminname')

    # The pre-existing index was created on (place_id); an unchanged
    # definition shows the migration did not replace it.
    index_definition = temp_db_cursor.scalar("""SELECT indexdef from pg_indexes
                                                WHERE tablename = 'placex'
                                                  and indexname = 'idx_placex_geometry_placenode'
                                             """)
    assert index_definition.endswith('(place_id)')
def test_install_legacy_tokenizer(temp_db_conn, temp_db_cursor, project_env,
                                  property_table, table_factory, monkeypatch):
    """Run `install_legacy_tokenizer` against minimal `placex` and
    `location_property_osmline` tables with tokenizer creation stubbed out.
    """
    # NOTE(review): the end of the parameter list was missing from the
    # reviewed text; a trailing fixture may have been lost. None is needed
    # by the statements below - confirm.
    table_factory('placex', 'place_id BIGINT')
    table_factory('location_property_osmline', 'place_id BIGINT')

    # Setting up the tokenizer is problematic
    # NOTE(review): the class header and the method body were missing from
    # the reviewed text. The name comes from the `MiniTokenizer()` call
    # below; the body is assumed to be a no-op - confirm.
    class MiniTokenizer:
        def migrate_database(self, config):
            pass

    monkeypatch.setattr(migration.tokenizer_factory, 'create_tokenizer',
                        lambda cfg, **kwargs: MiniTokenizer())

    migration.install_legacy_tokenizer(temp_db_conn, project_env)
    temp_db_conn.commit()
def test_install_legacy_tokenizer_repeat(temp_db_conn, temp_db_cursor,
                                         def_config, property_table):
    """Run `install_legacy_tokenizer` when the `tokenizer` property is
    already set; the test only requires that the call completes.
    """
    tokenizer_name = 'dummy'
    property_table.set('tokenizer', tokenizer_name)

    migration.install_legacy_tokenizer(temp_db_conn, def_config)
    temp_db_conn.commit()
def test_create_tiger_housenumber_index(temp_db_conn, temp_db_cursor, table_factory):
    """Run the tiger housenumber index migration twice on a minimal
    `location_property_tiger` table; on servers reporting version 11 or
    later the migrated index must exist after the first run.
    """
    table_factory('location_property_tiger',
                  'parent_place_id BIGINT, startnumber INT, endnumber INT')

    migration.create_tiger_housenumber_index(temp_db_conn)
    temp_db_conn.commit()

    index_expected = server_version_tuple(temp_db_conn) >= (11, 0, 0)
    if index_expected:
        assert temp_db_cursor.index_exists('location_property_tiger',
                                           'idx_location_property_tiger_housenumber_migrated')

    # Second run: the migration has to cope with its own result.
    migration.create_tiger_housenumber_index(temp_db_conn)
    temp_db_conn.commit()