]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/replication.py
db706bf67563d795b863da9b0d025412b8554c0f
[nominatim.git] / nominatim / tools / replication.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for updating a database from a replication source.
9 """
10 from typing import ContextManager, MutableMapping, Any, Generator, cast
11 from contextlib import contextmanager
12 import datetime as dt
13 from enum import Enum
14 import logging
15 import time
16
17 from nominatim.db import status
18 from nominatim.db.connection import Connection
19 from nominatim.tools.exec_utils import run_osm2pgsql
20 from nominatim.errors import UsageError
21
22 try:
23     from osmium.replication.server import ReplicationServer
24     from osmium import WriteHandler
25 except ImportError as exc:
26     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
27                                  "To install pyosmium via pip: pip3 install osmium")
28     raise UsageError("replication tools not available") from exc
29
30 LOG = logging.getLogger()
31
32 def init_replication(conn: Connection, base_url: str) -> None:
33     """ Set up replication for the server at the given base URL.
34     """
35     LOG.info("Using replication source: %s", base_url)
36     date = status.compute_database_date(conn)
37
38     # margin of error to make sure we get all data
39     date -= dt.timedelta(hours=3)
40
41     repl = ReplicationServer(base_url)
42
43     seq = repl.timestamp_to_sequence(date)
44
45     if seq is None:
46         LOG.fatal("Cannot reach the configured replication service '%s'.\n"
47                   "Does the URL point to a directory containing OSM update data?",
48                   base_url)
49         raise UsageError("Failed to reach replication service")
50
51     status.set_status(conn, date=date, seq=seq)
52
53     LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
54
55
56 def check_for_updates(conn: Connection, base_url: str) -> int:
57     """ Check if new data is available from the replication service at the
58         given base URL.
59     """
60     _, seq, _ = status.get_status(conn)
61
62     if seq is None:
63         LOG.error("Replication not set up. "
64                   "Please run 'nominatim replication --init' first.")
65         return 254
66
67     state = ReplicationServer(base_url).get_state_info()
68
69     if state is None:
70         LOG.error("Cannot get state for URL %s.", base_url)
71         return 253
72
73     if state.sequence <= seq:
74         LOG.warning("Database is up to date.")
75         return 2
76
77     LOG.warning("New data available (%i => %i).", seq, state.sequence)
78     return 0
79
80 class UpdateState(Enum):
81     """ Possible states after an update has run.
82     """
83
84     UP_TO_DATE = 0
85     MORE_PENDING = 2
86     NO_CHANGES = 3
87
88
89 def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
90     """ Update database from the next batch of data. Returns the state of
91         updates according to `UpdateState`.
92     """
93     startdate, startseq, indexed = status.get_status(conn)
94
95     if startseq is None:
96         LOG.error("Replication not set up. "
97                   "Please run 'nominatim replication --init' first.")
98         raise UsageError("Replication not set up.")
99
100     assert startdate is not None
101
102     if not indexed and options['indexed_only']:
103         LOG.info("Skipping update. There is data that needs indexing.")
104         return UpdateState.MORE_PENDING
105
106     last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
107     update_interval = dt.timedelta(seconds=options['update_interval'])
108     if last_since_update < update_interval:
109         duration = (update_interval - last_since_update).seconds
110         LOG.warning("Sleeping for %s sec before next update.", duration)
111         time.sleep(duration)
112
113     if options['import_file'].exists():
114         options['import_file'].unlink()
115
116     # Read updates into file.
117     with _make_replication_server(options['base_url']) as repl:
118         outhandler = WriteHandler(str(options['import_file']))
119         endseq = repl.apply_diffs(outhandler, startseq + 1,
120                                   max_size=options['max_diff_size'] * 1024)
121         outhandler.close()
122
123         if endseq is None:
124             return UpdateState.NO_CHANGES
125
126         # Consume updates with osm2pgsql.
127         options['append'] = True
128         options['disable_jit'] = conn.server_version_tuple() >= (11, 0)
129         run_osm2pgsql(options)
130
131         # Write the current status to the file
132         endstate = repl.get_state_info(endseq)
133         status.set_status(conn, endstate.timestamp if endstate else None,
134                           seq=endseq, indexed=False)
135
136     return UpdateState.UP_TO_DATE
137
138
139 def _make_replication_server(url: str) -> ContextManager[ReplicationServer]:
140     """ Returns a ReplicationServer in form of a context manager.
141
142         Creates a light wrapper around older versions of pyosmium that did
143         not support the context manager interface.
144     """
145     if hasattr(ReplicationServer, '__enter__'):
146         return cast(ContextManager[ReplicationServer], ReplicationServer(url))
147
148     @contextmanager
149     def get_cm() -> Generator[ReplicationServer, None, None]:
150         yield ReplicationServer(url)
151
152     return get_cm()