]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/replication.py
factor out SQL for filtering by location
[nominatim.git] / nominatim / tools / replication.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for updating a database from a replication source.
9 """
10 from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
11 from contextlib import contextmanager
12 import datetime as dt
13 from enum import Enum
14 import logging
15 import time
16 import types
17 import urllib.request as urlrequest
18
19 import requests
20 from nominatim.db import status
21 from nominatim.db.connection import Connection, connect
22 from nominatim.tools.exec_utils import run_osm2pgsql
23 from nominatim.errors import UsageError
24
25 try:
26     from osmium.replication.server import ReplicationServer
27     from osmium import WriteHandler
28     from osmium import version as pyo_version
29 except ImportError as exc:
30     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
31                                  "To install pyosmium via pip: pip3 install osmium")
32     raise UsageError("replication tools not available") from exc
33
34 LOG = logging.getLogger()
35
36 def init_replication(conn: Connection, base_url: str,
37                      socket_timeout: int = 60) -> None:
38     """ Set up replication for the server at the given base URL.
39     """
40     LOG.info("Using replication source: %s", base_url)
41     date = status.compute_database_date(conn)
42
43     # margin of error to make sure we get all data
44     date -= dt.timedelta(hours=3)
45
46     with _make_replication_server(base_url, socket_timeout) as repl:
47         seq = repl.timestamp_to_sequence(date)
48
49     if seq is None:
50         LOG.fatal("Cannot reach the configured replication service '%s'.\n"
51                   "Does the URL point to a directory containing OSM update data?",
52                   base_url)
53         raise UsageError("Failed to reach replication service")
54
55     status.set_status(conn, date=date, seq=seq)
56
57     LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
58
59
60 def check_for_updates(conn: Connection, base_url: str,
61                       socket_timeout: int = 60) -> int:
62     """ Check if new data is available from the replication service at the
63         given base URL.
64     """
65     _, seq, _ = status.get_status(conn)
66
67     if seq is None:
68         LOG.error("Replication not set up. "
69                   "Please run 'nominatim replication --init' first.")
70         return 254
71
72     with _make_replication_server(base_url, socket_timeout) as repl:
73         state = repl.get_state_info()
74
75     if state is None:
76         LOG.error("Cannot get state for URL %s.", base_url)
77         return 253
78
79     if state.sequence <= seq:
80         LOG.warning("Database is up to date.")
81         return 2
82
83     LOG.warning("New data available (%i => %i).", seq, state.sequence)
84     return 0
85
86 class UpdateState(Enum):
87     """ Possible states after an update has run.
88     """
89
90     UP_TO_DATE = 0
91     MORE_PENDING = 2
92     NO_CHANGES = 3
93
94
95 def update(dsn: str, options: MutableMapping[str, Any],
96            socket_timeout: int = 60) -> UpdateState:
97     """ Update database from the next batch of data. Returns the state of
98         updates according to `UpdateState`.
99     """
100     with connect(dsn) as conn:
101         startdate, startseq, indexed = status.get_status(conn)
102         conn.commit()
103
104     if startseq is None:
105         LOG.error("Replication not set up. "
106                   "Please run 'nominatim replication --init' first.")
107         raise UsageError("Replication not set up.")
108
109     assert startdate is not None
110
111     if not indexed and options['indexed_only']:
112         LOG.info("Skipping update. There is data that needs indexing.")
113         return UpdateState.MORE_PENDING
114
115     last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
116     update_interval = dt.timedelta(seconds=options['update_interval'])
117     if last_since_update < update_interval:
118         duration = (update_interval - last_since_update).seconds
119         LOG.warning("Sleeping for %s sec before next update.", duration)
120         time.sleep(duration)
121
122     if options['import_file'].exists():
123         options['import_file'].unlink()
124
125     # Read updates into file.
126     with _make_replication_server(options['base_url'], socket_timeout) as repl:
127         outhandler = WriteHandler(str(options['import_file']))
128         endseq = repl.apply_diffs(outhandler, startseq + 1,
129                                   max_size=options['max_diff_size'] * 1024)
130         outhandler.close()
131
132         if endseq is None:
133             return UpdateState.NO_CHANGES
134
135         with connect(dsn) as conn:
136             run_osm2pgsql_updates(conn, options)
137
138             # Write the current status to the file
139             endstate = repl.get_state_info(endseq)
140             status.set_status(conn, endstate.timestamp if endstate else None,
141                               seq=endseq, indexed=False)
142             conn.commit()
143
144     return UpdateState.UP_TO_DATE
145
146
147 def run_osm2pgsql_updates(conn: Connection, options: MutableMapping[str, Any]) -> None:
148     """ Run osm2pgsql in append mode.
149     """
150     # Remove any stale deletion marks.
151     with conn.cursor() as cur:
152         cur.execute('TRUNCATE place_to_be_deleted')
153     conn.commit()
154
155     # Consume updates with osm2pgsql.
156     options['append'] = True
157     options['disable_jit'] = conn.server_version_tuple() >= (11, 0)
158     run_osm2pgsql(options)
159
160     # Handle deletions
161     with conn.cursor() as cur:
162         cur.execute('SELECT flush_deleted_places()')
163     conn.commit()
164
165
166 def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
167     """ Returns a ReplicationServer in form of a context manager.
168
169         Creates a light wrapper around older versions of pyosmium that did
170         not support the context manager interface.
171     """
172     if hasattr(ReplicationServer, '__enter__'):
173         # Patches the open_url function for pyosmium >= 3.2
174         # where the socket timeout is no longer respected.
175         def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
176             """ Download a resource from the given URL and return a byte sequence
177                 of the content.
178             """
179             headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
180
181             if self.session is not None:
182                 return self.session.get(url.get_full_url(),
183                                        headers=headers, timeout=timeout or None,
184                                        stream=True)
185
186             @contextmanager
187             def _get_url_with_session() -> Iterator[requests.Response]:
188                 with requests.Session() as session:
189                     request = session.get(url.get_full_url(),
190                                           headers=headers, timeout=timeout or None,
191                                           stream=True)
192                     yield request
193
194             return _get_url_with_session()
195
196         repl = ReplicationServer(url)
197         setattr(repl, 'open_url', types.MethodType(patched_open_url, repl))
198
199         return cast(ContextManager[ReplicationServer], repl)
200
201     @contextmanager
202     def get_cm() -> Generator[ReplicationServer, None, None]:
203         yield ReplicationServer(url)
204
205     return get_cm()