]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_migration.py
port code to psycopg3
[nominatim.git] / test / python / tools / test_migration.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for migration functions
9 """
10 import pytest
11
12 from nominatim_db.tools import migration
13 from nominatim_db.errors import UsageError
14 from nominatim_db.db.connection import server_version_tuple
15 import nominatim_db.version
16
17 from mock_legacy_word_table import MockLegacyWordTable
18
class DummyTokenizer:
    """ Tokenizer stand-in whose SQL function update is a no-op.
    """

    def update_sql_functions(self, config):
        """ Accept a configuration and do nothing with it.
        """
        return None
23
24
@pytest.fixture
def postprocess_mock(monkeypatch):
    """ Swap the function refresh and the tokenizer lookup referenced by
        the migration module for simple stand-ins.
    """
    def _echo_arguments(*args):
        # Stand-in for refresh.create_functions: hands back what it got.
        return args

    def _dummy_tokenizer(*args):
        # Stand-in for tokenizer_factory.get_tokenizer_for_db.
        return DummyTokenizer()

    monkeypatch.setattr(migration.refresh, 'create_functions', _echo_arguments)
    monkeypatch.setattr(migration.tokenizer_factory, 'get_tokenizer_for_db',
                        _dummy_tokenizer)
30
@pytest.fixture
def legacy_word_table(temp_db_conn):
    """ Provide a MockLegacyWordTable created with the test database
        connection.
    """
    word_table = MockLegacyWordTable(temp_db_conn)
    return word_table
34
35
def test_no_migration_old_versions(temp_db_with_extensions, table_factory, def_config):
    """ Check that migrate() fails with a UsageError matching
        'Migration not possible' after only a country_name table was created.
    """
    country_columns = 'name HSTORE, country_code TEXT'
    table_factory('country_name', country_columns)

    expected_failure = pytest.raises(UsageError, match='Migration not possible')
    with expected_failure:
        migration.migrate(def_config, {})
41
42
def test_set_up_migration_for_36(temp_db_with_extensions, temp_db_cursor,
                                 table_factory, def_config, monkeypatch,
                                 postprocess_mock):
    """ Check that migrate() creates the nominatim_properties table with
        exactly one database_version entry when only the property table
        migration is registered.
    """
    # Use a r/o user name that always exists
    monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')
    # Register nothing but the property table creation, so that no other
    # migration is actually run.
    only_property_table = [((3, 5, 0, 99), migration.add_nominatim_property_table)]
    monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS', only_property_table)

    names = dict.fromkeys((str(num) for num in range(200)), 'a')
    table_factory('country_name', 'name HSTORE, country_code TEXT',
                  ((names, 'gb'),))

    assert not temp_db_cursor.table_exists('nominatim_properties')

    result = migration.migrate(def_config, {})
    assert result == 0

    assert temp_db_cursor.table_exists('nominatim_properties')

    version_entries = temp_db_cursor.scalar(""" SELECT count(*) FROM nominatim_properties
                                          WHERE property = 'database_version'""")
    assert 1 == version_entries
63
64
def test_already_at_version(temp_db_with_extensions, def_config, property_table):
    """ Check that migrate() returns 0 when the stored database version is
        already the current NOMINATIM_VERSION.
    """
    current_version = str(nominatim_db.version.NOMINATIM_VERSION)
    property_table.set('database_version', current_version)

    outcome = migration.migrate(def_config, {})
    assert outcome == 0
71
72
def test_run_single_migration(temp_db_with_extensions, def_config, temp_db_cursor,
                              property_table, monkeypatch, postprocess_mock):
    """ Check that a database one major version behind only gets the
        migration registered for the current version and ends up with the
        current version stored.
    """
    current = nominatim_db.version.NOMINATIM_VERSION
    major, *remainder = current

    # The database claims to be one major version behind.
    stored = nominatim_db.version.NominatimVersion(major - 1, *remainder)
    property_table.set('database_version', str(stored))

    executed = []

    def _migration(**_):
        """ Dummy migration"""
        executed.append('new')

    def _old_migration(**_):
        """ Dummy migration"""
        executed.append('old')

    # The old migration is registered for major version 0.
    registered = [((0, *remainder), _old_migration),
                  (current, _migration)]
    monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS', registered)

    assert migration.migrate(def_config, {}) == 0

    assert 'new' in executed
    assert 'old' not in executed
    assert property_table.get('database_version') == str(current)
99
100
101 ###### Tests for specific migrations
102 #
103 # Each migration should come with two tests:
104 #  1. Test that migration from old to new state works as expected.
105 #  2. Test that the migration can be rerun on the new state without side effects.
106
107
@pytest.mark.parametrize('in_attr', ('', 'with time zone'))
def test_import_status_timestamp_change(temp_db_conn, temp_db_cursor,
                                        table_factory, in_attr):
    """ Check that import_status.lastimportdate is reported as
        'timestamp with time zone' after the migration, whether the column
        was created with or without a time zone.
    """
    column_definitions = f"""lastimportdate timestamp {in_attr},
                     sequence_id integer,
                     indexed boolean"""
    table_factory('import_status', column_definitions)

    migration.import_status_timestamp_change(temp_db_conn)
    temp_db_conn.commit()

    column_type = temp_db_cursor.scalar("""SELECT data_type FROM information_schema.columns
                                    WHERE table_name = 'import_status'
                                      and column_name = 'lastimportdate'""")
    assert column_type == 'timestamp with time zone'
123
124
def test_add_nominatim_property_table(temp_db_conn, temp_db_cursor,
                                      def_config, monkeypatch):
    """ Check that the migration creates the nominatim_properties table
        when it does not exist yet.
    """
    table_name = 'nominatim_properties'

    # Use a r/o user name that always exists
    monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')

    exists_before = temp_db_cursor.table_exists(table_name)
    assert not exists_before

    migration.add_nominatim_property_table(temp_db_conn, def_config)
    temp_db_conn.commit()

    exists_after = temp_db_cursor.table_exists(table_name)
    assert exists_after
136
137
def test_add_nominatim_property_table_repeat(temp_db_conn, temp_db_cursor,
                                             def_config, property_table):
    """ Check that the nominatim_properties table is still present after
        the migration runs on a database that already has it.
    """
    table_name = 'nominatim_properties'

    exists_before = temp_db_cursor.table_exists(table_name)
    assert exists_before

    migration.add_nominatim_property_table(temp_db_conn, def_config)
    temp_db_conn.commit()

    exists_after = temp_db_cursor.table_exists(table_name)
    assert exists_after
146
147
def test_change_housenumber_transliteration(temp_db_conn, temp_db_cursor,
                                            legacy_word_table, placex_table):
    """ Check that the migration turns the housenumber '3A' into '3a' and
        that rerunning it leaves the result unchanged.
    """
    placex_table.add(housenumber='3A')

    # Minimal SQL stand-ins: make_standard_name lower-cases its input,
    # getorcreate_housenumber_id returns a fixed id.
    stub_functions = (
        """CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
                              RETURNS TEXT AS $$ SELECT lower(name) $$ LANGUAGE SQL """,
        """CREATE OR REPLACE FUNCTION getorcreate_housenumber_id(lookup_word TEXT)
                              RETURNS INTEGER AS $$ SELECT 4325 $$ LANGUAGE SQL """,
    )
    for statement in stub_functions:
        temp_db_cursor.execute(statement)

    # First pass migrates the data, second pass reruns on the new state.
    for _ in range(2):
        migration.change_housenumber_transliteration(temp_db_conn)
        temp_db_conn.commit()

        assert temp_db_cursor.scalar('SELECT housenumber from placex') == '3a'
166
167
def test_switch_placenode_geometry_index(temp_db_conn, temp_db_cursor, placex_table):
    """ Check that after the migration idx_placex_geometry_placenode exists
        and the previously created idx_placex_adminname is gone.
    """
    temp_db_cursor.execute("""CREATE INDEX idx_placex_adminname
                              ON placex (place_id)""")

    migration.switch_placenode_geometry_index(temp_db_conn)
    temp_db_conn.commit()

    new_index_present = temp_db_cursor.index_exists('placex',
                                                    'idx_placex_geometry_placenode')
    assert new_index_present
    old_index_present = temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
    assert not old_index_present
177
178
def test_switch_placenode_geometry_index_repeat(temp_db_conn, temp_db_cursor, placex_table):
    """ Check that an already existing idx_placex_geometry_placenode index
        is still there after the migration, with a definition that still
        ends in '(place_id)'.
    """
    temp_db_cursor.execute("""CREATE INDEX idx_placex_geometry_placenode
                              ON placex (place_id)""")

    migration.switch_placenode_geometry_index(temp_db_conn)
    temp_db_conn.commit()

    new_index_present = temp_db_cursor.index_exists('placex',
                                                    'idx_placex_geometry_placenode')
    assert new_index_present
    old_index_present = temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
    assert not old_index_present

    index_definition = temp_db_cursor.scalar("""SELECT indexdef from pg_indexes
                                    WHERE tablename = 'placex'
                                      and indexname = 'idx_placex_geometry_placenode'
                                 """)
    assert index_definition.endswith('(place_id)')
192
193
def test_install_legacy_tokenizer(temp_db_conn, temp_db_cursor, project_env,
                                  property_table, table_factory, monkeypatch,
                                  tmp_path):
    """ Run install_legacy_tokenizer() with a mocked tokenizer factory.
        The test contains no assertions, it passes when no error is raised.
    """
    for table in ('placex', 'location_property_osmline'):
        table_factory(table, 'place_id BIGINT')

    # Setting up the tokenizer is problematic
    class MiniTokenizer:
        def migrate_database(self, config):
            return None

    def _create_mini_tokenizer(cfg, **kwargs):
        return MiniTokenizer()

    monkeypatch.setattr(migration.tokenizer_factory, 'create_tokenizer',
                        _create_mini_tokenizer)

    migration.install_legacy_tokenizer(temp_db_conn, project_env)
    temp_db_conn.commit()
210
211
212
def test_install_legacy_tokenizer_repeat(temp_db_conn, temp_db_cursor,
                                         def_config, property_table):
    """ Run install_legacy_tokenizer() when a 'tokenizer' property is
        already set. The test passes when no error is raised.
    """
    tokenizer_name = 'dummy'
    property_table.set('tokenizer', tokenizer_name)

    migration.install_legacy_tokenizer(temp_db_conn, def_config)
    temp_db_conn.commit()
219
220
def test_create_tiger_housenumber_index(temp_db_conn, temp_db_cursor, table_factory):
    """ Run the tiger housenumber index migration twice. When the server
        version is at least 11.0.0, the migrated index must exist after the
        first run.
    """
    def _run_migration():
        migration.create_tiger_housenumber_index(temp_db_conn)
        temp_db_conn.commit()

    tiger_columns = 'parent_place_id BIGINT, startnumber INT, endnumber INT'
    table_factory('location_property_tiger', tiger_columns)

    _run_migration()

    minimum_version = (11, 0, 0)
    if server_version_tuple(temp_db_conn) >= minimum_version:
        assert temp_db_cursor.index_exists('location_property_tiger',
                                           'idx_location_property_tiger_housenumber_migrated')

    # Rerun on the already migrated state.
    _run_migration()