lambda *args: DummyTokenizer())
-def test_no_migration_old_versions(temp_db_with_extensions, table_factory, def_config):
- table_factory('country_name', 'name HSTORE, country_code TEXT')
+def test_no_migration_old_versions(temp_db_with_extensions, def_config, property_table):
+ property_table.set('database_version', '4.2.99-0')
with pytest.raises(UsageError, match='Migration not possible'):
migration.migrate(def_config, {})
-def test_set_up_migration_for_36(temp_db_with_extensions, temp_db_cursor,
- table_factory, def_config, monkeypatch,
- postprocess_mock):
- # don't actually run any migration, except the property table creation
- monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS',
- [((3, 5, 0, 99), migration.add_nominatim_property_table)])
- # Use a r/o user name that always exists
- monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')
-
- table_factory('country_name', 'name HSTORE, country_code TEXT',
- (({str(x): 'a' for x in range(200)}, 'gb'),))
-
- assert not temp_db_cursor.table_exists('nominatim_properties')
-
- assert migration.migrate(def_config, {}) == 0
-
- assert temp_db_cursor.table_exists('nominatim_properties')
-
- assert 1 == temp_db_cursor.scalar(""" SELECT count(*) FROM nominatim_properties
- WHERE property = 'database_version'""")
-
-
def test_already_at_version(temp_db_with_extensions, def_config, property_table):
property_table.set('database_version',
def test_run_single_migration(temp_db_with_extensions, def_config, temp_db_cursor,
property_table, monkeypatch, postprocess_mock):
- oldversion = [x for x in nominatim_db.version.NOMINATIM_VERSION]
- oldversion[0] -= 1
+ oldversion = [4, 4, 99, 0]
property_table.set('database_version',
str(nominatim_db.version.NominatimVersion(*oldversion)))
""" Dummy migration"""
done['old'] = True
- oldversion[0] = 0
+ oldversion[1] = 0
monkeypatch.setattr(migration, '_MIGRATION_FUNCTIONS',
[(tuple(oldversion), _old_migration),
(nominatim_db.version.NOMINATIM_VERSION, _migration)])
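# Rough sketch, not part of this patch: judging from the monkeypatch above,
# migration._MIGRATION_FUNCTIONS is an ordered list of (version, function)
# pairs. The loop below only illustrates the assumed dispatch rule that the
# test relies on (entries newer than the stored database version run, older
# ones are skipped); the function and parameter names are invented.
def _apply_pending_migrations(db_version, functions):
    for version, func in functions:
        if db_version < version:
            # stored version predates this migration, so it must be applied
            func()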
# Each migration should come with two tests:
# 1. Test that migration from old to new state works as expected.
# 2. Test that the migration can be rerun on the new state without side effects.
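# Illustrative sketch only, not part of this patch: the two-test pattern the
# comment above describes, written for a hypothetical migration
# 'add_example_index'. The migration function, table and index names are made
# up; the fixtures (table_factory, temp_db_conn, temp_db_cursor) are the ones
# already used by the tests in this file.

def test_add_example_index(temp_db_conn, temp_db_cursor, table_factory):
    # 1. Migrating from the old state: the index does not exist yet.
    table_factory('example', 'place_id BIGINT')

    migration.add_example_index(temp_db_conn)
    temp_db_conn.commit()

    assert temp_db_cursor.index_exists('example', 'idx_example_place_id')


def test_add_example_index_repeat(temp_db_conn, temp_db_cursor, table_factory):
    # 2. Rerunning on the already-migrated state must neither fail nor
    #    create duplicates.
    table_factory('example', 'place_id BIGINT')
    temp_db_cursor.execute("CREATE INDEX idx_example_place_id ON example (place_id)")

    migration.add_example_index(temp_db_conn)
    temp_db_conn.commit()

    assert temp_db_cursor.index_exists('example', 'idx_example_place_id')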
-
-
-@pytest.mark.parametrize('in_attr', ('', 'with time zone'))
-def test_import_status_timestamp_change(temp_db_conn, temp_db_cursor,
- table_factory, in_attr):
- table_factory('import_status',
- f"""lastimportdate timestamp {in_attr},
- sequence_id integer,
- indexed boolean""")
-
- migration.import_status_timestamp_change(temp_db_conn)
- temp_db_conn.commit()
-
- assert temp_db_cursor.scalar("""SELECT data_type FROM information_schema.columns
- WHERE table_name = 'import_status'
- and column_name = 'lastimportdate'""")\
- == 'timestamp with time zone'
-
-
-def test_add_nominatim_property_table(temp_db_conn, temp_db_cursor,
- def_config, monkeypatch):
- # Use a r/o user name that always exists
- monkeypatch.setenv('NOMINATIM_DATABASE_WEBUSER', 'postgres')
-
- assert not temp_db_cursor.table_exists('nominatim_properties')
-
- migration.add_nominatim_property_table(temp_db_conn, def_config)
- temp_db_conn.commit()
-
- assert temp_db_cursor.table_exists('nominatim_properties')
-
-
-def test_add_nominatim_property_table_repeat(temp_db_conn, temp_db_cursor,
- def_config, property_table):
- assert temp_db_cursor.table_exists('nominatim_properties')
-
- migration.add_nominatim_property_table(temp_db_conn, def_config)
- temp_db_conn.commit()
-
- assert temp_db_cursor.table_exists('nominatim_properties')
-
-
-def test_switch_placenode_geometry_index(temp_db_conn, temp_db_cursor, placex_table):
- temp_db_cursor.execute("""CREATE INDEX idx_placex_adminname
- ON placex (place_id)""")
-
- migration.switch_placenode_geometry_index(temp_db_conn)
- temp_db_conn.commit()
-
- assert temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
- assert not temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
-
-
-def test_switch_placenode_geometry_index_repeat(temp_db_conn, temp_db_cursor, placex_table):
- temp_db_cursor.execute("""CREATE INDEX idx_placex_geometry_placenode
- ON placex (place_id)""")
-
- migration.switch_placenode_geometry_index(temp_db_conn)
- temp_db_conn.commit()
-
- assert temp_db_cursor.index_exists('placex', 'idx_placex_geometry_placenode')
- assert not temp_db_cursor.index_exists('placex', 'idx_placex_adminname')
- assert temp_db_cursor.scalar("""SELECT indexdef from pg_indexes
- WHERE tablename = 'placex'
- and indexname = 'idx_placex_geometry_placenode'
- """).endswith('(place_id)')
-
-
-def test_install_legacy_tokenizer(temp_db_conn, temp_db_cursor, project_env,
- property_table, table_factory, monkeypatch,
- tmp_path):
- table_factory('placex', 'place_id BIGINT')
- table_factory('location_property_osmline', 'place_id BIGINT')
-
- # Setting up the tokenizer is problematic
- class MiniTokenizer:
- def migrate_database(self, config):
- pass
-
- monkeypatch.setattr(migration.tokenizer_factory, 'create_tokenizer',
- lambda cfg, **kwargs: MiniTokenizer())
-
- migration.install_legacy_tokenizer(temp_db_conn, project_env)
- temp_db_conn.commit()
-
-
-
-def test_install_legacy_tokenizer_repeat(temp_db_conn, temp_db_cursor,
- def_config, property_table):
-
- property_table.set('tokenizer', 'dummy')
- migration.install_legacy_tokenizer(temp_db_conn, def_config)
- temp_db_conn.commit()
-
-
-def test_create_tiger_housenumber_index(temp_db_conn, temp_db_cursor, table_factory):
- table_factory('location_property_tiger',
- 'parent_place_id BIGINT, startnumber INT, endnumber INT')
-
- migration.create_tiger_housenumber_index(temp_db_conn)
- temp_db_conn.commit()
-
- if server_version_tuple(temp_db_conn) >= (11, 0, 0):
- assert temp_db_cursor.index_exists('location_property_tiger',
- 'idx_location_property_tiger_housenumber_migrated')
-
- migration.create_tiger_housenumber_index(temp_db_conn)
- temp_db_conn.commit()