from ..errors import UsageError
from ..db import status
-from ..db.connection import Connection, connect, server_version_tuple
+from ..db.connection import Connection, connect
from .exec_utils import run_osm2pgsql
try:
LOG = logging.getLogger()
+
def init_replication(conn: Connection, base_url: str,
socket_timeout: int = 60) -> None:
""" Set up replication for the server at the given base URL.
LOG.warning("New data available (%i => %i).", seq, state.sequence)
return 0
+
class UpdateState(Enum):
""" Possible states after an update has run.
"""
# Consume updates with osm2pgsql.
options['append'] = True
- options['disable_jit'] = server_version_tuple(conn) >= (11, 0)
+ options['disable_jit'] = True
run_osm2pgsql(options)
# Handle deletions
""" Download a resource from the given URL and return a byte sequence
of the content.
"""
- headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
+ headers = {"User-Agent": f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
if self.session is not None:
return self.session.get(url.get_full_url(),
- headers=headers, timeout=timeout or None,
- stream=True)
+ headers=headers, timeout=timeout or None,
+ stream=True)
@contextmanager
def _get_url_with_session() -> Iterator[requests.Response]: