1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Tests for DB utility functions in db.utils
"""
14 import nominatim_db.db.utils as db_utils
15 from nominatim_db.errors import UsageError
def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
    """Run a valid SQL script and check that its statements took effect."""
    sql_script = tmp_path / 'test.sql'
    sql_script.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')

    db_utils.execute_file(dsn, sql_script)

    # The script created the table and inserted exactly one row.
    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(56, )}
def test_execute_file_bad_file(dsn, tmp_path):
    """Check that a script path which does not exist raises FileNotFoundError."""
    # Nothing is ever written to this path, so the file is missing.
    missing_script = tmp_path / 'test2.sql'

    with pytest.raises(FileNotFoundError):
        db_utils.execute_file(dsn, missing_script)
def test_execute_file_bad_sql(dsn, tmp_path):
    """Check that a script containing invalid SQL raises a UsageError."""
    broken_script = tmp_path / 'test.sql'
    # 'STABLE' instead of 'TABLE' makes the statement invalid.
    broken_script.write_text('CREATE STABLE test (id INT)')

    with pytest.raises(UsageError):
        db_utils.execute_file(dsn, broken_script)
def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
    """Check that invalid SQL does not raise when ignore_errors is set."""
    broken_script = tmp_path / 'test.sql'
    broken_script.write_text('CREATE STABLE test (id INT)')

    # No assertion needed: the test passes when the call returns normally.
    db_utils.execute_file(dsn, broken_script, ignore_errors=True)
def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
    """Check that SQL passed as pre_code takes effect before the script."""
    insert_script = tmp_path / 'test.sql'
    insert_script.write_text('INSERT INTO test VALUES(4)')

    # The script only inserts, so the table has to come from pre_code.
    db_utils.execute_file(dsn, insert_script,
                          pre_code='CREATE TABLE test (id INT)')

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(4, )}
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
    """Check that SQL passed as post_code takes effect after the script."""
    create_script = tmp_path / 'test.sql'
    create_script.write_text('CREATE TABLE test (id INT)')

    # The script only creates the table, so the row has to come from post_code.
    db_utils.execute_file(dsn, create_script,
                          post_code='INSERT INTO test VALUES(23)')

    rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert rows == {(23, )}
64 TABLE_NAME = 'copytable'
@pytest.fixture(autouse=True)
def setup_test_table(self, table_factory):
    """Autouse fixture creating the table the copy tests write into."""
    column_spec = 'col_a INT, col_b TEXT'
    table_factory(self.TABLE_NAME, column_spec)
def table_rows(self, cursor):
    """Return whatever ``cursor.row_set`` yields for all rows of the test table."""
    query = 'SELECT * FROM ' + self.TABLE_NAME
    return cursor.row_set(query)
def test_copybuffer_empty(self):
    """Check that copy_out() on a buffer without rows does not raise.

    None is passed in place of a cursor, so the call can only succeed
    if the cursor is never used.
    """
    with db_utils.CopyBuffer() as empty_buffer:
        empty_buffer.copy_out(None, "dummy")
def test_all_columns(self, temp_db_cursor):
    """Check that rows with a value for every column are copied unchanged.

    Covers a NULL value (None) and a string containing a literal
    backslash followed by 't'.
    """
    with db_utils.CopyBuffer() as buf:
        # One value per table column, in table order (col_a, col_b).
        buf.add(3, 'hum')
        buf.add(None, 'f\\t')

        buf.copy_out(temp_db_cursor, self.TABLE_NAME)

    assert self.table_rows(temp_db_cursor) == {(3, 'hum'), (None, 'f\\t')}
def test_selected_columns(self, temp_db_cursor):
    """Check that copying into a subset of columns leaves the others NULL."""
    with db_utils.CopyBuffer() as buf:
        # Only col_b receives a value; col_a is expected to stay NULL.
        buf.add('foo')

        buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                     columns=['col_b'])

    assert self.table_rows(temp_db_cursor) == {(None, 'foo')}
def test_reordered_columns(self, temp_db_cursor):
    """Check that values follow the order given in ``columns``.

    Rows are added as (col_b, col_a) and must land in the matching
    table columns; surrounding spaces in text must be preserved.
    """
    with db_utils.CopyBuffer() as buf:
        # Values are given in the order of the columns list below.
        buf.add('one', 1)
        buf.add(' two ', 2)

        buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                     columns=['col_b', 'col_a'])

    assert self.table_rows(temp_db_cursor) == {(1, 'one'), (2, ' two ')}
def test_special_characters(self, temp_db_cursor):
    """Check that a text value containing a tab survives the copy intact."""
    with db_utils.CopyBuffer() as buf:
        # A tab inside the value must not be mistaken for a column break.
        buf.add('foo\tbar')

        buf.copy_out(temp_db_cursor, self.TABLE_NAME,
                     columns=['col_b'])

    assert self.table_rows(temp_db_cursor) == {(None, 'foo\tbar')}
class TestCopyBufferJson:
    """Tests for copying JSON-encoded strings from a CopyBuffer into a
    JSONB column.
    """
    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """Autouse fixture creating the target table with a JSONB column."""
        table_factory(self.TABLE_NAME, 'col_a INT, col_b JSONB')

    def table_rows(self, cursor):
        """Return the table content as a dict mapping col_a to col_b.

        Asserts that the dict has as many entries as the query returned
        rows, so rows sharing a col_a value cannot be merged silently.
        """
        cursor.execute('SELECT * FROM ' + self.TABLE_NAME)
        results = {k: v for k, v in cursor}

        # Fewer entries than rows would mean duplicate keys were collapsed.
        assert len(results) == cursor.rowcount

        return results

    def test_json_object(self, temp_db_cursor):
        """Check that a simple JSON object is stored and read back as a dict."""
        with db_utils.CopyBuffer() as buf:
            buf.add(1, json.dumps({'test': 'value', 'number': 1}))

            buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        assert self.table_rows(temp_db_cursor) == \
            {1: {'test': 'value', 'number': 1}}

    def test_json_object_special_chras(self, temp_db_cursor):
        """Check that tab, newline, double quote and null inside a JSON
        object survive the copy.
        """
        with db_utils.CopyBuffer() as buf:
            buf.add(1, json.dumps({'te\tst': 'va\nlue', 'nu"mber': None}))

            buf.copy_out(temp_db_cursor, self.TABLE_NAME)

        assert self.table_rows(temp_db_cursor) == \
            {1: {'te\tst': 'va\nlue', 'nu"mber': None}}