"""
Helper functions for handling DB accesses.
"""
-from typing import IO, Optional, Union, Any, Iterable
+from typing import IO, Optional, Union
import subprocess
import logging
import gzip
-import io
from pathlib import Path
-from .connection import get_pg_env, Cursor
+from .connection import get_pg_env
from ..errors import UsageError
LOG = logging.getLogger()
+
def _pipe_to_proc(proc: 'subprocess.Popen[bytes]',
fdesc: Union[IO[bytes], gzip.GzipFile]) -> int:
assert proc.stdin is not None
return len(chunk)
+
def execute_file(dsn: str, fname: Path,
ignore_errors: bool = False,
pre_code: Optional[str] = None,
if ret != 0 or remain > 0:
raise UsageError("Failed to execute SQL file.")
-
-
-# List of characters that need to be quoted for the copy command.
-_SQL_TRANSLATION = {ord('\\'): '\\\\',
- ord('\t'): '\\t',
- ord('\n'): '\\n'}
-
-
-class CopyBuffer:
- """ Data collector for the copy_from command.
- """
-
- def __init__(self) -> None:
- self.buffer = io.StringIO()
-
-
- def __enter__(self) -> 'CopyBuffer':
- return self
-
-
- def size(self) -> int:
- """ Return the number of bytes the buffer currently contains.
- """
- return self.buffer.tell()
-
- def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
- if self.buffer is not None:
- self.buffer.close()
-
-
- def add(self, *data: Any) -> None:
- """ Add another row of data to the copy buffer.
- """
- first = True
- for column in data:
- if first:
- first = False
- else:
- self.buffer.write('\t')
- if column is None:
- self.buffer.write('\\N')
- else:
- self.buffer.write(str(column).translate(_SQL_TRANSLATION))
- self.buffer.write('\n')
-
-
- def copy_out(self, cur: Cursor, table: str, columns: Optional[Iterable[str]] = None) -> None:
- """ Copy all collected data into the given table.
-
- The buffer is empty and reusable after this operation.
- """
- if self.buffer.tell() > 0:
- self.buffer.seek(0)
- cur.copy_from(self.buffer, table, columns=columns)
- self.buffer = io.StringIO()