]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/clicmd/replication.py
move API dependent functions out of args structure
[nominatim.git] / src / nominatim_db / clicmd / replication.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of the 'replication' sub-command.
9 """
10 from typing import Optional
11 import argparse
12 import datetime as dt
13 import logging
14 import socket
15 import time
16
17 from nominatim_core.db import status
18 from nominatim_core.db.connection import connect
19 from nominatim_core.errors import UsageError
20 from .args import NominatimArgs
21
22 LOG = logging.getLogger()
23
24 # Do not repeat documentation of subcommand classes.
25 # pylint: disable=C0111
26 # Using non-top-level imports to make pyosmium optional for replication only.
27 # pylint: disable=C0415
28
29 class UpdateReplication:
30     """\
31     Update the database using an online replication service.
32
33     An OSM replication service is an online service that provides regular
34     updates (OSM diff files) for the planet or update they provide. The OSMF
35     provides the primary replication service for the full planet at
36     https://planet.osm.org/replication/ but there are other providers of
37     extracts of OSM data who provide such a service as well.
38
39     This sub-command allows to set up such a replication service and download
40     and import updates at regular intervals. You need to call '--init' once to
41     set up the process or whenever you change the replication configuration
42     parameters. Without any arguments, the sub-command will go into a loop and
43     continuously apply updates as they become available. Giving `--once` just
44     downloads and imports the next batch of updates.
45     """
46
47     def add_args(self, parser: argparse.ArgumentParser) -> None:
48         group = parser.add_argument_group('Arguments for initialisation')
49         group.add_argument('--init', action='store_true',
50                            help='Initialise the update process')
51         group.add_argument('--no-update-functions', dest='update_functions',
52                            action='store_false',
53                            help="Do not update the trigger function to "
54                                 "support differential updates (EXPERT)")
55         group = parser.add_argument_group('Arguments for updates')
56         group.add_argument('--check-for-updates', action='store_true',
57                            help='Check if new updates are available and exit')
58         group.add_argument('--once', action='store_true',
59                            help="Download and apply updates only once. When "
60                                 "not set, updates are continuously applied")
61         group.add_argument('--catch-up', action='store_true',
62                            help="Download and apply updates until no new "
63                                 "data is available on the server")
64         group.add_argument('--no-index', action='store_false', dest='do_index',
65                            help=("Do not index the new data. Only usable "
66                                  "together with --once"))
67         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
68                            help='Size of cache to be used by osm2pgsql (in MB)')
69         group = parser.add_argument_group('Download parameters')
70         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
71                            help='Set timeout for file downloads')
72
73
74     def _init_replication(self, args: NominatimArgs) -> int:
75         from ..tools import replication, refresh
76
77         LOG.warning("Initialising replication updates")
78         with connect(args.config.get_libpq_dsn()) as conn:
79             replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
80                                          socket_timeout=args.socket_timeout)
81             if args.update_functions:
82                 LOG.warning("Create functions")
83                 refresh.create_functions(conn, args.config, True, False)
84         return 0
85
86
87     def _check_for_updates(self, args: NominatimArgs) -> int:
88         from ..tools import replication
89
90         with connect(args.config.get_libpq_dsn()) as conn:
91             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
92                                                  socket_timeout=args.socket_timeout)
93
94
95     def _report_update(self, batchdate: dt.datetime,
96                        start_import: dt.datetime,
97                        start_index: Optional[dt.datetime]) -> None:
98         def round_time(delta: dt.timedelta) -> dt.timedelta:
99             return dt.timedelta(seconds=int(delta.total_seconds()))
100
101         end = dt.datetime.now(dt.timezone.utc)
102         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
103                     round_time((start_index or end) - start_import),
104                     f"Indexing: {round_time(end - start_index)} " if start_index else '',
105                     round_time(end - start_import),
106                     round_time(end - batchdate))
107
108
109     def _compute_update_interval(self, args: NominatimArgs) -> int:
110         if args.catch_up:
111             return 0
112
113         update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
114         # Sanity check to not overwhelm the Geofabrik servers.
115         if 'download.geofabrik.de' in args.config.REPLICATION_URL\
116            and update_interval < 86400:
117             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
118                       "Please check install documentation "
119                       "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
120                       "setting-up-the-update-process).")
121             raise UsageError("Invalid replication update interval setting.")
122
123         return update_interval
124
125
126     def _update(self, args: NominatimArgs) -> None:
127         # pylint: disable=too-many-locals
128         from ..tools import replication
129         from ..indexer.indexer import Indexer
130         from ..tokenizer import factory as tokenizer_factory
131
132         update_interval = self._compute_update_interval(args)
133
134         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
135         params.update(base_url=args.config.REPLICATION_URL,
136                       update_interval=update_interval,
137                       import_file=args.project_dir / 'osmosischange.osc',
138                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
139                       indexed_only=not args.once)
140
141         if not args.once:
142             if not args.do_index:
143                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
144                 raise UsageError("Bad argument '--no-index'.")
145         recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
146
147         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
148         indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
149
150         dsn = args.config.get_libpq_dsn()
151
152         while True:
153             start = dt.datetime.now(dt.timezone.utc)
154             state = replication.update(dsn, params, socket_timeout=args.socket_timeout)
155
156             with connect(dsn) as conn:
157                 if state is not replication.UpdateState.NO_CHANGES:
158                     status.log_status(conn, start, 'import')
159                 batchdate, _, _ = status.get_status(conn)
160                 conn.commit()
161
162             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
163                 index_start = dt.datetime.now(dt.timezone.utc)
164                 indexer.index_full(analyse=False)
165
166                 with connect(dsn) as conn:
167                     status.set_indexed(conn, True)
168                     status.log_status(conn, index_start, 'index')
169                     conn.commit()
170             else:
171                 index_start = None
172
173             if state is replication.UpdateState.NO_CHANGES and \
174                args.catch_up or update_interval > 40*60:
175                 while indexer.has_pending():
176                     indexer.index_full(analyse=False)
177
178             if LOG.isEnabledFor(logging.WARNING):
179                 assert batchdate is not None
180                 self._report_update(batchdate, start, index_start)
181
182             if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
183                 break
184
185             if state is replication.UpdateState.NO_CHANGES:
186                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
187                 time.sleep(recheck_interval)
188
189
190     def run(self, args: NominatimArgs) -> int:
191         socket.setdefaulttimeout(args.socket_timeout)
192
193         if args.init:
194             return self._init_replication(args)
195
196         if args.check_for_updates:
197             return self._check_for_updates(args)
198
199         self._update(args)
200         return 0