]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/replication.py
Merge pull request #3586 from lonvia/reduce-lookup-calls
[nominatim.git] / src / nominatim_db / tools / replication.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for updating a database from a replication source.
9 """
10 from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
11 from contextlib import contextmanager
12 import datetime as dt
13 from enum import Enum
14 import logging
15 import time
16 import types
17 import urllib.request as urlrequest
18
19 from ..errors import UsageError
20 from ..db import status
21 from ..db.connection import Connection, connect, server_version_tuple
22 from .exec_utils import run_osm2pgsql
23
24 try:
25     from osmium.replication.server import ReplicationServer
26     from osmium import WriteHandler
27     from osmium import version as pyo_version
28     import requests
29 except ModuleNotFoundError as exc:
30     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
31                                  "To install pyosmium via pip: pip install osmium")
32     raise UsageError("replication tools not available") from exc
33
34 LOG = logging.getLogger()
35
36
37 def init_replication(conn: Connection, base_url: str,
38                      socket_timeout: int = 60) -> None:
39     """ Set up replication for the server at the given base URL.
40     """
41     LOG.info("Using replication source: %s", base_url)
42     date = status.compute_database_date(conn)
43
44     # margin of error to make sure we get all data
45     date -= dt.timedelta(hours=3)
46
47     with _make_replication_server(base_url, socket_timeout) as repl:
48         seq = repl.timestamp_to_sequence(date)
49
50     if seq is None:
51         LOG.fatal("Cannot reach the configured replication service '%s'.\n"
52                   "Does the URL point to a directory containing OSM update data?",
53                   base_url)
54         raise UsageError("Failed to reach replication service")
55
56     status.set_status(conn, date=date, seq=seq)
57
58     LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
59
60
61 def check_for_updates(conn: Connection, base_url: str,
62                       socket_timeout: int = 60) -> int:
63     """ Check if new data is available from the replication service at the
64         given base URL.
65     """
66     _, seq, _ = status.get_status(conn)
67
68     if seq is None:
69         LOG.error("Replication not set up. "
70                   "Please run 'nominatim replication --init' first.")
71         return 254
72
73     with _make_replication_server(base_url, socket_timeout) as repl:
74         state = repl.get_state_info()
75
76     if state is None:
77         LOG.error("Cannot get state for URL %s.", base_url)
78         return 253
79
80     if state.sequence <= seq:
81         LOG.warning("Database is up to date.")
82         return 2
83
84     LOG.warning("New data available (%i => %i).", seq, state.sequence)
85     return 0
86
87
88 class UpdateState(Enum):
89     """ Possible states after an update has run.
90     """
91
92     UP_TO_DATE = 0
93     MORE_PENDING = 2
94     NO_CHANGES = 3
95
96
97 def update(dsn: str, options: MutableMapping[str, Any],
98            socket_timeout: int = 60) -> UpdateState:
99     """ Update database from the next batch of data. Returns the state of
100         updates according to `UpdateState`.
101     """
102     with connect(dsn) as conn:
103         startdate, startseq, indexed = status.get_status(conn)
104         conn.commit()
105
106     if startseq is None:
107         LOG.error("Replication not set up. "
108                   "Please run 'nominatim replication --init' first.")
109         raise UsageError("Replication not set up.")
110
111     assert startdate is not None
112
113     if not indexed and options['indexed_only']:
114         LOG.info("Skipping update. There is data that needs indexing.")
115         return UpdateState.MORE_PENDING
116
117     last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
118     update_interval = dt.timedelta(seconds=options['update_interval'])
119     if last_since_update < update_interval:
120         duration = (update_interval - last_since_update).seconds
121         LOG.warning("Sleeping for %s sec before next update.", duration)
122         time.sleep(duration)
123
124     if options['import_file'].exists():
125         options['import_file'].unlink()
126
127     # Read updates into file.
128     with _make_replication_server(options['base_url'], socket_timeout) as repl:
129         outhandler = WriteHandler(str(options['import_file']))
130         endseq = repl.apply_diffs(outhandler, startseq + 1,
131                                   max_size=options['max_diff_size'] * 1024)
132         outhandler.close()
133
134         if endseq is None:
135             return UpdateState.NO_CHANGES
136
137         with connect(dsn) as conn:
138             run_osm2pgsql_updates(conn, options)
139
140             # Write the current status to the file
141             endstate = repl.get_state_info(endseq)
142             status.set_status(conn, endstate.timestamp if endstate else None,
143                               seq=endseq, indexed=False)
144             conn.commit()
145
146     return UpdateState.UP_TO_DATE
147
148
149 def run_osm2pgsql_updates(conn: Connection, options: MutableMapping[str, Any]) -> None:
150     """ Run osm2pgsql in append mode.
151     """
152     # Remove any stale deletion marks.
153     with conn.cursor() as cur:
154         cur.execute('TRUNCATE place_to_be_deleted')
155     conn.commit()
156
157     # Consume updates with osm2pgsql.
158     options['append'] = True
159     options['disable_jit'] = server_version_tuple(conn) >= (11, 0)
160     run_osm2pgsql(options)
161
162     # Handle deletions
163     with conn.cursor() as cur:
164         cur.execute('SELECT flush_deleted_places()')
165     conn.commit()
166
167
168 def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
169     """ Returns a ReplicationServer in form of a context manager.
170
171         Creates a light wrapper around older versions of pyosmium that did
172         not support the context manager interface.
173     """
174     if hasattr(ReplicationServer, '__enter__'):
175         # Patches the open_url function for pyosmium >= 3.2
176         # where the socket timeout is no longer respected.
177         def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
178             """ Download a resource from the given URL and return a byte sequence
179                 of the content.
180             """
181             headers = {"User-Agent": f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
182
183             if self.session is not None:
184                 return self.session.get(url.get_full_url(),
185                                         headers=headers, timeout=timeout or None,
186                                         stream=True)
187
188             @contextmanager
189             def _get_url_with_session() -> Iterator[requests.Response]:
190                 with requests.Session() as session:
191                     request = session.get(url.get_full_url(),
192                                           headers=headers, timeout=timeout or None,
193                                           stream=True)
194                     yield request
195
196             return _get_url_with_session()
197
198         repl = ReplicationServer(url)
199         setattr(repl, 'open_url', types.MethodType(patched_open_url, repl))
200
201         return cast(ContextManager[ReplicationServer], repl)
202
203     @contextmanager
204     def get_cm() -> Generator[ReplicationServer, None, None]:
205         yield ReplicationServer(url)
206
207     return get_cm()