"""
Tests for DB utility functions in db.utils
"""
-import psycopg2
+import json
+
import pytest
import nominatim.db.utils as db_utils
db_utils.execute_file(dsn, tmpfile)
- temp_db_cursor.execute('SELECT * FROM test')
-
- assert temp_db_cursor.rowcount == 1
- assert temp_db_cursor.fetchone()[0] == 56
+ assert temp_db_cursor.row_set('SELECT * FROM test') == {(56, )}
def test_execute_file_bad_file(dsn, tmp_path):
with pytest.raises(FileNotFoundError):
db_utils.execute_file(dsn, tmpfile, pre_code='CREATE TABLE test (id INT)')
- temp_db_cursor.execute('SELECT * FROM test')
-
- assert temp_db_cursor.rowcount == 1
- assert temp_db_cursor.fetchone()[0] == 4
+ assert temp_db_cursor.row_set('SELECT * FROM test') == {(4, )}
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
db_utils.execute_file(dsn, tmpfile, post_code='INSERT INTO test VALUES(23)')
- temp_db_cursor.execute('SELECT * FROM test')
+ assert temp_db_cursor.row_set('SELECT * FROM test') == {(23, )}
+
+
+class TestCopyBuffer:
+ TABLE_NAME = 'copytable'
+
+ @pytest.fixture(autouse=True)
+ def setup_test_table(self, table_factory):
+ table_factory(self.TABLE_NAME, 'colA INT, colB TEXT')
+
+
+ def table_rows(self, cursor):
+ return cursor.row_set('SELECT * FROM ' + self.TABLE_NAME)
+
+
+ def test_copybuffer_empty(self):
+ with db_utils.CopyBuffer() as buf:
+ buf.copy_out(None, "dummy")
+
+
+ def test_all_columns(self, temp_db_cursor):
+ with db_utils.CopyBuffer() as buf:
+ buf.add(3, 'hum')
+ buf.add(None, 'f\\t')
+
+ buf.copy_out(temp_db_cursor, self.TABLE_NAME)
+
+ assert self.table_rows(temp_db_cursor) == {(3, 'hum'), (None, 'f\\t')}
+
+
+ def test_selected_columns(self, temp_db_cursor):
+ with db_utils.CopyBuffer() as buf:
+ buf.add('foo')
+
+ buf.copy_out(temp_db_cursor, self.TABLE_NAME,
+ columns=['colB'])
+
+ assert self.table_rows(temp_db_cursor) == {(None, 'foo')}
+
+
+ def test_reordered_columns(self, temp_db_cursor):
+ with db_utils.CopyBuffer() as buf:
+ buf.add('one', 1)
+ buf.add(' two ', 2)
+
+ buf.copy_out(temp_db_cursor, self.TABLE_NAME,
+ columns=['colB', 'colA'])
+
+ assert self.table_rows(temp_db_cursor) == {(1, 'one'), (2, ' two ')}
+
+
+ def test_special_characters(self, temp_db_cursor):
+ with db_utils.CopyBuffer() as buf:
+ buf.add('foo\tbar')
+ buf.add('sun\nson')
+ buf.add('\\N')
+
+ buf.copy_out(temp_db_cursor, self.TABLE_NAME,
+ columns=['colB'])
+
+ assert self.table_rows(temp_db_cursor) == {(None, 'foo\tbar'),
+ (None, 'sun\nson'),
+ (None, '\\N')}
+
+
+
+class TestCopyBufferJson:
+ TABLE_NAME = 'copytable'
+
+ @pytest.fixture(autouse=True)
+ def setup_test_table(self, table_factory):
+ table_factory(self.TABLE_NAME, 'colA INT, colB JSONB')
+
+
+ def table_rows(self, cursor):
+ cursor.execute('SELECT * FROM ' + self.TABLE_NAME)
+ results = {k: v for k,v in cursor}
+
+ assert len(results) == cursor.rowcount
+
+ return results
+
+
+ def test_json_object(self, temp_db_cursor):
+ with db_utils.CopyBuffer() as buf:
+ buf.add(1, json.dumps({'test': 'value', 'number': 1}))
+
+ buf.copy_out(temp_db_cursor, self.TABLE_NAME)
+
+ assert self.table_rows(temp_db_cursor) == \
+ {1: {'test': 'value', 'number': 1}}
+
+
+ def test_json_object_special_chras(self, temp_db_cursor):
+ with db_utils.CopyBuffer() as buf:
+ buf.add(1, json.dumps({'te\tst': 'va\nlue', 'nu"mber': None}))
+
+ buf.copy_out(temp_db_cursor, self.TABLE_NAME)
- assert temp_db_cursor.rowcount == 1
- assert temp_db_cursor.fetchone()[0] == 23
+ assert self.table_rows(temp_db_cursor) == \
+ {1: {'te\tst': 'va\nlue', 'nu"mber': None}}