]> git.openstreetmap.org Git - nominatim.git/blob - test/python/db/test_sql_preprocessor.py
port code to psycopg3
[nominatim.git] / test / python / db / test_sql_preprocessor.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for SQL preprocessing.
9 """
10 import pytest
11 import pytest_asyncio
12
13 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
14
15 @pytest.fixture
16 def sql_factory(tmp_path):
17     def _mk_sql(sql_body):
18         (tmp_path / 'test.sql').write_text("""
19           CREATE OR REPLACE FUNCTION test() RETURNS TEXT
20           AS $$
21           BEGIN
22             {}
23           END;
24           $$ LANGUAGE plpgsql IMMUTABLE;""".format(sql_body))
25         return 'test.sql'
26
27     return _mk_sql
28
29 @pytest.mark.parametrize("expr,ret", [
30     ("'a'", 'a'),
31     ("'{{db.partitions|join}}'", '012'),
32     ("{% if 'country_name' in db.tables %}'yes'{% else %}'no'{% endif %}", "yes"),
33     ("{% if 'xxx' in db.tables %}'yes'{% else %}'no'{% endif %}", "no"),
34     ("'{{db.tablespace.address_data}}'", ""),
35     ("'{{db.tablespace.search_data}}'", 'TABLESPACE "dsearch"'),
36     ("'{{db.tablespace.address_index}}'", 'TABLESPACE "iaddress"'),
37     ("'{{db.tablespace.aux_data}}'", 'TABLESPACE "daux"')
38     ])
39 def test_load_file_simple(sql_preprocessor_cfg, sql_factory,
40                           temp_db_conn, temp_db_cursor, monkeypatch,
41                           expr, ret):
42     monkeypatch.setenv('NOMINATIM_TABLESPACE_SEARCH_DATA', 'dsearch')
43     monkeypatch.setenv('NOMINATIM_TABLESPACE_ADDRESS_INDEX', 'iaddress')
44     monkeypatch.setenv('NOMINATIM_TABLESPACE_AUX_DATA', 'daux')
45     sqlfile = sql_factory("RETURN {};".format(expr))
46
47     SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg).run_sql_file(temp_db_conn, sqlfile)
48
49     assert temp_db_cursor.scalar('SELECT test()') == ret
50
51
52 def test_load_file_with_params(sql_preprocessor, sql_factory, temp_db_conn, temp_db_cursor):
53     sqlfile = sql_factory("RETURN '{{ foo }} {{ bar }}';")
54
55     sql_preprocessor.run_sql_file(temp_db_conn, sqlfile, bar='XX', foo='ZZ')
56
57     assert temp_db_cursor.scalar('SELECT test()') == 'ZZ XX'
58
59
60 @pytest.mark.asyncio
61 async def test_load_parallel_file(dsn, sql_preprocessor, tmp_path, temp_db_cursor):
62     (tmp_path / 'test.sql').write_text("""
63         CREATE TABLE foo (a TEXT);
64         CREATE TABLE foo2(a TEXT);""" + 
65         "\n---\nCREATE TABLE bar (b INT);")
66
67     await sql_preprocessor.run_parallel_sql_file(dsn, 'test.sql', num_threads=4)
68
69     assert temp_db_cursor.table_exists('foo')
70     assert temp_db_cursor.table_exists('foo2')
71     assert temp_db_cursor.table_exists('bar')