]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/replication.py
Merge pull request #3517 from lonvia/improve-custom-formatter
[nominatim.git] / src / nominatim_db / tools / replication.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for updating a database from a replication source.
9 """
10 from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
11 from contextlib import contextmanager
12 import datetime as dt
13 from enum import Enum
14 import logging
15 import time
16 import types
17 import urllib.request as urlrequest
18
19 import requests
20
21 from ..errors import UsageError
22 from ..db import status
23 from ..db.connection import Connection, connect, server_version_tuple
24 from .exec_utils import run_osm2pgsql
25
26 try:
27     from osmium.replication.server import ReplicationServer
28     from osmium import WriteHandler
29     from osmium import version as pyo_version
30 except ImportError as exc:
31     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
32                                  "To install pyosmium via pip: pip3 install osmium")
33     raise UsageError("replication tools not available") from exc
34
35 LOG = logging.getLogger()
36
37 def init_replication(conn: Connection, base_url: str,
38                      socket_timeout: int = 60) -> None:
39     """ Set up replication for the server at the given base URL.
40     """
41     LOG.info("Using replication source: %s", base_url)
42     date = status.compute_database_date(conn)
43
44     # margin of error to make sure we get all data
45     date -= dt.timedelta(hours=3)
46
47     with _make_replication_server(base_url, socket_timeout) as repl:
48         seq = repl.timestamp_to_sequence(date)
49
50     if seq is None:
51         LOG.fatal("Cannot reach the configured replication service '%s'.\n"
52                   "Does the URL point to a directory containing OSM update data?",
53                   base_url)
54         raise UsageError("Failed to reach replication service")
55
56     status.set_status(conn, date=date, seq=seq)
57
58     LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
59
60
61 def check_for_updates(conn: Connection, base_url: str,
62                       socket_timeout: int = 60) -> int:
63     """ Check if new data is available from the replication service at the
64         given base URL.
65     """
66     _, seq, _ = status.get_status(conn)
67
68     if seq is None:
69         LOG.error("Replication not set up. "
70                   "Please run 'nominatim replication --init' first.")
71         return 254
72
73     with _make_replication_server(base_url, socket_timeout) as repl:
74         state = repl.get_state_info()
75
76     if state is None:
77         LOG.error("Cannot get state for URL %s.", base_url)
78         return 253
79
80     if state.sequence <= seq:
81         LOG.warning("Database is up to date.")
82         return 2
83
84     LOG.warning("New data available (%i => %i).", seq, state.sequence)
85     return 0
86
87 class UpdateState(Enum):
88     """ Possible states after an update has run.
89     """
90
91     UP_TO_DATE = 0
92     MORE_PENDING = 2
93     NO_CHANGES = 3
94
95
96 def update(dsn: str, options: MutableMapping[str, Any],
97            socket_timeout: int = 60) -> UpdateState:
98     """ Update database from the next batch of data. Returns the state of
99         updates according to `UpdateState`.
100     """
101     with connect(dsn) as conn:
102         startdate, startseq, indexed = status.get_status(conn)
103         conn.commit()
104
105     if startseq is None:
106         LOG.error("Replication not set up. "
107                   "Please run 'nominatim replication --init' first.")
108         raise UsageError("Replication not set up.")
109
110     assert startdate is not None
111
112     if not indexed and options['indexed_only']:
113         LOG.info("Skipping update. There is data that needs indexing.")
114         return UpdateState.MORE_PENDING
115
116     last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
117     update_interval = dt.timedelta(seconds=options['update_interval'])
118     if last_since_update < update_interval:
119         duration = (update_interval - last_since_update).seconds
120         LOG.warning("Sleeping for %s sec before next update.", duration)
121         time.sleep(duration)
122
123     if options['import_file'].exists():
124         options['import_file'].unlink()
125
126     # Read updates into file.
127     with _make_replication_server(options['base_url'], socket_timeout) as repl:
128         outhandler = WriteHandler(str(options['import_file']))
129         endseq = repl.apply_diffs(outhandler, startseq + 1,
130                                   max_size=options['max_diff_size'] * 1024)
131         outhandler.close()
132
133         if endseq is None:
134             return UpdateState.NO_CHANGES
135
136         with connect(dsn) as conn:
137             run_osm2pgsql_updates(conn, options)
138
139             # Write the current status to the file
140             endstate = repl.get_state_info(endseq)
141             status.set_status(conn, endstate.timestamp if endstate else None,
142                               seq=endseq, indexed=False)
143             conn.commit()
144
145     return UpdateState.UP_TO_DATE
146
147
148 def run_osm2pgsql_updates(conn: Connection, options: MutableMapping[str, Any]) -> None:
149     """ Run osm2pgsql in append mode.
150     """
151     # Remove any stale deletion marks.
152     with conn.cursor() as cur:
153         cur.execute('TRUNCATE place_to_be_deleted')
154     conn.commit()
155
156     # Consume updates with osm2pgsql.
157     options['append'] = True
158     options['disable_jit'] = server_version_tuple(conn) >= (11, 0)
159     run_osm2pgsql(options)
160
161     # Handle deletions
162     with conn.cursor() as cur:
163         cur.execute('SELECT flush_deleted_places()')
164     conn.commit()
165
166
167 def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
168     """ Returns a ReplicationServer in form of a context manager.
169
170         Creates a light wrapper around older versions of pyosmium that did
171         not support the context manager interface.
172     """
173     if hasattr(ReplicationServer, '__enter__'):
174         # Patches the open_url function for pyosmium >= 3.2
175         # where the socket timeout is no longer respected.
176         def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
177             """ Download a resource from the given URL and return a byte sequence
178                 of the content.
179             """
180             headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
181
182             if self.session is not None:
183                 return self.session.get(url.get_full_url(),
184                                        headers=headers, timeout=timeout or None,
185                                        stream=True)
186
187             @contextmanager
188             def _get_url_with_session() -> Iterator[requests.Response]:
189                 with requests.Session() as session:
190                     request = session.get(url.get_full_url(),
191                                           headers=headers, timeout=timeout or None,
192                                           stream=True)
193                     yield request
194
195             return _get_url_with_session()
196
197         repl = ReplicationServer(url)
198         setattr(repl, 'open_url', types.MethodType(patched_open_url, repl))
199
200         return cast(ContextManager[ReplicationServer], repl)
201
202     @contextmanager
203     def get_cm() -> Generator[ReplicationServer, None, None]:
204         yield ReplicationServer(url)
205
206     return get_cm()