# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Functions for updating a database from a replication source.
"""
from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
from contextlib import contextmanager
import datetime as dt
from enum import Enum
import logging
import time
import types
import urllib.request as urlrequest

import requests

from ..errors import UsageError
from ..db import status
from ..db.connection import Connection, connect
from .exec_utils import run_osm2pgsql

try:
    from osmium.replication.server import ReplicationServer
    from osmium import WriteHandler
    from osmium import version as pyo_version
except ImportError as exc:
    logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
                                 "To install pyosmium via pip: pip3 install osmium")
    raise UsageError("replication tools not available") from exc
# Logger for this module's messages. getLogger() without a name (None)
# hands back the root logger.
LOG = logging.getLogger(None)
37 def init_replication(conn: Connection, base_url: str,
38 socket_timeout: int = 60) -> None:
39 """ Set up replication for the server at the given base URL.
41 LOG.info("Using replication source: %s", base_url)
42 date = status.compute_database_date(conn)
44 # margin of error to make sure we get all data
45 date -= dt.timedelta(hours=3)
47 with _make_replication_server(base_url, socket_timeout) as repl:
48 seq = repl.timestamp_to_sequence(date)
51 LOG.fatal("Cannot reach the configured replication service '%s'.\n"
52 "Does the URL point to a directory containing OSM update data?",
54 raise UsageError("Failed to reach replication service")
56 status.set_status(conn, date=date, seq=seq)
58 LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
61 def check_for_updates(conn: Connection, base_url: str,
62 socket_timeout: int = 60) -> int:
63 """ Check if new data is available from the replication service at the
66 _, seq, _ = status.get_status(conn)
69 LOG.error("Replication not set up. "
70 "Please run 'nominatim replication --init' first.")
73 with _make_replication_server(base_url, socket_timeout) as repl:
74 state = repl.get_state_info()
77 LOG.error("Cannot get state for URL %s.", base_url)
80 if state.sequence <= seq:
81 LOG.warning("Database is up to date.")
84 LOG.warning("New data available (%i => %i).", seq, state.sequence)
87 class UpdateState(Enum):
88 """ Possible states after an update has run.
96 def update(dsn: str, options: MutableMapping[str, Any],
97 socket_timeout: int = 60) -> UpdateState:
98 """ Update database from the next batch of data. Returns the state of
99 updates according to `UpdateState`.
101 with connect(dsn) as conn:
102 startdate, startseq, indexed = status.get_status(conn)
106 LOG.error("Replication not set up. "
107 "Please run 'nominatim replication --init' first.")
108 raise UsageError("Replication not set up.")
110 assert startdate is not None
112 if not indexed and options['indexed_only']:
113 LOG.info("Skipping update. There is data that needs indexing.")
114 return UpdateState.MORE_PENDING
116 last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
117 update_interval = dt.timedelta(seconds=options['update_interval'])
118 if last_since_update < update_interval:
119 duration = (update_interval - last_since_update).seconds
120 LOG.warning("Sleeping for %s sec before next update.", duration)
123 if options['import_file'].exists():
124 options['import_file'].unlink()
126 # Read updates into file.
127 with _make_replication_server(options['base_url'], socket_timeout) as repl:
128 outhandler = WriteHandler(str(options['import_file']))
129 endseq = repl.apply_diffs(outhandler, startseq + 1,
130 max_size=options['max_diff_size'] * 1024)
134 return UpdateState.NO_CHANGES
136 with connect(dsn) as conn:
137 run_osm2pgsql_updates(conn, options)
139 # Write the current status to the file
140 endstate = repl.get_state_info(endseq)
141 status.set_status(conn, endstate.timestamp if endstate else None,
142 seq=endseq, indexed=False)
145 return UpdateState.UP_TO_DATE
148 def run_osm2pgsql_updates(conn: Connection, options: MutableMapping[str, Any]) -> None:
149 """ Run osm2pgsql in append mode.
151 # Remove any stale deletion marks.
152 with conn.cursor() as cur:
153 cur.execute('TRUNCATE place_to_be_deleted')
156 # Consume updates with osm2pgsql.
157 options['append'] = True
158 options['disable_jit'] = conn.server_version_tuple() >= (11, 0)
159 run_osm2pgsql(options)
162 with conn.cursor() as cur:
163 cur.execute('SELECT flush_deleted_places()')
167 def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
168 """ Returns a ReplicationServer in form of a context manager.
170 Creates a light wrapper around older versions of pyosmium that did
171 not support the context manager interface.
173 if hasattr(ReplicationServer, '__enter__'):
174 # Patches the open_url function for pyosmium >= 3.2
175 # where the socket timeout is no longer respected.
176 def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
177 """ Download a resource from the given URL and return a byte sequence
180 headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
182 if self.session is not None:
183 return self.session.get(url.get_full_url(),
184 headers=headers, timeout=timeout or None,
188 def _get_url_with_session() -> Iterator[requests.Response]:
189 with requests.Session() as session:
190 request = session.get(url.get_full_url(),
191 headers=headers, timeout=timeout or None,
195 return _get_url_with_session()
197 repl = ReplicationServer(url)
198 setattr(repl, 'open_url', types.MethodType(patched_open_url, repl))
200 return cast(ContextManager[ReplicationServer], repl)
203 def get_cm() -> Generator[ReplicationServer, None, None]:
204 yield ReplicationServer(url)