git.openstreetmap.org Git - nominatim.git/blob - test/python/test_db_utils.py
allow multiple files for the import command
[nominatim.git] / test / python / test_db_utils.py
1 """
2 Tests for DB utility functions in db.utils
3 """
4 import json
5
6 import pytest
7
8 import nominatim.db.utils as db_utils
9 from nominatim.errors import UsageError
10
def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
    """ Run a well-formed SQL script through execute_file() and check
        that the row it inserts is visible in the database afterwards.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')

    db_utils.execute_file(dsn, sql_file)

    stored = temp_db_cursor.row_set('SELECT * FROM test')
    assert stored == {(56, )}
18
def test_execute_file_bad_file(dsn, tmp_path):
    """ execute_file() is expected to raise FileNotFoundError when the
        given script path was never created.
    """
    # Nothing is ever written to this path, so the file does not exist.
    missing_file = tmp_path / 'test2.sql'

    with pytest.raises(FileNotFoundError):
        db_utils.execute_file(dsn, missing_file)
22
23
def test_execute_file_bad_sql(dsn, tmp_path):
    """ A script containing a malformed statement is expected to make
        execute_file() raise a UsageError.
    """
    broken_sql = tmp_path / 'test.sql'
    broken_sql.write_text('CREATE STABLE test (id INT)')

    with pytest.raises(UsageError):
        db_utils.execute_file(dsn, broken_sql)
30
31
def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
    """ With ignore_errors=True the malformed script must run through
        without an exception; the test passes when nothing is raised.
    """
    broken_sql = tmp_path / 'test.sql'
    broken_sql.write_text('CREATE STABLE test (id INT)')

    db_utils.execute_file(dsn, broken_sql, ignore_errors=True)
37
38
def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
    """ The script only inserts into a table, the table itself is created
        by the statement handed in as pre_code. The insert can only
        succeed when pre_code takes effect before the file content.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('INSERT INTO test VALUES(4)')

    db_utils.execute_file(dsn, sql_file, pre_code='CREATE TABLE test (id INT)')

    stored = temp_db_cursor.row_set('SELECT * FROM test')
    assert stored == {(4, )}
46
47
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
    """ The script only creates the table, the row is inserted by the
        statement handed in as post_code. The insert can only succeed
        when post_code takes effect after the file content.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE TABLE test (id INT)')

    db_utils.execute_file(dsn, sql_file, post_code='INSERT INTO test VALUES(23)')

    stored = temp_db_cursor.row_set('SELECT * FROM test')
    assert stored == {(23, )}
55
56
class TestCopyBuffer:
    """ Tests for db_utils.CopyBuffer against a table with an integer
        and a text column.
    """
    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """ Request the target table from table_factory before each test.
        """
        table_factory(self.TABLE_NAME, 'colA INT, colB TEXT')


    def table_rows(self, cursor):
        """ Return whatever cursor.row_set() yields for a full select
            on the test table.
        """
        query = 'SELECT * FROM ' + self.TABLE_NAME
        return cursor.row_set(query)


    def test_copybuffer_empty(self):
        """ copy_out() on a buffer that never received a row is called
            with None in place of a cursor and must not raise.
        """
        with db_utils.CopyBuffer() as buf:
            buf.copy_out(None, "dummy")


    def test_all_columns(self, temp_db_cursor):
        """ Rows given for every column arrive unchanged, including a
            None value and a text holding a literal backslash.
        """
        with db_utils.CopyBuffer() as buf:
            buf.add(3, 'hum')
            buf.add(None, 'f\\t')

            buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        expected = {(3, 'hum'), (None, 'f\\t')}
        assert self.table_rows(temp_db_cursor) == expected


    def test_selected_columns(self, temp_db_cursor):
        """ When only colB is named in copy_out(), colA of the resulting
            row is expected to be None.
        """
        with db_utils.CopyBuffer() as buf:
            buf.add('foo')

            buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                         columns=['colB'])

        expected = {(None, 'foo')}
        assert self.table_rows(temp_db_cursor) == expected


    def test_reordered_columns(self, temp_db_cursor):
        """ Values are added in (colB, colA) order and must end up in
            the matching columns; surrounding blanks in the text are
            expected to be kept.
        """
        with db_utils.CopyBuffer() as buf:
            buf.add('one', 1)
            buf.add(' two ', 2)

            buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                         columns=['colB', 'colA'])

        expected = {(1, 'one'), (2, ' two ')}
        assert self.table_rows(temp_db_cursor) == expected


    def test_special_characters(self, temp_db_cursor):
        """ Texts containing a tab, a newline or the two characters
            backslash + N must come back exactly as they were added.
        """
        texts = ('foo\tbar', 'sun\nson', '\\N')

        with db_utils.CopyBuffer() as buf:
            for text in texts:
                buf.add(text)

            buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                         columns=['colB'])

        # Only colB was written, so colA is None in every expected row.
        expected = {(None, text) for text in texts}
        assert self.table_rows(temp_db_cursor) == expected
117
118
119
class TestCopyBufferJson:
    """ Tests for db_utils.CopyBuffer against a table with an integer
        and a JSONB column.
    """
    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """ Request the target table from table_factory before each test.
        """
        table_factory(self.TABLE_NAME, 'colA INT, colB JSONB')


    def table_rows(self, cursor):
        """ Select everything from the test table and return a dict that
            maps the first value of each row to the second.

            Asserts that the dict has as many entries as the cursor
            reports rows, i.e. that no first value appeared twice.
        """
        cursor.execute('SELECT * FROM ' + self.TABLE_NAME)

        by_key = {}
        for key, value in cursor:
            by_key[key] = value

        assert len(by_key) == cursor.rowcount

        return by_key


    def test_json_object(self, temp_db_cursor):
        """ A JSON-serialised dict is added as second value and must be
            read back as an equal dict.
        """
        with db_utils.CopyBuffer() as buf:
            buf.add(1, json.dumps({'test': 'value', 'number': 1}))

            buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        expected = {1: {'test': 'value', 'number': 1}}
        assert self.table_rows(temp_db_cursor) == expected


    def test_json_object_special_chras(self, temp_db_cursor):
        """ Same as the plain object test, but keys and values contain a
            tab, a newline, a double quote and None.
        """
        with db_utils.CopyBuffer() as buf:
            buf.add(1, json.dumps({'te\tst': 'va\nlue', 'nu"mber': None}))

            buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        expected = {1: {'te\tst': 'va\nlue', 'nu"mber': None}}
        assert self.table_rows(temp_db_cursor) == expected