"""
Helper functions for handling DB accesses.
"""
+import subprocess
+import logging
-def execute_file(conn, fname):
- """ Read an SQL file and run its contents against the given connection.
+from .connection import get_pg_env
+from ..errors import UsageError
+
+LOG = logging.getLogger()
+
def execute_file(dsn, fname, ignore_errors=False):
    """ Read an SQL file and run its contents against the given database
        using psql.

        `dsn` is the connection string of the target database; it is
        converted into the environment for psql with get_pg_env().
        `fname` is a path-like object providing `open()`. Unless
        `ignore_errors` is set, psql is started with ON_ERROR_STOP=1.

        Raises a UsageError when psql exits with a non-zero status or goes
        away before the whole file was handed over. Errors from opening
        the file (e.g. FileNotFoundError) are passed on unchanged.
    """
    cmd = ['psql']
    if not ignore_errors:
        cmd.extend(('-v', 'ON_ERROR_STOP=1'))

    # Open the file before spawning psql, so that a missing file does not
    # leave an orphaned psql process waiting on its stdin.
    with fname.open('rb') as fdesc:
        proc = subprocess.Popen(cmd, env=get_pg_env(dsn), stdin=subprocess.PIPE)

        delivered = False
        try:
            if not LOG.isEnabledFor(logging.INFO):
                proc.stdin.write('set client_min_messages to WARNING;'.encode('utf-8'))

            chunk = fdesc.read(2048)
            # Stop feeding as soon as psql has terminated on its own.
            while chunk and proc.poll() is None:
                proc.stdin.write(chunk)
                chunk = fdesc.read(2048)

            # A left-over chunk means psql exited before the end of the file.
            delivered = not chunk
        except BrokenPipeError:
            # psql exited between the poll() and the write; this is reported
            # as a failed execution below.
            delivered = False
        finally:
            # Always close the pipe and reap the child, whatever happened.
            try:
                proc.stdin.close()
            except BrokenPipeError:
                # Buffered data could not be flushed because psql is gone.
                delivered = False
            ret = proc.wait()

    if ret != 0 or not delivered:
        raise UsageError("Failed to execute SQL file.")
LOG = logging.getLogger()
def update_postcodes(dsn, sql_dir):
    """ Recalculate postcode centroids and add, remove and update entries in the
        location_postcode table. `dsn` is the connection string of the
        database and `sql_dir` the directory containing the
        'update-postcodes.sql' script.
    """
    script = sql_dir / 'update-postcodes.sql'
    execute_file(dsn, script)
def recompute_word_counts(dsn, sql_dir):
    """ Compute the frequency of full-word search terms by running the
        'words_from_search_name.sql' script found in `sql_dir` against
        the database given by `dsn`.
    """
    script = sql_dir / 'words_from_search_name.sql'
    execute_file(dsn, script)
def _add_address_level_rows_from_entry(rows, entry):
import pytest
import nominatim.db.utils as db_utils
+from nominatim.errors import UsageError
-def test_execute_file_success(temp_db_conn, tmp_path):
@pytest.fixture
def dsn(temp_db):
    """ Provide a connection string built from the `temp_db` fixture value.
    """
    prefix = 'dbname='
    return prefix + temp_db
+
def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
    """ A valid SQL file is executed and its effects are visible in the
        database afterwards.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')

    db_utils.execute_file(dsn, sql_file)

    # The table must exist and contain exactly the one inserted row.
    temp_db_cursor.execute('SELECT * FROM test')
    assert temp_db_cursor.rowcount == 1

    row = temp_db_cursor.fetchone()
    assert row[0] == 56
def test_execute_file_bad_file(dsn, tmp_path):
    """ Executing a file that does not exist raises FileNotFoundError.
    """
    missing_file = tmp_path / 'test2.sql'

    with pytest.raises(FileNotFoundError):
        db_utils.execute_file(dsn, missing_file)
+
def test_execute_file_bad_sql(dsn, tmp_path):
    """ Invalid SQL makes execute_file() raise a UsageError.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE STABLE test (id INT)')

    with pytest.raises(UsageError):
        db_utils.execute_file(dsn, sql_file)
+
def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
    """ Invalid SQL does not raise when ignore_errors is requested.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE STABLE test (id INT)')

    # Must return normally; any exception fails the test.
    db_utils.execute_file(dsn, sql_file, ignore_errors=True)