1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Implementation of the 'replication' sub-command.
"""
import argparse
import datetime as dt
import logging
import socket
import time
from typing import Optional

from nominatim.db import status
from nominatim.db.connection import connect
from nominatim.errors import UsageError
from nominatim.clicmd.args import NominatimArgs
# Module-wide logger; the root logger is used deliberately so that the
# CLI's global logging configuration applies to all replication output.
LOG = logging.getLogger()

# Do not repeat documentation of subcommand classes.
# pylint: disable=C0111
# Using non-top-level imports to make pyosmium optional for replication only.
# pylint: disable=C0415
29 class UpdateReplication:
    """
    Update the database using an online replication service.

    An OSM replication service is an online service that provides regular
    updates (OSM diff files) for the planet or for the region it covers. The
    OSMF provides the primary replication service for the full planet at
    https://planet.osm.org/replication/ but there are other providers of
    extracts of OSM data who provide such a service as well.

    This sub-command allows you to set up such a replication service and
    download and import updates at regular intervals. You need to call
    '--init' once to set up the process or whenever you change the
    replication configuration parameters. Without any arguments, the
    sub-command will go into a loop and continuously apply updates as they
    become available. Giving `--once` just downloads and imports the next
    batch of updates.
    """
47 def add_args(self, parser: argparse.ArgumentParser) -> None:
48 group = parser.add_argument_group('Arguments for initialisation')
49 group.add_argument('--init', action='store_true',
50 help='Initialise the update process')
51 group.add_argument('--no-update-functions', dest='update_functions',
53 help="Do not update the trigger function to "
54 "support differential updates (EXPERT)")
55 group = parser.add_argument_group('Arguments for updates')
56 group.add_argument('--check-for-updates', action='store_true',
57 help='Check if new updates are available and exit')
58 group.add_argument('--once', action='store_true',
59 help="Download and apply updates only once. When "
60 "not set, updates are continuously applied")
61 group.add_argument('--catch-up', action='store_true',
62 help="Download and apply updates until no new "
63 "data is available on the server")
64 group.add_argument('--no-index', action='store_false', dest='do_index',
65 help=("Do not index the new data. Only usable "
66 "together with --once"))
67 group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
68 help='Size of cache to be used by osm2pgsql (in MB)')
69 group = parser.add_argument_group('Download parameters')
70 group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
71 help='Set timeout for file downloads')
74 def _init_replication(self, args: NominatimArgs) -> int:
75 from ..tools import replication, refresh
77 LOG.warning("Initialising replication updates")
78 with connect(args.config.get_libpq_dsn()) as conn:
79 replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
80 if args.update_functions:
81 LOG.warning("Create functions")
82 refresh.create_functions(conn, args.config, True, False)
86 def _check_for_updates(self, args: NominatimArgs) -> int:
87 from ..tools import replication
89 with connect(args.config.get_libpq_dsn()) as conn:
90 return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
93 def _report_update(self, batchdate: dt.datetime,
94 start_import: dt.datetime,
95 start_index: Optional[dt.datetime]) -> None:
96 def round_time(delta: dt.timedelta) -> dt.timedelta:
97 return dt.timedelta(seconds=int(delta.total_seconds()))
99 end = dt.datetime.now(dt.timezone.utc)
100 LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
101 round_time((start_index or end) - start_import),
102 f"Indexing: {round_time(end - start_index)} " if start_index else '',
103 round_time(end - start_import),
104 round_time(end - batchdate))
107 def _compute_update_interval(self, args: NominatimArgs) -> int:
111 update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
112 # Sanity check to not overwhelm the Geofabrik servers.
113 if 'download.geofabrik.de' in args.config.REPLICATION_URL\
114 and update_interval < 86400:
115 LOG.fatal("Update interval too low for download.geofabrik.de.\n"
116 "Please check install documentation "
117 "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
118 "setting-up-the-update-process).")
119 raise UsageError("Invalid replication update interval setting.")
121 return update_interval
124 def _update(self, args: NominatimArgs) -> None:
125 # pylint: disable=too-many-locals
126 from ..tools import replication
127 from ..indexer.indexer import Indexer
128 from ..tokenizer import factory as tokenizer_factory
130 update_interval = self._compute_update_interval(args)
132 params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
133 params.update(base_url=args.config.REPLICATION_URL,
134 update_interval=update_interval,
135 import_file=args.project_dir / 'osmosischange.osc',
136 max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
137 indexed_only=not args.once)
140 if not args.do_index:
141 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
142 raise UsageError("Bad argument '--no-index'.")
143 recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
145 tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
146 indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
149 with connect(args.config.get_libpq_dsn()) as conn:
150 start = dt.datetime.now(dt.timezone.utc)
151 state = replication.update(conn, params)
152 if state is not replication.UpdateState.NO_CHANGES:
153 status.log_status(conn, start, 'import')
154 batchdate, _, _ = status.get_status(conn)
157 if state is not replication.UpdateState.NO_CHANGES and args.do_index:
158 index_start = dt.datetime.now(dt.timezone.utc)
159 indexer.index_full(analyse=False)
161 with connect(args.config.get_libpq_dsn()) as conn:
162 status.set_indexed(conn, True)
163 status.log_status(conn, index_start, 'index')
168 if state is replication.UpdateState.NO_CHANGES and \
169 args.catch_up or update_interval > 40*60:
170 while indexer.has_pending():
171 indexer.index_full(analyse=False)
173 if LOG.isEnabledFor(logging.WARNING):
174 assert batchdate is not None
175 self._report_update(batchdate, start, index_start)
177 if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
180 if state is replication.UpdateState.NO_CHANGES:
181 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
182 time.sleep(recheck_interval)
185 def run(self, args: NominatimArgs) -> int:
186 socket.setdefaulttimeout(args.socket_timeout)
189 return self._init_replication(args)
191 if args.check_for_updates:
192 return self._check_for_updates(args)