1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Implementation of the 'replication' sub-command.
"""
import argparse
import asyncio
import datetime as dt
import logging
import socket
import time
from typing import Optional

from ..db import status
from ..db.connection import connect
from ..errors import UsageError
from .args import NominatimArgs
# Module-wide logger. Uses the root logger (no name argument), matching the
# convention of the other sub-command modules in this package.
LOG = logging.getLogger()
26 class UpdateReplication:
    """
    Update the database using an online replication service.

    An OSM replication service is an online service that provides regular
    updates (OSM diff files) for the planet or the extract it covers. The OSMF
    provides the primary replication service for the full planet at
    https://planet.osm.org/replication/ but there are other providers of
    extracts of OSM data who provide such a service as well.

    This sub-command allows you to set up such a replication service and
    download and import updates at regular intervals. You need to call
    '--init' once to set up the process or whenever you change the replication
    configuration parameters. Without any arguments, the sub-command will go
    into a loop and continuously apply updates as they become available.
    Giving `--once` just downloads and imports the next batch of updates.
    """
44 def add_args(self, parser: argparse.ArgumentParser) -> None:
45 group = parser.add_argument_group('Arguments for initialisation')
46 group.add_argument('--init', action='store_true',
47 help='Initialise the update process')
48 group.add_argument('--no-update-functions', dest='update_functions',
50 help="Do not update the trigger function to "
51 "support differential updates (EXPERT)")
52 group = parser.add_argument_group('Arguments for updates')
53 group.add_argument('--check-for-updates', action='store_true',
54 help='Check if new updates are available and exit')
55 group.add_argument('--once', action='store_true',
56 help="Download and apply updates only once. When "
57 "not set, updates are continuously applied")
58 group.add_argument('--catch-up', action='store_true',
59 help="Download and apply updates until no new "
60 "data is available on the server")
61 group.add_argument('--no-index', action='store_false', dest='do_index',
62 help=("Do not index the new data. Only usable "
63 "together with --once"))
64 group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
65 help='Size of cache to be used by osm2pgsql (in MB)')
66 group = parser.add_argument_group('Download parameters')
67 group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
68 help='Set timeout for file downloads')
70 def _init_replication(self, args: NominatimArgs) -> int:
71 from ..tools import replication, refresh
73 LOG.warning("Initialising replication updates")
74 with connect(args.config.get_libpq_dsn()) as conn:
75 replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
76 socket_timeout=args.socket_timeout)
77 if args.update_functions:
78 LOG.warning("Create functions")
79 refresh.create_functions(conn, args.config, True, False)
82 def _check_for_updates(self, args: NominatimArgs) -> int:
83 from ..tools import replication
85 with connect(args.config.get_libpq_dsn()) as conn:
86 return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
87 socket_timeout=args.socket_timeout)
89 def _report_update(self, batchdate: dt.datetime,
90 start_import: dt.datetime,
91 start_index: Optional[dt.datetime]) -> None:
92 def round_time(delta: dt.timedelta) -> dt.timedelta:
93 return dt.timedelta(seconds=int(delta.total_seconds()))
95 end = dt.datetime.now(dt.timezone.utc)
96 LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
97 round_time((start_index or end) - start_import),
98 f"Indexing: {round_time(end - start_index)} " if start_index else '',
99 round_time(end - start_import),
100 round_time(end - batchdate))
102 def _compute_update_interval(self, args: NominatimArgs) -> int:
106 update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
107 # Sanity check to not overwhelm the Geofabrik servers.
108 if 'download.geofabrik.de' in args.config.REPLICATION_URL\
109 and update_interval < 86400:
110 LOG.fatal("Update interval too low for download.geofabrik.de.\n"
111 "Please check install documentation "
112 "(https://nominatim.org/release-docs/latest/admin/Update/#"
113 "setting-up-the-update-process).")
114 raise UsageError("Invalid replication update interval setting.")
116 return update_interval
118 async def _update(self, args: NominatimArgs) -> None:
119 # pylint: disable=too-many-locals
120 from ..tools import replication
121 from ..indexer.indexer import Indexer
122 from ..tokenizer import factory as tokenizer_factory
124 update_interval = self._compute_update_interval(args)
126 params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
127 params.update(base_url=args.config.REPLICATION_URL,
128 update_interval=update_interval,
129 import_file=args.project_dir / 'osmosischange.osc',
130 max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
131 indexed_only=not args.once)
134 if not args.do_index:
135 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
136 raise UsageError("Bad argument '--no-index'.")
137 recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
139 tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
140 indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
142 dsn = args.config.get_libpq_dsn()
145 start = dt.datetime.now(dt.timezone.utc)
146 state = replication.update(dsn, params, socket_timeout=args.socket_timeout)
148 with connect(dsn) as conn:
149 if state is not replication.UpdateState.NO_CHANGES:
150 status.log_status(conn, start, 'import')
151 batchdate, _, _ = status.get_status(conn)
154 if state is not replication.UpdateState.NO_CHANGES and args.do_index:
155 index_start = dt.datetime.now(dt.timezone.utc)
156 await indexer.index_full(analyse=False)
158 with connect(dsn) as conn:
159 status.set_indexed(conn, True)
160 status.log_status(conn, index_start, 'index')
165 if state is replication.UpdateState.NO_CHANGES and \
166 args.catch_up or update_interval > 40*60:
167 await indexer.index_full(analyse=False)
169 if LOG.isEnabledFor(logging.WARNING):
170 assert batchdate is not None
171 self._report_update(batchdate, start, index_start)
173 if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
176 if state is replication.UpdateState.NO_CHANGES:
177 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
178 time.sleep(recheck_interval)
180 def run(self, args: NominatimArgs) -> int:
181 socket.setdefaulttimeout(args.socket_timeout)
184 return self._init_replication(args)
186 if args.check_for_updates:
187 return self._check_for_updates(args)
189 asyncio.run(self._update(args))