]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/replication.py
1c46c50c4522cda6b397bae3d65d4a8b6305d880
[nominatim.git] / nominatim / tools / replication.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for updating a database from a replication source.
9 """
10 from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
11 from contextlib import contextmanager
12 import datetime as dt
13 from enum import Enum
14 import logging
15 import time
16 import types
17 import urllib.request as urlrequest
18
19 import requests
20 from nominatim.db import status
21 from nominatim.db.connection import Connection
22 from nominatim.tools.exec_utils import run_osm2pgsql
23 from nominatim.errors import UsageError
24
25 try:
26     from osmium.replication.server import ReplicationServer
27     from osmium import WriteHandler
28     from osmium import version as pyo_version
29 except ImportError as exc:
30     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
31                                  "To install pyosmium via pip: pip3 install osmium")
32     raise UsageError("replication tools not available") from exc
33
34 LOG = logging.getLogger()
35
36 def init_replication(conn: Connection, base_url: str) -> None:
37     """ Set up replication for the server at the given base URL.
38     """
39     LOG.info("Using replication source: %s", base_url)
40     date = status.compute_database_date(conn)
41
42     # margin of error to make sure we get all data
43     date -= dt.timedelta(hours=3)
44
45     repl = ReplicationServer(base_url)
46
47     seq = repl.timestamp_to_sequence(date)
48
49     if seq is None:
50         LOG.fatal("Cannot reach the configured replication service '%s'.\n"
51                   "Does the URL point to a directory containing OSM update data?",
52                   base_url)
53         raise UsageError("Failed to reach replication service")
54
55     status.set_status(conn, date=date, seq=seq)
56
57     LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
58
59
60 def check_for_updates(conn: Connection, base_url: str) -> int:
61     """ Check if new data is available from the replication service at the
62         given base URL.
63     """
64     _, seq, _ = status.get_status(conn)
65
66     if seq is None:
67         LOG.error("Replication not set up. "
68                   "Please run 'nominatim replication --init' first.")
69         return 254
70
71     state = ReplicationServer(base_url).get_state_info()
72
73     if state is None:
74         LOG.error("Cannot get state for URL %s.", base_url)
75         return 253
76
77     if state.sequence <= seq:
78         LOG.warning("Database is up to date.")
79         return 2
80
81     LOG.warning("New data available (%i => %i).", seq, state.sequence)
82     return 0
83
84 class UpdateState(Enum):
85     """ Possible states after an update has run.
86     """
87
88     UP_TO_DATE = 0
89     MORE_PENDING = 2
90     NO_CHANGES = 3
91
92
93 def update(conn: Connection, options: MutableMapping[str, Any],
94            socket_timeout: int = 60) -> UpdateState:
95     """ Update database from the next batch of data. Returns the state of
96         updates according to `UpdateState`.
97     """
98     startdate, startseq, indexed = status.get_status(conn)
99
100     if startseq is None:
101         LOG.error("Replication not set up. "
102                   "Please run 'nominatim replication --init' first.")
103         raise UsageError("Replication not set up.")
104
105     assert startdate is not None
106
107     if not indexed and options['indexed_only']:
108         LOG.info("Skipping update. There is data that needs indexing.")
109         return UpdateState.MORE_PENDING
110
111     last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
112     update_interval = dt.timedelta(seconds=options['update_interval'])
113     if last_since_update < update_interval:
114         duration = (update_interval - last_since_update).seconds
115         LOG.warning("Sleeping for %s sec before next update.", duration)
116         time.sleep(duration)
117
118     if options['import_file'].exists():
119         options['import_file'].unlink()
120
121     # Read updates into file.
122     with _make_replication_server(options['base_url'], socket_timeout) as repl:
123         outhandler = WriteHandler(str(options['import_file']))
124         endseq = repl.apply_diffs(outhandler, startseq + 1,
125                                   max_size=options['max_diff_size'] * 1024)
126         outhandler.close()
127
128         if endseq is None:
129             return UpdateState.NO_CHANGES
130
131         # Consume updates with osm2pgsql.
132         options['append'] = True
133         options['disable_jit'] = conn.server_version_tuple() >= (11, 0)
134         run_osm2pgsql(options)
135
136         # Write the current status to the file
137         endstate = repl.get_state_info(endseq)
138         status.set_status(conn, endstate.timestamp if endstate else None,
139                           seq=endseq, indexed=False)
140
141     return UpdateState.UP_TO_DATE
142
143
144 def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
145     """ Returns a ReplicationServer in form of a context manager.
146
147         Creates a light wrapper around older versions of pyosmium that did
148         not support the context manager interface.
149     """
150     if hasattr(ReplicationServer, '__enter__'):
151         # Patches the open_url function for pyosmium >= 3.2
152         # where the socket timeout is no longer respected.
153         def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
154             """ Download a resource from the given URL and return a byte sequence
155                 of the content.
156             """
157             get_params = {
158                 'headers': {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"},
159                 'timeout': timeout or None,
160                 'stream': True
161             }
162
163             if self.session is not None:
164                 return self.session.get(url.get_full_url(), **get_params)
165
166             @contextmanager
167             def _get_url_with_session() -> Iterator[requests.Response]:
168                 with requests.Session() as session:
169                     request = session.get(url.get_full_url(), **get_params) # type: ignore
170                     yield request
171
172             return _get_url_with_session()
173
174         repl = ReplicationServer(url)
175         repl.open_url = types.MethodType(patched_open_url, repl)
176
177         return cast(ContextManager[ReplicationServer], repl)
178
179     @contextmanager
180     def get_cm() -> Generator[ReplicationServer, None, None]:
181         yield ReplicationServer(url)
182
183     return get_cm()