# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of the 'replication' sub-command.
"""
from typing import Optional
import argparse
import asyncio
import datetime as dt
import logging
import socket
import time

from ..db import status
from ..db.connection import connect
from ..errors import UsageError
from .args import NominatimArgs

LOG = logging.getLogger()

# Do not repeat documentation of subcommand classes.
# pylint: disable=C0111
# Using non-top-level imports to make pyosmium optional for replication only.
# pylint: disable=C0415


class UpdateReplication:
    """
    Update the database using an online replication service.

    An OSM replication service is an online service that provides regular
    updates (OSM diff files) for the planet or for the extract it serves.
    The OSMF provides the primary replication service for the full planet at
    https://planet.osm.org/replication/, but other providers of OSM data
    extracts offer such a service as well.

    This sub-command allows you to set up such a replication service and to
    download and import updates at regular intervals. You need to call
    '--init' once to set up the process and again whenever you change the
    replication configuration parameters. Without any arguments, the
    sub-command goes into a loop and continuously applies updates as they
    become available. Giving `--once` just downloads and imports the next
    batch of updates.
    """
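
    # Illustrative invocations, assuming the standard 'nominatim' CLI entry
    # point (see the admin documentation for the full update workflow):
    #   nominatim replication --init
    #   nominatim replication --once
    #   nominatim replication --catch-up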

    def add_args(self, parser: argparse.ArgumentParser) -> None:
        group = parser.add_argument_group('Arguments for initialisation')
        group.add_argument('--init', action='store_true',
                           help='Initialise the update process')
        group.add_argument('--no-update-functions', dest='update_functions',
                           action='store_false',
                           help="Do not update the trigger function to "
                                "support differential updates (EXPERT)")
        group = parser.add_argument_group('Arguments for updates')
        group.add_argument('--check-for-updates', action='store_true',
                           help='Check if new updates are available and exit')
        group.add_argument('--once', action='store_true',
                           help="Download and apply updates only once. When "
                                "not set, updates are continuously applied")
        group.add_argument('--catch-up', action='store_true',
                           help="Download and apply updates until no new "
                                "data is available on the server")
        group.add_argument('--no-index', action='store_false', dest='do_index',
                           help=("Do not index the new data. Only usable "
                                 "together with --once"))
        group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
                           help='Size of cache to be used by osm2pgsql (in MB)')
        group = parser.add_argument_group('Download parameters')
        group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
                           help='Set timeout for file downloads (in seconds)')

    def _init_replication(self, args: NominatimArgs) -> int:
        from ..tools import replication, refresh

        LOG.warning("Initialising replication updates")
        with connect(args.config.get_libpq_dsn()) as conn:
            replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
                                         socket_timeout=args.socket_timeout)
            if args.update_functions:
                LOG.warning("Create functions")
                refresh.create_functions(conn, args.config, True, False)

        return 0

    def _check_for_updates(self, args: NominatimArgs) -> int:
        from ..tools import replication

        with connect(args.config.get_libpq_dsn()) as conn:
            return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
                                                 socket_timeout=args.socket_timeout)

    def _report_update(self, batchdate: dt.datetime,
                       start_import: dt.datetime,
                       start_index: Optional[dt.datetime]) -> None:
        def round_time(delta: dt.timedelta) -> dt.timedelta:
            return dt.timedelta(seconds=int(delta.total_seconds()))

        end = dt.datetime.now(dt.timezone.utc)
        LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
                    round_time((start_index or end) - start_import),
                    f"Indexing: {round_time(end - start_index)} " if start_index else '',
                    round_time(end - start_import),
                    round_time(end - batchdate))

    def _compute_update_interval(self, args: NominatimArgs) -> int:
        # Catch-up mode applies updates until none are left, so no wait
        # interval applies.
        if args.catch_up:
            return 0

        update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
        # Sanity check to not overwhelm the Geofabrik servers.
        if 'download.geofabrik.de' in args.config.REPLICATION_URL \
           and update_interval < 86400:
            LOG.fatal("Update interval too low for download.geofabrik.de.\n"
                      "Please check install documentation "
                      "(https://nominatim.org/release-docs/latest/admin/Update/#"
                      "setting-up-the-update-process).")
            raise UsageError("Invalid replication update interval setting.")

        return update_interval

    async def _update(self, args: NominatimArgs) -> None:
        # pylint: disable=too-many-locals
        from ..tools import replication
        from ..indexer.indexer import Indexer
        from ..tokenizer import factory as tokenizer_factory

        update_interval = self._compute_update_interval(args)

        params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
        params.update(base_url=args.config.REPLICATION_URL,
                      update_interval=update_interval,
                      import_file=args.project_dir / 'osmosischange.osc',
                      max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
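                      # Assumption from the parameter name and the '--once'
                      # semantics: in continuous mode, new diffs are only
                      # fetched once the previous batch has been indexed.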
                      indexed_only=not args.once)

        if not args.once:
            if not args.do_index:
                LOG.fatal("Indexing cannot be disabled when running updates continuously.")
                raise UsageError("Bad argument '--no-index'.")
            recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')

        tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
        indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)

        dsn = args.config.get_libpq_dsn()
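
        # Main loop: fetch and apply one batch of diffs, record import and
        # indexing status, then either exit (--once, --catch-up) or sleep
        # until the next recheck.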
        while True:
            start = dt.datetime.now(dt.timezone.utc)
            state = replication.update(dsn, params, socket_timeout=args.socket_timeout)

            with connect(dsn) as conn:
                if state is not replication.UpdateState.NO_CHANGES:
                    status.log_status(conn, start, 'import')
                batchdate, _, _ = status.get_status(conn)
                conn.commit()

            if state is not replication.UpdateState.NO_CHANGES and args.do_index:
                index_start = dt.datetime.now(dt.timezone.utc)
                await indexer.index_full(analyse=False)

                with connect(dsn) as conn:
                    status.set_indexed(conn, True)
                    status.log_status(conn, index_start, 'index')
                    conn.commit()
            else:
                index_start = None
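
            # In catch-up mode, or when updates arrive infrequently, run a
            # full indexing pass so that no objects stay unindexed between
            # update rounds.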
            if (state is replication.UpdateState.NO_CHANGES and args.catch_up) \
               or update_interval > 40*60:
                await indexer.index_full(analyse=False)

            if LOG.isEnabledFor(logging.WARNING):
                assert batchdate is not None
                self._report_update(batchdate, start, index_start)

            if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
                break

            if state is replication.UpdateState.NO_CHANGES:
                LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
                time.sleep(recheck_interval)

    def run(self, args: NominatimArgs) -> int:
        socket.setdefaulttimeout(args.socket_timeout)

        if args.init:
            return self._init_replication(args)

        if args.check_for_updates:
            return self._check_for_updates(args)

        asyncio.run(self._update(args))
        return 0