]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/clicmd/replication.py
CLI: more useful error messages on JSON formatting errors
[nominatim.git] / src / nominatim_db / clicmd / replication.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of the 'replication' sub-command.
9 """
10 from typing import Optional
11 import argparse
12 import datetime as dt
13 import logging
14 import socket
15 import time
16 import asyncio
17
18 from ..db import status
19 from ..db.connection import connect
20 from ..errors import UsageError
21 from .args import NominatimArgs
22
23 LOG = logging.getLogger()
24
25 # Do not repeat documentation of subcommand classes.
26 # pylint: disable=C0111
27 # Using non-top-level imports to make pyosmium optional for replication only.
28 # pylint: disable=C0415
29
30 class UpdateReplication:
31     """\
32     Update the database using an online replication service.
33
34     An OSM replication service is an online service that provides regular
35     updates (OSM diff files) for the planet or update they provide. The OSMF
36     provides the primary replication service for the full planet at
37     https://planet.osm.org/replication/ but there are other providers of
38     extracts of OSM data who provide such a service as well.
39
40     This sub-command allows to set up such a replication service and download
41     and import updates at regular intervals. You need to call '--init' once to
42     set up the process or whenever you change the replication configuration
43     parameters. Without any arguments, the sub-command will go into a loop and
44     continuously apply updates as they become available. Giving `--once` just
45     downloads and imports the next batch of updates.
46     """
47
48     def add_args(self, parser: argparse.ArgumentParser) -> None:
49         group = parser.add_argument_group('Arguments for initialisation')
50         group.add_argument('--init', action='store_true',
51                            help='Initialise the update process')
52         group.add_argument('--no-update-functions', dest='update_functions',
53                            action='store_false',
54                            help="Do not update the trigger function to "
55                                 "support differential updates (EXPERT)")
56         group = parser.add_argument_group('Arguments for updates')
57         group.add_argument('--check-for-updates', action='store_true',
58                            help='Check if new updates are available and exit')
59         group.add_argument('--once', action='store_true',
60                            help="Download and apply updates only once. When "
61                                 "not set, updates are continuously applied")
62         group.add_argument('--catch-up', action='store_true',
63                            help="Download and apply updates until no new "
64                                 "data is available on the server")
65         group.add_argument('--no-index', action='store_false', dest='do_index',
66                            help=("Do not index the new data. Only usable "
67                                  "together with --once"))
68         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
69                            help='Size of cache to be used by osm2pgsql (in MB)')
70         group = parser.add_argument_group('Download parameters')
71         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
72                            help='Set timeout for file downloads')
73
74
75     def _init_replication(self, args: NominatimArgs) -> int:
76         from ..tools import replication, refresh
77
78         LOG.warning("Initialising replication updates")
79         with connect(args.config.get_libpq_dsn()) as conn:
80             replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
81                                          socket_timeout=args.socket_timeout)
82             if args.update_functions:
83                 LOG.warning("Create functions")
84                 refresh.create_functions(conn, args.config, True, False)
85         return 0
86
87
88     def _check_for_updates(self, args: NominatimArgs) -> int:
89         from ..tools import replication
90
91         with connect(args.config.get_libpq_dsn()) as conn:
92             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
93                                                  socket_timeout=args.socket_timeout)
94
95
96     def _report_update(self, batchdate: dt.datetime,
97                        start_import: dt.datetime,
98                        start_index: Optional[dt.datetime]) -> None:
99         def round_time(delta: dt.timedelta) -> dt.timedelta:
100             return dt.timedelta(seconds=int(delta.total_seconds()))
101
102         end = dt.datetime.now(dt.timezone.utc)
103         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
104                     round_time((start_index or end) - start_import),
105                     f"Indexing: {round_time(end - start_index)} " if start_index else '',
106                     round_time(end - start_import),
107                     round_time(end - batchdate))
108
109
110     def _compute_update_interval(self, args: NominatimArgs) -> int:
111         if args.catch_up:
112             return 0
113
114         update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
115         # Sanity check to not overwhelm the Geofabrik servers.
116         if 'download.geofabrik.de' in args.config.REPLICATION_URL\
117            and update_interval < 86400:
118             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
119                       "Please check install documentation "
120                       "(https://nominatim.org/release-docs/latest/admin/Update/#"
121                       "setting-up-the-update-process).")
122             raise UsageError("Invalid replication update interval setting.")
123
124         return update_interval
125
126
127     async def _update(self, args: NominatimArgs) -> None:
128         # pylint: disable=too-many-locals
129         from ..tools import replication
130         from ..indexer.indexer import Indexer
131         from ..tokenizer import factory as tokenizer_factory
132
133         update_interval = self._compute_update_interval(args)
134
135         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
136         params.update(base_url=args.config.REPLICATION_URL,
137                       update_interval=update_interval,
138                       import_file=args.project_dir / 'osmosischange.osc',
139                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
140                       indexed_only=not args.once)
141
142         if not args.once:
143             if not args.do_index:
144                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
145                 raise UsageError("Bad argument '--no-index'.")
146         recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
147
148         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
149         indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
150
151         dsn = args.config.get_libpq_dsn()
152
153         while True:
154             start = dt.datetime.now(dt.timezone.utc)
155             state = replication.update(dsn, params, socket_timeout=args.socket_timeout)
156
157             with connect(dsn) as conn:
158                 if state is not replication.UpdateState.NO_CHANGES:
159                     status.log_status(conn, start, 'import')
160                 batchdate, _, _ = status.get_status(conn)
161                 conn.commit()
162
163             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
164                 index_start = dt.datetime.now(dt.timezone.utc)
165                 await indexer.index_full(analyse=False)
166
167                 with connect(dsn) as conn:
168                     status.set_indexed(conn, True)
169                     status.log_status(conn, index_start, 'index')
170                     conn.commit()
171             else:
172                 index_start = None
173
174             if state is replication.UpdateState.NO_CHANGES and \
175                args.catch_up or update_interval > 40*60:
176                 await indexer.index_full(analyse=False)
177
178             if LOG.isEnabledFor(logging.WARNING):
179                 assert batchdate is not None
180                 self._report_update(batchdate, start, index_start)
181
182             if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
183                 break
184
185             if state is replication.UpdateState.NO_CHANGES:
186                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
187                 time.sleep(recheck_interval)
188
189
190     def run(self, args: NominatimArgs) -> int:
191         socket.setdefaulttimeout(args.socket_timeout)
192
193         if args.init:
194             return self._init_replication(args)
195
196         if args.check_for_updates:
197             return self._check_for_updates(args)
198
199         asyncio.run(self._update(args))
200         return 0