"""
Functions for updating a database from a replication source.
"""
-from typing import ContextManager, MutableMapping, Any, Generator, cast
+from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
from contextlib import contextmanager
import datetime as dt
from enum import Enum
import logging
import time
+import types
+import urllib.request as urlrequest
+import requests
from nominatim.db import status
from nominatim.db.connection import Connection
from nominatim.tools.exec_utils import run_osm2pgsql
try:
from osmium.replication.server import ReplicationServer
from osmium import WriteHandler
+ from osmium import version as pyo_version
except ImportError as exc:
logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
"To install pyosmium via pip: pip3 install osmium")
NO_CHANGES = 3
-def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
+def update(conn: Connection, options: MutableMapping[str, Any],
+ socket_timeout: int = 60) -> UpdateState:
""" Update database from the next batch of data. Returns the state of
updates according to `UpdateState`.

The visible code reads the following keys from `options`:
'import_file' (removed first, then used as the target of the
write handler), 'base_url' (handed to `_make_replication_server()`)
and 'max_diff_size' (multiplied by 1024 and passed on as `max_size`).

`socket_timeout` is handed on unchanged to
`_make_replication_server()` as its timeout argument.
"""
options['import_file'].unlink()
# Read updates into file.
- with _make_replication_server(options['base_url']) as repl:
+ with _make_replication_server(options['base_url'], socket_timeout) as repl:
outhandler = WriteHandler(str(options['import_file']))
# Diffs are requested starting with the sequence after `startseq`.
# NOTE(review): presumably 'max_diff_size' is in MB and `max_size` in kB,
# which would explain the factor of 1024 -- confirm against pyosmium.
endseq = repl.apply_diffs(outhandler, startseq + 1,
max_size=options['max_diff_size'] * 1024)
return UpdateState.UP_TO_DATE
-def _make_replication_server(url: str) -> ContextManager[ReplicationServer]:
+def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
""" Returns a ReplicationServer in form of a context manager.
Creates a light wrapper around older versions of pyosmium that did
not support the context manager interface.
"""
if hasattr(ReplicationServer, '__enter__'):
- return cast(ContextManager[ReplicationServer], ReplicationServer(url))
+ # Patches the open_url function for pyosmium >= 3.2
+ # where the socket timeout is no longer respected.
+ def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
+ """ Request the resource at the given URL with the configured
+ timeout and return the streamed response rather than the
+ downloaded bytes.
+
+ If the server object already holds a session, the response of
+ that session's GET request is returned directly. Otherwise a
+ context manager is returned that opens a temporary
+ `requests.Session`, yields the response of the GET request and
+ closes the session again when the context is left.
+ """
+ # `timeout` comes from the enclosing function. `timeout or None`
+ # turns a falsy value (e.g. 0) into None, which requests documents
+ # as waiting without a timeout.
+ get_params = {
+ 'headers': {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"},
+ 'timeout': timeout or None,
+ 'stream': True
+ }
+
+ # Reuse the session held by the server object when there is one.
+ if self.session is not None:
+ return self.session.get(url.get_full_url(), **get_params)
+
+ # No session available: wrap a short-lived one in a context manager
+ # so that it stays open only while the response is in use.
+ @contextmanager
+ def _get_url_with_session() -> Iterator[requests.Response]:
+ with requests.Session() as session:
+ request = session.get(url.get_full_url(), **get_params) # type: ignore
+ yield request
+
+ return _get_url_with_session()
+
+ repl = ReplicationServer(url)
+ repl.open_url = types.MethodType(patched_open_url, repl)
+
+ return cast(ContextManager[ReplicationServer], repl)
@contextmanager
def get_cm() -> Generator[ReplicationServer, None, None]: