]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/utils.py
Merge pull request #3086 from lonvia/close-connection-on-replication
[nominatim.git] / nominatim / db / utils.py
index e2b63e1dd0e360e528724e5019608a7153e31b5f..9a7b4f164787b8abb03831477fe3b876357e9b25 100644 (file)
@@ -7,19 +7,19 @@
 """
 Helper functions for handling DB accesses.
 """
 """
 Helper functions for handling DB accesses.
 """
-from typing import IO, Optional, Union
+from typing import IO, Optional, Union, Any, Iterable
 import subprocess
 import logging
 import gzip
 import io
 from pathlib import Path
 
 import subprocess
 import logging
 import gzip
 import io
 from pathlib import Path
 
-from nominatim.db.connection import get_pg_env
+from nominatim.db.connection import get_pg_env, Cursor
 from nominatim.errors import UsageError
 
 LOG = logging.getLogger()
 
 from nominatim.errors import UsageError
 
 LOG = logging.getLogger()
 
-def _pipe_to_proc(proc: subprocess.Popen[bytes],
+def _pipe_to_proc(proc: 'subprocess.Popen[bytes]',
                   fdesc: Union[IO[bytes], gzip.GzipFile]) -> int:
     assert proc.stdin is not None
     chunk = fdesc.read(2048)
                   fdesc: Union[IO[bytes], gzip.GzipFile]) -> int:
     assert proc.stdin is not None
     chunk = fdesc.read(2048)
@@ -84,20 +84,20 @@ class CopyBuffer:
     """ Data collector for the copy_from command.
     """
 
     """ Data collector for the copy_from command.
     """
 
-    def __init__(self):
+    def __init__(self) -> None:
         self.buffer = io.StringIO()
 
 
         self.buffer = io.StringIO()
 
 
-    def __enter__(self):
+    def __enter__(self) -> 'CopyBuffer':
         return self
 
 
         return self
 
 
-    def __exit__(self, exc_type, exc_value, traceback):
+    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
         if self.buffer is not None:
             self.buffer.close()
 
 
         if self.buffer is not None:
             self.buffer.close()
 
 
-    def add(self, *data):
+    def add(self, *data: Any) -> None:
         """ Add another row of data to the copy buffer.
         """
         first = True
         """ Add another row of data to the copy buffer.
         """
         first = True
@@ -113,9 +113,9 @@ class CopyBuffer:
         self.buffer.write('\n')
 
 
         self.buffer.write('\n')
 
 
-    def copy_out(self, cur, table, columns=None):
+    def copy_out(self, cur: Cursor, table: str, columns: Optional[Iterable[str]] = None) -> None:
         """ Copy all collected data into the given table.
         """
         if self.buffer.tell() > 0:
             self.buffer.seek(0)
         """ Copy all collected data into the given table.
         """
         if self.buffer.tell() > 0:
             self.buffer.seek(0)
-            cur.copy_from(self.buffer, table, columns=columns)
+            cur.copy_from(self.buffer, table, columns=columns) # type: ignore[no-untyped-call]