X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/94fa7162be678dc74c1a5516e22916122ba66bbb..f3c4196c758a7720aefc26a7043bf1c6553028e2:/test/python/test_db_utils.py diff --git a/test/python/test_db_utils.py b/test/python/test_db_utils.py index e756f2c4..9eea7ed1 100644 --- a/test/python/test_db_utils.py +++ b/test/python/test_db_utils.py @@ -1,30 +1,154 @@ """ Tests for DB utility functions in db.utils """ -import psycopg2 +import json + import pytest import nominatim.db.utils as db_utils +from nominatim.errors import UsageError -def test_execute_file_success(temp_db_conn, tmp_path): +def test_execute_file_success(dsn, temp_db_cursor, tmp_path): tmpfile = tmp_path / 'test.sql' tmpfile.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);') - db_utils.execute_file(temp_db_conn, tmpfile) - - with temp_db_conn.cursor() as cur: - cur.execute('SELECT * FROM test') + db_utils.execute_file(dsn, tmpfile) - assert cur.rowcount == 1 - assert cur.fetchone()[0] == 56 + assert temp_db_cursor.row_set('SELECT * FROM test') == {(56, )} -def test_execute_file_bad_file(temp_db_conn, tmp_path): +def test_execute_file_bad_file(dsn, tmp_path): with pytest.raises(FileNotFoundError): - db_utils.execute_file(temp_db_conn, tmp_path / 'test2.sql') + db_utils.execute_file(dsn, tmp_path / 'test2.sql') + + +def test_execute_file_bad_sql(dsn, tmp_path): + tmpfile = tmp_path / 'test.sql' + tmpfile.write_text('CREATE STABLE test (id INT)') + + with pytest.raises(UsageError): + db_utils.execute_file(dsn, tmpfile) + -def test_execute_file_bad_sql(temp_db_conn, tmp_path): +def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path): tmpfile = tmp_path / 'test.sql' tmpfile.write_text('CREATE STABLE test (id INT)') - with pytest.raises(psycopg2.ProgrammingError): - db_utils.execute_file(temp_db_conn, tmpfile) + db_utils.execute_file(dsn, tmpfile, ignore_errors=True) + + +def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor): + tmpfile = tmp_path / 'test.sql' + tmpfile.write_text('INSERT INTO test VALUES(4)') + + db_utils.execute_file(dsn, tmpfile, pre_code='CREATE TABLE test (id INT)') + + assert temp_db_cursor.row_set('SELECT * FROM test') == {(4, )} + + +def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor): + tmpfile = tmp_path / 'test.sql' + tmpfile.write_text('CREATE TABLE test (id INT)') + + db_utils.execute_file(dsn, tmpfile, post_code='INSERT INTO test VALUES(23)') + + assert temp_db_cursor.row_set('SELECT * FROM test') == {(23, )} + + +class TestCopyBuffer: + TABLE_NAME = 'copytable' + + @pytest.fixture(autouse=True) + def setup_test_table(self, table_factory): + table_factory(self.TABLE_NAME, 'colA INT, colB TEXT') + + + def table_rows(self, cursor): + return cursor.row_set('SELECT * FROM ' + self.TABLE_NAME) + + + def test_copybuffer_empty(self): + with db_utils.CopyBuffer() as buf: + buf.copy_out(None, "dummy") + + + def test_all_columns(self, temp_db_cursor): + with db_utils.CopyBuffer() as buf: + buf.add(3, 'hum') + buf.add(None, 'f\\t') + + buf.copy_out(temp_db_cursor, self.TABLE_NAME) + + assert self.table_rows(temp_db_cursor) == {(3, 'hum'), (None, 'f\\t')} + + + def test_selected_columns(self, temp_db_cursor): + with db_utils.CopyBuffer() as buf: + buf.add('foo') + + buf.copy_out(temp_db_cursor, self.TABLE_NAME, + columns=['colB']) + + assert self.table_rows(temp_db_cursor) == {(None, 'foo')} + + + def test_reordered_columns(self, temp_db_cursor): + with db_utils.CopyBuffer() as buf: + buf.add('one', 1) + buf.add(' two ', 2) + + buf.copy_out(temp_db_cursor, self.TABLE_NAME, + columns=['colB', 'colA']) + + assert self.table_rows(temp_db_cursor) == {(1, 'one'), (2, ' two ')} + + + def test_special_characters(self, temp_db_cursor): + with db_utils.CopyBuffer() as buf: + buf.add('foo\tbar') + buf.add('sun\nson') + buf.add('\\N') + + buf.copy_out(temp_db_cursor, self.TABLE_NAME, + columns=['colB']) + + assert self.table_rows(temp_db_cursor) == {(None, 'foo\tbar'), + (None, 'sun\nson'), + (None, '\\N')} + + + +class TestCopyBufferJson: + TABLE_NAME = 'copytable' + + @pytest.fixture(autouse=True) + def setup_test_table(self, table_factory): + table_factory(self.TABLE_NAME, 'colA INT, colB JSONB') + + + def table_rows(self, cursor): + cursor.execute('SELECT * FROM ' + self.TABLE_NAME) + results = {k: v for k,v in cursor} + + assert len(results) == cursor.rowcount + + return results + + + def test_json_object(self, temp_db_cursor): + with db_utils.CopyBuffer() as buf: + buf.add(1, json.dumps({'test': 'value', 'number': 1})) + + buf.copy_out(temp_db_cursor, self.TABLE_NAME) + + assert self.table_rows(temp_db_cursor) == \ + {1: {'test': 'value', 'number': 1}} + + + def test_json_object_special_chras(self, temp_db_cursor): + with db_utils.CopyBuffer() as buf: + buf.add(1, json.dumps({'te\tst': 'va\nlue', 'nu"mber': None})) + + buf.copy_out(temp_db_cursor, self.TABLE_NAME) + + assert self.table_rows(temp_db_cursor) == \ + {1: {'te\tst': 'va\nlue', 'nu"mber': None}}