git.openstreetmap.org Git - nominatim.git/blob - test/python/db/test_utils.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / db / test_utils.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for DB utility functions in db.utils
9 """
10 import json
11
12 import pytest
13
14 import nominatim.db.utils as db_utils
15 from nominatim.errors import UsageError
16
def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
    """A file holding valid SQL is executed and its effects are visible.

    The script creates a table and inserts one row; the row must be
    readable afterwards through the test cursor.
    """
    sql_file = tmp_path.joinpath('test.sql')
    sql_file.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')

    db_utils.execute_file(dsn, sql_file)

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(56, )}
24
def test_execute_file_bad_file(dsn, tmp_path):
    """Pointing execute_file() at a path that was never written raises
    FileNotFoundError.
    """
    # Nothing is ever written to this path inside the fresh tmp directory.
    missing = tmp_path.joinpath('test2.sql')

    with pytest.raises(FileNotFoundError):
        db_utils.execute_file(dsn, missing)
28
29
def test_execute_file_bad_sql(dsn, tmp_path):
    """A file with a malformed statement makes execute_file() raise UsageError."""
    broken_sql = tmp_path.joinpath('test.sql')
    # 'STABLE' instead of 'TABLE' makes the statement invalid.
    broken_sql.write_text('CREATE STABLE test (id INT)')

    with pytest.raises(UsageError):
        db_utils.execute_file(dsn, broken_sql)
36
37
def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
    """With ignore_errors=True a malformed statement does not raise.

    There is deliberately no assertion: the test passes as long as the
    call returns without an exception.
    """
    broken_sql = tmp_path.joinpath('test.sql')
    broken_sql.write_text('CREATE STABLE test (id INT)')

    db_utils.execute_file(dsn, broken_sql, ignore_errors=True)
43
44
def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
    """SQL passed as pre_code takes effect before the file's statements.

    The file only inserts into the table, so the insert can succeed only
    if the table from pre_code already exists at that point.
    """
    insert_script = tmp_path.joinpath('test.sql')
    insert_script.write_text('INSERT INTO test VALUES(4)')

    db_utils.execute_file(dsn, insert_script,
                          pre_code='CREATE TABLE test (id INT)')

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(4, )}
52
53
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
    """SQL passed as post_code takes effect after the file's statements.

    The file only creates the table; the row checked at the end comes
    from the post_code insert.
    """
    create_script = tmp_path.joinpath('test.sql')
    create_script.write_text('CREATE TABLE test (id INT)')

    db_utils.execute_file(dsn, create_script,
                          post_code='INSERT INTO test VALUES(23)')

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(23, )}
61
62
class TestCopyBuffer:
    """Tests for db_utils.CopyBuffer writing into an (INT, TEXT) table."""

    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """Set up the target table (col_a INT, col_b TEXT) for every test."""
        table_factory(self.TABLE_NAME, 'col_a INT, col_b TEXT')


    def table_rows(self, cursor):
        """Return the full content of the target table as a set of rows."""
        query = 'SELECT * FROM ' + self.TABLE_NAME
        return cursor.row_set(query)


    def test_copybuffer_empty(self):
        """Copying out a buffer that nothing was added to works even
        without a cursor.
        """
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.copy_out(None, "dummy")


    def test_all_columns(self, temp_db_cursor):
        """Rows covering every column arrive unchanged, including a None
        value and a text containing a literal backslash.
        """
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add(3, 'hum')
            copy_buf.add(None, 'f\\t')

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        expected = {(3, 'hum'), (None, 'f\\t')}
        assert self.table_rows(temp_db_cursor) == expected


    def test_selected_columns(self, temp_db_cursor):
        """When only col_b is listed, col_a of the copied row is NULL."""
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add('foo')

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                              columns=['col_b'])

        expected = {(None, 'foo')}
        assert self.table_rows(temp_db_cursor) == expected


    def test_reordered_columns(self, temp_db_cursor):
        """Values are matched to columns in the order given by `columns`,
        and surrounding whitespace in text is preserved.
        """
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add('one', 1)
            copy_buf.add(' two ', 2)

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                              columns=['col_b', 'col_a'])

        expected = {(1, 'one'), (2, ' two ')}
        assert self.table_rows(temp_db_cursor) == expected


    def test_special_characters(self, temp_db_cursor):
        """Text containing a tab, a newline or the two characters
        backslash-N is stored verbatim.
        """
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add('foo\tbar')
            copy_buf.add('sun\nson')
            copy_buf.add('\\N')

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                              columns=['col_b'])

        expected = {
            (None, 'foo\tbar'),
            (None, 'sun\nson'),
            (None, '\\N'),
        }
        assert self.table_rows(temp_db_cursor) == expected
123
124
125
class TestCopyBufferJson:
    """Tests for db_utils.CopyBuffer writing JSON text into a JSONB column."""

    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """Set up the target table (col_a INT, col_b JSONB) for every test."""
        table_factory(self.TABLE_NAME, 'col_a INT, col_b JSONB')


    def table_rows(self, cursor):
        """Return the table content as a dict mapping col_a to col_b.

        Also asserts that the dict has as many entries as the query
        returned rows, i.e. that no two rows shared the same col_a.
        """
        cursor.execute('SELECT * FROM ' + self.TABLE_NAME)

        by_id = {}
        for key, value in cursor:
            by_id[key] = value

        assert len(by_id) == cursor.rowcount

        return by_id


    def test_json_object(self, temp_db_cursor):
        """A serialised JSON object is read back as the equivalent dict."""
        payload = json.dumps({'test': 'value', 'number': 1})

        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add(1, payload)

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        expected = {1: {'test': 'value', 'number': 1}}
        assert self.table_rows(temp_db_cursor) == expected


    def test_json_object_special_chras(self, temp_db_cursor):
        """Tabs, newlines, double quotes and null inside the JSON object
        are read back unchanged.
        """
        payload = json.dumps({'te\tst': 'va\nlue', 'nu"mber': None})

        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add(1, payload)

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        expected = {1: {'te\tst': 'va\nlue', 'nu"mber': None}}
        assert self.table_rows(temp_db_cursor) == expected
158
159         assert self.table_rows(temp_db_cursor) == \
160                    {1: {'te\tst': 'va\nlue', 'nu"mber': None}}