1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Functions for updating a database from a replication source.
"""
10 from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
11 from contextlib import contextmanager
17 import urllib.request as urlrequest
20 from nominatim.db import status
21 from nominatim.db.connection import Connection
22 from nominatim.tools.exec_utils import run_osm2pgsql
23 from nominatim.errors import UsageError
26 from osmium.replication.server import ReplicationServer
27 from osmium import WriteHandler
28 from osmium import version as pyo_version
29 except ImportError as exc:
30 logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
31 "To install pyosmium via pip: pip3 install osmium")
32 raise UsageError("replication tools not available") from exc
34 LOG = logging.getLogger()
def init_replication(conn: Connection, base_url: str,
                     socket_timeout: int = 60) -> None:
    """ Set up replication for the server at the given base URL.

        The start date is taken from `status.compute_database_date()` and
        moved back by three hours as a safety margin. The sequence number
        matching that date is requested from the replication server and
        both values are saved with `status.set_status()`.

        `socket_timeout` is handed on to `_make_replication_server()`.

        Raises a UsageError when the replication service cannot be reached.
    """
    LOG.info("Using replication source: %s", base_url)
    date = status.compute_database_date(conn)

    # margin of error to make sure we get all data
    date -= dt.timedelta(hours=3)

    with _make_replication_server(base_url, socket_timeout) as repl:
        seq = repl.timestamp_to_sequence(date)

    # NOTE(review): the guard line and the last logging argument were not
    # visible in the reviewed excerpt and have been restored from context
    # (one '%s' placeholder, error branch ends in a raise) - please confirm.
    if seq is None:
        LOG.critical("Cannot reach the configured replication service '%s'.\n"
                     "Does the URL point to a directory containing OSM update data?",
                     base_url)
        raise UsageError("Failed to reach replication service")

    status.set_status(conn, date=date, seq=seq)

    LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
def check_for_updates(conn: Connection, base_url: str,
                      socket_timeout: int = 60) -> int:
    """ Check if new data is available from the replication service at the
        given base URL.

        Compares the sequence number saved in the database status with the
        current state reported by the replication server and returns an
        integer code describing the outcome.

        NOTE(review): the concrete return codes were not visible in the
        reviewed excerpt. The values used below (0 = new data available,
        2 = up to date, 253 = server state unavailable, 254 = replication
        not set up) must be confirmed against the callers.
    """
    _, seq, _ = status.get_status(conn)

    if seq is None:
        LOG.error("Replication not set up. "
                  "Please run 'nominatim replication --init' first.")
        return 254

    with _make_replication_server(base_url, socket_timeout) as repl:
        state = repl.get_state_info()

    if state is None:
        LOG.error("Cannot get state for URL %s.", base_url)
        return 253

    if state.sequence <= seq:
        LOG.warning("Database is up to date.")
        return 2

    LOG.warning("New data available (%i => %i).", seq, state.sequence)
    return 0
class UpdateState(Enum):
    """ Possible states after an update has run.
    """
    # NOTE(review): the member definitions were not visible in the reviewed
    # excerpt. The names are the ones used by update(); the numeric values
    # are an assumption and need to be confirmed.

    # A batch of updates was applied to the database.
    UP_TO_DATE = 0
    # The update was skipped because earlier data still needs indexing.
    MORE_PENDING = 2
    # The replication server had no new data to apply.
    NO_CHANGES = 3
def update(conn: Connection, options: MutableMapping[str, Any],
           socket_timeout: int = 60) -> UpdateState:
    """ Update database from the next batch of data. Returns the state of
        updates according to `UpdateState`.

        `options` is read for the keys 'indexed_only', 'update_interval'
        (seconds), 'import_file' (a path-like object with `exists()` and
        `unlink()`), 'base_url' and 'max_diff_size'. It is also handed on
        to `run_osm2pgsql_updates()`.

        Returns `UpdateState.MORE_PENDING` when 'indexed_only' is set and
        the database still has unindexed data, `UpdateState.NO_CHANGES`
        when the server delivered no new sequence and
        `UpdateState.UP_TO_DATE` after a batch has been applied.

        Raises a UsageError when replication has not been initialised.
    """
    # Imported locally so that the function does not depend on the
    # module-level import list.
    import time

    startdate, startseq, indexed = status.get_status(conn)

    # NOTE(review): guard line restored from context - please confirm.
    if startseq is None:
        LOG.error("Replication not set up. "
                  "Please run 'nominatim replication --init' first.")
        raise UsageError("Replication not set up.")

    assert startdate is not None

    if not indexed and options['indexed_only']:
        LOG.info("Skipping update. There is data that needs indexing.")
        return UpdateState.MORE_PENDING

    last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
    update_interval = dt.timedelta(seconds=options['update_interval'])
    if last_since_update < update_interval:
        # total_seconds() instead of .seconds: the latter silently drops
        # whole days when the remaining wait is longer than 24 hours.
        duration = int((update_interval - last_since_update).total_seconds())
        LOG.warning("Sleeping for %s sec before next update.", duration)
        # NOTE(review): the sleep itself was not visible in the reviewed
        # excerpt; restored to match the log message above.
        time.sleep(duration)

    # Start from a clean file, leftovers of a previous run are discarded.
    if options['import_file'].exists():
        options['import_file'].unlink()

    # Read updates into file.
    with _make_replication_server(options['base_url'], socket_timeout) as repl:
        outhandler = WriteHandler(str(options['import_file']))
        try:
            # max_diff_size is multiplied by 1024 before being passed on.
            endseq = repl.apply_diffs(outhandler, startseq + 1,
                                      max_size=options['max_diff_size'] * 1024)
        finally:
            # Close the writer even when downloading fails, so that the
            # output file is not left open.
            outhandler.close()

        # NOTE(review): guard line restored from context - please confirm.
        if endseq is None:
            return UpdateState.NO_CHANGES

        run_osm2pgsql_updates(conn, options)

        # Write the current status to the file
        endstate = repl.get_state_info(endseq)
        status.set_status(conn, endstate.timestamp if endstate else None,
                          seq=endseq, indexed=False)

    return UpdateState.UP_TO_DATE
def run_osm2pgsql_updates(conn: Connection, options: MutableMapping[str, Any]) -> None:
    """ Run osm2pgsql in append mode.

        Empties the table `place_to_be_deleted`, runs osm2pgsql with the
        given options and finally calls the SQL function
        `flush_deleted_places()`.

        Note that `options` is modified in place: the keys 'append' and
        'disable_jit' are set before osm2pgsql is started.
    """
    # Remove any stale deletion marks.
    with conn.cursor() as cur:
        cur.execute('TRUNCATE place_to_be_deleted')
    # NOTE(review): the commits were not visible in the reviewed excerpt;
    # restored on the assumption that the connection is not in autocommit
    # mode - please confirm.
    conn.commit()

    # Consume updates with osm2pgsql.
    options['append'] = True
    # JIT is only switched off for server versions 11 and later.
    options['disable_jit'] = conn.server_version_tuple() >= (11, 0)
    run_osm2pgsql(options)

    # Handle deletions
    with conn.cursor() as cur:
        cur.execute('SELECT flush_deleted_places()')
    conn.commit()
def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
    """ Returns a ReplicationServer in form of a context manager.

        Creates a light wrapper around older versions of pyosmium that did
        not support the context manager interface.

        For versions that do support it, the `open_url()` method of the
        returned server is replaced with a variant that applies `timeout`
        to every request. A timeout of 0 is passed on as None.
    """
    if hasattr(ReplicationServer, '__enter__'):
        # Patches the open_url function for pyosmium >= 3.2
        # where the socket timeout is no longer respected.
        def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
            """ Download a resource from the given URL and return a byte sequence
                of the content.
            """
            headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}

            # NOTE(review): the last argument of both session.get() calls
            # was not visible in the reviewed excerpt; `stream=True` is
            # assumed - please confirm.
            if self.session is not None:
                return self.session.get(url.get_full_url(),
                                        headers=headers, timeout=timeout or None,
                                        stream=True)

            # No session on the server object: open a temporary one which
            # stays alive for as long as the caller uses the response.
            @contextmanager
            def _get_url_with_session() -> Iterator[requests.Response]:
                with requests.Session() as session:
                    request = session.get(url.get_full_url(),
                                          headers=headers, timeout=timeout or None,
                                          stream=True)
                    yield request

            return _get_url_with_session()

        repl = ReplicationServer(url)
        # Bind the replacement to this instance only, the class is untouched.
        setattr(repl, 'open_url', types.MethodType(patched_open_url, repl))

        return cast(ContextManager[ReplicationServer], repl)

    # Fallback for versions without context manager support: wrap the
    # server in a generator-based context manager. The timeout is not
    # applied on this path.
    @contextmanager
    def get_cm() -> Generator[ReplicationServer, None, None]:
        yield ReplicationServer(url)

    return get_cm()