]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_db_utils.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / test_db_utils.py
1 """
2 Tests for DB utility functions in db.utils
3 """
4 import pytest
5
6 import nominatim.db.utils as db_utils
7 from nominatim.errors import UsageError
8
def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
    """ Statements from a valid SQL file must be visible in the database
        after execute_file() has run.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')

    db_utils.execute_file(dsn, sql_file)

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(56,)}
16
def test_execute_file_bad_file(dsn, tmp_path):
    """ Pointing execute_file() at a file that was never created must
        raise FileNotFoundError.
    """
    # Nothing is ever written to this path.
    missing_file = tmp_path / 'test2.sql'

    with pytest.raises(FileNotFoundError):
        db_utils.execute_file(dsn, missing_file)
20
21
def test_execute_file_bad_sql(dsn, tmp_path):
    """ A file with a broken SQL statement must be reported as UsageError.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE STABLE test (id INT)')

    with pytest.raises(UsageError):
        db_utils.execute_file(dsn, sql_file)
28
29
def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
    """ With ignore_errors=True a broken SQL statement must not raise.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE STABLE test (id INT)')

    # No assertion needed: the test passes when the call returns normally.
    db_utils.execute_file(dsn, sql_file, ignore_errors=True)
35
36
def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
    """ SQL handed in via pre_code must take effect together with the file.

        The file only inserts a row, so the table has to come from pre_code.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('INSERT INTO test VALUES(4)')

    db_utils.execute_file(dsn, sql_file, pre_code='CREATE TABLE test (id INT)')

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(4,)}
44
45
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
    """ SQL handed in via post_code must take effect together with the file.

        The file only creates the table, so the row has to come from post_code.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE TABLE test (id INT)')

    db_utils.execute_file(dsn, sql_file, post_code='INSERT INTO test VALUES(23)')

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(23,)}
53
54
class TestCopyBuffer:
    """ Tests for db_utils.CopyBuffer, run against a scratch table with
        an integer column 'colA' and a text column 'colB'.
    """
    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """ Create the target table before each test of this class.
        """
        table_factory(self.TABLE_NAME, 'colA INT, colB TEXT')


    def table_rows(self, cursor):
        """ Return the complete content of the target table as a set of rows.
        """
        query = 'SELECT * FROM ' + self.TABLE_NAME
        return cursor.row_set(query)


    def test_copybuffer_empty(self):
        """ Copying out a buffer that never received data must work
            even without a cursor.
        """
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.copy_out(None, "dummy")


    def test_all_columns(self, temp_db_cursor):
        """ Rows added with a value for every column arrive unchanged,
            including a NULL and a literal backslash.
        """
        expected = {(3, 'hum'), (None, 'f\\t')}

        with db_utils.CopyBuffer() as copy_buf:
            for row in ((3, 'hum'), (None, 'f\\t')):
                copy_buf.add(*row)

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        assert self.table_rows(temp_db_cursor) == expected


    def test_selected_columns(self, temp_db_cursor):
        """ When only 'colB' is listed, the other column stays NULL.
        """
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add('foo')

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                              columns=['colB'])

        assert self.table_rows(temp_db_cursor) == {(None, 'foo')}


    def test_reordered_columns(self, temp_db_cursor):
        """ Values are assigned following the order of the column list,
            not the order of the columns in the table.
        """
        expected = {(1, 'one'), (2, ' two ')}

        with db_utils.CopyBuffer() as copy_buf:
            for text, number in (('one', 1), (' two ', 2)):
                copy_buf.add(text, number)

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                              columns=['colB', 'colA'])

        assert self.table_rows(temp_db_cursor) == expected


    def test_special_characters(self, temp_db_cursor):
        """ Tab, newline and a literal backslash-N must survive the
            copy as ordinary text.
        """
        texts = ('foo\tbar', 'sun\nson', '\\N')

        with db_utils.CopyBuffer() as copy_buf:
            for text in texts:
                copy_buf.add(text)

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                              columns=['colB'])

        expected = {(None, text) for text in texts}
        assert self.table_rows(temp_db_cursor) == expected
115
116
117