X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/4da4cbfe27a576ae011430b2de205c74435e241b..3acd7df5c44345ad85aa97a92d582c60a4eba240:/src/nominatim_db/db/utils.py?ds=sidebyside diff --git a/src/nominatim_db/db/utils.py b/src/nominatim_db/db/utils.py index 32bf79ac..63eba72c 100644 --- a/src/nominatim_db/db/utils.py +++ b/src/nominatim_db/db/utils.py @@ -7,18 +7,18 @@ """ Helper functions for handling DB accesses. """ -from typing import IO, Optional, Union, Any, Iterable +from typing import IO, Optional, Union import subprocess import logging import gzip -import io from pathlib import Path -from .connection import get_pg_env, Cursor +from .connection import get_pg_env from ..errors import UsageError LOG = logging.getLogger() + def _pipe_to_proc(proc: 'subprocess.Popen[bytes]', fdesc: Union[IO[bytes], gzip.GzipFile]) -> int: assert proc.stdin is not None @@ -32,6 +32,7 @@ def _pipe_to_proc(proc: 'subprocess.Popen[bytes]', return len(chunk) + def execute_file(dsn: str, fname: Path, ignore_errors: bool = False, pre_code: Optional[str] = None, @@ -72,58 +73,3 @@ def execute_file(dsn: str, fname: Path, if ret != 0 or remain > 0: raise UsageError("Failed to execute SQL file.") - - -# List of characters that need to be quoted for the copy command. -_SQL_TRANSLATION = {ord('\\'): '\\\\', - ord('\t'): '\\t', - ord('\n'): '\\n'} - - -class CopyBuffer: - """ Data collector for the copy_from command. - """ - - def __init__(self) -> None: - self.buffer = io.StringIO() - - - def __enter__(self) -> 'CopyBuffer': - return self - - - def size(self) -> int: - """ Return the number of bytes the buffer currently contains. - """ - return self.buffer.tell() - - def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: - if self.buffer is not None: - self.buffer.close() - - - def add(self, *data: Any) -> None: - """ Add another row of data to the copy buffer. - """ - first = True - for column in data: - if first: - first = False - else: - self.buffer.write('\t') - if column is None: - self.buffer.write('\\N') - else: - self.buffer.write(str(column).translate(_SQL_TRANSLATION)) - self.buffer.write('\n') - - - def copy_out(self, cur: Cursor, table: str, columns: Optional[Iterable[str]] = None) -> None: - """ Copy all collected data into the given table. - - The buffer is empty and reusable after this operation. - """ - if self.buffer.tell() > 0: - self.buffer.seek(0) - cur.copy_from(self.buffer, table, columns=columns) - self.buffer = io.StringIO()