]> git.openstreetmap.org Git - nominatim.git/commitdiff
fix timeout use for replication timeout
authorSarah Hoffmann <lonvia@denofr.de>
Tue, 8 Nov 2022 20:45:36 +0000 (21:45 +0100)
committerSarah Hoffmann <lonvia@denofr.de>
Wed, 9 Nov 2022 08:12:37 +0000 (09:12 +0100)
The timeout parameter is no longer taken into account since
pyosmium switched to the requests library. This adds the parameter
back.

nominatim/clicmd/replication.py
nominatim/tools/replication.py

index 2d6396a1d13e248d85846e8ce167cb25b151a2f0..235817de6e54d3f1e5ff4b1cca68285832c78c4e 100644 (file)
@@ -148,7 +148,7 @@ class UpdateReplication:
         while True:
             with connect(args.config.get_libpq_dsn()) as conn:
                 start = dt.datetime.now(dt.timezone.utc)
         while True:
             with connect(args.config.get_libpq_dsn()) as conn:
                 start = dt.datetime.now(dt.timezone.utc)
-                state = replication.update(conn, params)
+                state = replication.update(conn, params, socket_timeout=args.socket_timeout)
                 if state is not replication.UpdateState.NO_CHANGES:
                     status.log_status(conn, start, 'import')
                 batchdate, _, _ = status.get_status(conn)
                 if state is not replication.UpdateState.NO_CHANGES:
                     status.log_status(conn, start, 'import')
                 batchdate, _, _ = status.get_status(conn)
index db706bf67563d795b863da9b0d025412b8554c0f..1c46c50c4522cda6b397bae3d65d4a8b6305d880 100644 (file)
@@ -7,13 +7,16 @@
 """
 Functions for updating a database from a replication source.
 """
 """
 Functions for updating a database from a replication source.
 """
-from typing import ContextManager, MutableMapping, Any, Generator, cast
+from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
 from contextlib import contextmanager
 import datetime as dt
 from enum import Enum
 import logging
 import time
 from contextlib import contextmanager
 import datetime as dt
 from enum import Enum
 import logging
 import time
+import types
+import urllib.request as urlrequest
 
 
+import requests
 from nominatim.db import status
 from nominatim.db.connection import Connection
 from nominatim.tools.exec_utils import run_osm2pgsql
 from nominatim.db import status
 from nominatim.db.connection import Connection
 from nominatim.tools.exec_utils import run_osm2pgsql
@@ -22,6 +25,7 @@ from nominatim.errors import UsageError
 try:
     from osmium.replication.server import ReplicationServer
     from osmium import WriteHandler
 try:
     from osmium.replication.server import ReplicationServer
     from osmium import WriteHandler
+    from osmium import version as pyo_version
 except ImportError as exc:
     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
                                  "To install pyosmium via pip: pip3 install osmium")
 except ImportError as exc:
     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
                                  "To install pyosmium via pip: pip3 install osmium")
@@ -86,7 +90,8 @@ class UpdateState(Enum):
     NO_CHANGES = 3
 
 
     NO_CHANGES = 3
 
 
-def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
+def update(conn: Connection, options: MutableMapping[str, Any],
+           socket_timeout: int = 60) -> UpdateState:
     """ Update database from the next batch of data. Returns the state of
         updates according to `UpdateState`.
     """
     """ Update database from the next batch of data. Returns the state of
         updates according to `UpdateState`.
     """
@@ -114,7 +119,7 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
         options['import_file'].unlink()
 
     # Read updates into file.
         options['import_file'].unlink()
 
     # Read updates into file.
-    with _make_replication_server(options['base_url']) as repl:
+    with _make_replication_server(options['base_url'], socket_timeout) as repl:
         outhandler = WriteHandler(str(options['import_file']))
         endseq = repl.apply_diffs(outhandler, startseq + 1,
                                   max_size=options['max_diff_size'] * 1024)
         outhandler = WriteHandler(str(options['import_file']))
         endseq = repl.apply_diffs(outhandler, startseq + 1,
                                   max_size=options['max_diff_size'] * 1024)
@@ -136,14 +141,40 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
     return UpdateState.UP_TO_DATE
 
 
     return UpdateState.UP_TO_DATE
 
 
-def _make_replication_server(url: str) -> ContextManager[ReplicationServer]:
+def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
     """ Returns a ReplicationServer in form of a context manager.
 
         Creates a light wrapper around older versions of pyosmium that did
         not support the context manager interface.
     """
     if hasattr(ReplicationServer, '__enter__'):
     """ Returns a ReplicationServer in form of a context manager.
 
         Creates a light wrapper around older versions of pyosmium that did
         not support the context manager interface.
     """
     if hasattr(ReplicationServer, '__enter__'):
-        return cast(ContextManager[ReplicationServer], ReplicationServer(url))
+        # Patches the open_url function for pyosmium >= 3.2
+        # where the socket timeout is no longer respected.
+        def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
+            """ Download a resource from the given URL and return a byte sequence
+                of the content.
+            """
+            get_params = {
+                'headers': {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"},
+                'timeout': timeout or None,
+                'stream': True
+            }
+
+            if self.session is not None:
+                return self.session.get(url.get_full_url(), **get_params)
+
+            @contextmanager
+            def _get_url_with_session() -> Iterator[requests.Response]:
+                with requests.Session() as session:
+                    request = session.get(url.get_full_url(), **get_params) # type: ignore
+                    yield request
+
+            return _get_url_with_session()
+
+        repl = ReplicationServer(url)
+        repl.open_url = types.MethodType(patched_open_url, repl)
+
+        return cast(ContextManager[ReplicationServer], repl)
 
     @contextmanager
     def get_cm() -> Generator[ReplicationServer, None, None]:
 
     @contextmanager
     def get_cm() -> Generator[ReplicationServer, None, None]: