1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for updating a database from a replication source.
10 from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
11 from contextlib import contextmanager
17 import urllib.request as urlrequest
19 from ..errors import UsageError
20 from ..db import status
21 from ..db.connection import Connection, connect, server_version_tuple
22 from .exec_utils import run_osm2pgsql
25 from osmium.replication.server import ReplicationServer
26 from osmium import WriteHandler
27 from osmium import version as pyo_version
29 except ModuleNotFoundError as exc:
30 logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
31 "To install pyosmium via pip: pip install osmium")
32 raise UsageError("replication tools not available") from exc
34 LOG = logging.getLogger()
36 def init_replication(conn: Connection, base_url: str,
37 socket_timeout: int = 60) -> None:
38 """ Set up replication for the server at the given base URL.
40 LOG.info("Using replication source: %s", base_url)
41 date = status.compute_database_date(conn)
43 # margin of error to make sure we get all data
44 date -= dt.timedelta(hours=3)
46 with _make_replication_server(base_url, socket_timeout) as repl:
47 seq = repl.timestamp_to_sequence(date)
50 LOG.fatal("Cannot reach the configured replication service '%s'.\n"
51 "Does the URL point to a directory containing OSM update data?",
53 raise UsageError("Failed to reach replication service")
55 status.set_status(conn, date=date, seq=seq)
57 LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
60 def check_for_updates(conn: Connection, base_url: str,
61 socket_timeout: int = 60) -> int:
62 """ Check if new data is available from the replication service at the
65 _, seq, _ = status.get_status(conn)
68 LOG.error("Replication not set up. "
69 "Please run 'nominatim replication --init' first.")
72 with _make_replication_server(base_url, socket_timeout) as repl:
73 state = repl.get_state_info()
76 LOG.error("Cannot get state for URL %s.", base_url)
79 if state.sequence <= seq:
80 LOG.warning("Database is up to date.")
83 LOG.warning("New data available (%i => %i).", seq, state.sequence)
86 class UpdateState(Enum):
87 """ Possible states after an update has run.
95 def update(dsn: str, options: MutableMapping[str, Any],
96 socket_timeout: int = 60) -> UpdateState:
97 """ Update database from the next batch of data. Returns the state of
98 updates according to `UpdateState`.
100 with connect(dsn) as conn:
101 startdate, startseq, indexed = status.get_status(conn)
105 LOG.error("Replication not set up. "
106 "Please run 'nominatim replication --init' first.")
107 raise UsageError("Replication not set up.")
109 assert startdate is not None
111 if not indexed and options['indexed_only']:
112 LOG.info("Skipping update. There is data that needs indexing.")
113 return UpdateState.MORE_PENDING
115 last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
116 update_interval = dt.timedelta(seconds=options['update_interval'])
117 if last_since_update < update_interval:
118 duration = (update_interval - last_since_update).seconds
119 LOG.warning("Sleeping for %s sec before next update.", duration)
122 if options['import_file'].exists():
123 options['import_file'].unlink()
125 # Read updates into file.
126 with _make_replication_server(options['base_url'], socket_timeout) as repl:
127 outhandler = WriteHandler(str(options['import_file']))
128 endseq = repl.apply_diffs(outhandler, startseq + 1,
129 max_size=options['max_diff_size'] * 1024)
133 return UpdateState.NO_CHANGES
135 with connect(dsn) as conn:
136 run_osm2pgsql_updates(conn, options)
138 # Write the current status to the file
139 endstate = repl.get_state_info(endseq)
140 status.set_status(conn, endstate.timestamp if endstate else None,
141 seq=endseq, indexed=False)
144 return UpdateState.UP_TO_DATE
147 def run_osm2pgsql_updates(conn: Connection, options: MutableMapping[str, Any]) -> None:
148 """ Run osm2pgsql in append mode.
150 # Remove any stale deletion marks.
151 with conn.cursor() as cur:
152 cur.execute('TRUNCATE place_to_be_deleted')
155 # Consume updates with osm2pgsql.
156 options['append'] = True
157 options['disable_jit'] = server_version_tuple(conn) >= (11, 0)
158 run_osm2pgsql(options)
161 with conn.cursor() as cur:
162 cur.execute('SELECT flush_deleted_places()')
166 def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
167 """ Returns a ReplicationServer in form of a context manager.
169 Creates a light wrapper around older versions of pyosmium that did
170 not support the context manager interface.
172 if hasattr(ReplicationServer, '__enter__'):
173 # Patches the open_url function for pyosmium >= 3.2
174 # where the socket timeout is no longer respected.
175 def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
176 """ Download a resource from the given URL and return a byte sequence
179 headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
181 if self.session is not None:
182 return self.session.get(url.get_full_url(),
183 headers=headers, timeout=timeout or None,
187 def _get_url_with_session() -> Iterator[requests.Response]:
188 with requests.Session() as session:
189 request = session.get(url.get_full_url(),
190 headers=headers, timeout=timeout or None,
194 return _get_url_with_session()
196 repl = ReplicationServer(url)
197 setattr(repl, 'open_url', types.MethodType(patched_open_url, repl))
199 return cast(ContextManager[ReplicationServer], repl)
202 def get_cm() -> Generator[ReplicationServer, None, None]:
203 yield ReplicationServer(url)