]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/utils.py
Merge pull request #3086 from lonvia/close-connection-on-replication
[nominatim.git] / nominatim / db / utils.py
index 9a4a41a581661ced3048797b7ae1ff98d613dad0..9a7b4f164787b8abb03831477fe3b876357e9b25 100644 (file)
@@ -1,17 +1,27 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
 Helper functions for handling DB accesses.
 """
 """
 Helper functions for handling DB accesses.
 """
+from typing import IO, Optional, Union, Any, Iterable
 import subprocess
 import logging
 import gzip
 import io
 import subprocess
 import logging
 import gzip
 import io
+from pathlib import Path
 
 
-from nominatim.db.connection import get_pg_env
+from nominatim.db.connection import get_pg_env, Cursor
 from nominatim.errors import UsageError
 
 LOG = logging.getLogger()
 
 from nominatim.errors import UsageError
 
 LOG = logging.getLogger()
 
-def _pipe_to_proc(proc, fdesc):
+def _pipe_to_proc(proc: 'subprocess.Popen[bytes]',
+                  fdesc: Union[IO[bytes], gzip.GzipFile]) -> int:
+    assert proc.stdin is not None
     chunk = fdesc.read(2048)
     while chunk and proc.poll() is None:
         try:
     chunk = fdesc.read(2048)
     while chunk and proc.poll() is None:
         try:
@@ -22,7 +32,10 @@ def _pipe_to_proc(proc, fdesc):
 
     return len(chunk)
 
 
     return len(chunk)
 
-def execute_file(dsn, fname, ignore_errors=False, pre_code=None, post_code=None):
+def execute_file(dsn: str, fname: Path,
+                 ignore_errors: bool = False,
+                 pre_code: Optional[str] = None,
+                 post_code: Optional[str] = None) -> None:
     """ Read an SQL file and run its contents against the given database
         using psql. Use `pre_code` and `post_code` to run extra commands
         before or after executing the file. The commands are run within the
     """ Read an SQL file and run its contents against the given database
         using psql. Use `pre_code` and `post_code` to run extra commands
         before or after executing the file. The commands are run within the
@@ -34,55 +47,57 @@ def execute_file(dsn, fname, ignore_errors=False, pre_code=None, post_code=None)
         cmd.extend(('-v', 'ON_ERROR_STOP=1'))
     if not LOG.isEnabledFor(logging.INFO):
         cmd.append('--quiet')
         cmd.extend(('-v', 'ON_ERROR_STOP=1'))
     if not LOG.isEnabledFor(logging.INFO):
         cmd.append('--quiet')
-    proc = subprocess.Popen(cmd, env=get_pg_env(dsn), stdin=subprocess.PIPE)
 
 
-    try:
-        if not LOG.isEnabledFor(logging.INFO):
-            proc.stdin.write('set client_min_messages to WARNING;'.encode('utf-8'))
+    with subprocess.Popen(cmd, env=get_pg_env(dsn), stdin=subprocess.PIPE) as proc:
+        assert proc.stdin is not None
+        try:
+            if not LOG.isEnabledFor(logging.INFO):
+                proc.stdin.write('set client_min_messages to WARNING;'.encode('utf-8'))
 
 
-        if pre_code:
-            proc.stdin.write((pre_code + ';').encode('utf-8'))
+            if pre_code:
+                proc.stdin.write((pre_code + ';').encode('utf-8'))
 
 
-        if fname.suffix == '.gz':
-            with gzip.open(str(fname), 'rb') as fdesc:
-                remain = _pipe_to_proc(proc, fdesc)
-        else:
-            with fname.open('rb') as fdesc:
-                remain = _pipe_to_proc(proc, fdesc)
+            if fname.suffix == '.gz':
+                with gzip.open(str(fname), 'rb') as fdesc:
+                    remain = _pipe_to_proc(proc, fdesc)
+            else:
+                with fname.open('rb') as fdesc:
+                    remain = _pipe_to_proc(proc, fdesc)
 
 
-        if remain == 0 and post_code:
-            proc.stdin.write((';' + post_code).encode('utf-8'))
-    finally:
-        proc.stdin.close()
-        ret = proc.wait()
+            if remain == 0 and post_code:
+                proc.stdin.write((';' + post_code).encode('utf-8'))
+        finally:
+            proc.stdin.close()
+            ret = proc.wait()
 
     if ret != 0 or remain > 0:
         raise UsageError("Failed to execute SQL file.")
 
 
 # List of characters that need to be quoted for the copy command.
 
     if ret != 0 or remain > 0:
         raise UsageError("Failed to execute SQL file.")
 
 
 # List of characters that need to be quoted for the copy command.
-_SQL_TRANSLATION = {ord(u'\\'): u'\\\\',
-                    ord(u'\t'): u'\\t',
-                    ord(u'\n'): u'\\n'}
+_SQL_TRANSLATION = {ord('\\'): '\\\\',
+                    ord('\t'): '\\t',
+                    ord('\n'): '\\n'}
+
 
 class CopyBuffer:
     """ Data collector for the copy_from command.
     """
 
 
 class CopyBuffer:
     """ Data collector for the copy_from command.
     """
 
-    def __init__(self):
+    def __init__(self) -> None:
         self.buffer = io.StringIO()
 
 
         self.buffer = io.StringIO()
 
 
-    def __enter__(self):
+    def __enter__(self) -> 'CopyBuffer':
         return self
 
 
         return self
 
 
-    def __exit__(self, exc_type, exc_value, traceback):
+    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
         if self.buffer is not None:
             self.buffer.close()
 
 
         if self.buffer is not None:
             self.buffer.close()
 
 
-    def add(self, *data):
+    def add(self, *data: Any) -> None:
         """ Add another row of data to the copy buffer.
         """
         first = True
         """ Add another row of data to the copy buffer.
         """
         first = True
@@ -98,9 +113,9 @@ class CopyBuffer:
         self.buffer.write('\n')
 
 
         self.buffer.write('\n')
 
 
-    def copy_out(self, cur, table, columns=None):
+    def copy_out(self, cur: Cursor, table: str, columns: Optional[Iterable[str]] = None) -> None:
         """ Copy all collected data into the given table.
         """
         if self.buffer.tell() > 0:
             self.buffer.seek(0)
         """ Copy all collected data into the given table.
         """
         if self.buffer.tell() > 0:
             self.buffer.seek(0)
-            cur.copy_from(self.buffer, table, columns=columns)
+            cur.copy_from(self.buffer, table, columns=columns) # type: ignore[no-untyped-call]