1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of the 'replication' sub-command.
15 from nominatim.db import status
16 from nominatim.db.connection import connect
17 from nominatim.errors import UsageError
# Logger shared by the whole module. getLogger() without a name returns the
# root logger, so messages follow the application's global logging setup.
LOG = logging.getLogger()
21 # Do not repeat documentation of subcommand classes.
22 # pylint: disable=C0111
23 # Using non-top-level imports to make pyosmium optional for replication only.
24 # pylint: disable=E0012,C0415
class UpdateReplication:
    """\
    Update the database using an online replication service.

    An OSM replication service is an online service that provides regular
    updates (OSM diff files) for the planet or extract they provide. The OSMF
    provides the primary replication service for the full planet at
    https://planet.osm.org/replication/ but there are other providers of
    extracts of OSM data who provide such a service as well.

    This sub-command allows to set up such a replication service and download
    and import updates at regular intervals. You need to call '--init' once to
    set up the process or whenever you change the replication configuration
    parameters. Without any arguments, the sub-command will go into a loop and
    continuously apply updates as they become available. Giving `--once` just
    downloads and imports the next batch of updates.
    """

    @staticmethod
    def add_args(parser):
        """ Register the command-line options of the sub-command on `parser`.

            `parser` must offer the argparse interface
            (`add_argument_group()` returning an object with `add_argument()`).
        """
        group = parser.add_argument_group('Arguments for initialisation')
        group.add_argument('--init', action='store_true',
                           help='Initialise the update process')
        # Negative switch: the attribute is True unless the flag is given,
        # exactly like '--no-index' below.
        group.add_argument('--no-update-functions', dest='update_functions',
                           action='store_false',
                           help="Do not update the trigger function to "
                                "support differential updates (EXPERT)")
        group = parser.add_argument_group('Arguments for updates')
        group.add_argument('--check-for-updates', action='store_true',
                           help='Check if new updates are available and exit')
        group.add_argument('--once', action='store_true',
                           help="Download and apply updates only once. When "
                                "not set, updates are continuously applied")
        group.add_argument('--catch-up', action='store_true',
                           help="Download and apply updates until no new "
                                "data is available on the server")
        group.add_argument('--no-index', action='store_false', dest='do_index',
                           help=("Do not index the new data. Only usable "
                                 "together with --once"))
        group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
                           help='Size of cache to be used by osm2pgsql (in MB)')
        group = parser.add_argument_group('Download parameters')
        group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
                           help='Set timeout for file downloads')


    @staticmethod
    def _init_replication(args):
        """ Initialise replication from the configured REPLICATION_URL and,
            unless '--no-update-functions' was given, recreate the SQL
            functions. Returns 0, which `run()` passes on to its caller.
        """
        from ..tools import replication, refresh

        LOG.warning("Initialising replication updates")
        with connect(args.config.get_libpq_dsn()) as conn:
            replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
            if args.update_functions:
                LOG.warning("Create functions")
                refresh.create_functions(conn, args.config, True, False)
        return 0


    @staticmethod
    def _check_for_updates(args):
        """ Ask the replication service whether new data is available and
            return the result of `replication.check_for_updates()` unchanged.
        """
        from ..tools import replication

        with connect(args.config.get_libpq_dsn()) as conn:
            return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)


    @staticmethod
    def _report_update(batchdate, start_import, start_index):
        """ Log a one-line summary of the update round that just finished.

            All arguments are compared against the current UTC time and
            therefore must be timezone-aware datetimes. `start_index` may be
            None when no indexing was done; the indexing part of the message
            is then left out and the import is counted up to now.
        """
        def round_time(delta):
            # Cut off fractions of a second to keep the message readable.
            return dt.timedelta(seconds=int(delta.total_seconds()))

        end = dt.datetime.now(dt.timezone.utc)
        LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
                    round_time((start_index or end) - start_import),
                    "Indexing: {} ".format(round_time(end - start_index))
                    if start_index else '',
                    round_time(end - start_import),
                    round_time(end - batchdate))


    @staticmethod
    def _compute_update_interval(args):
        """ Return the update interval (REPLICATION_UPDATE_INTERVAL) to hand
            to the replication code.

            Raises UsageError when the Geofabrik download server is used
            with an interval below 86400.
        """
        # Catch-up mode fetches data until the server has nothing new, so no
        # spacing between downloads is requested.
        # NOTE(review): restored from the upstream layout - confirm.
        if args.catch_up:
            return 0

        update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
        # Sanity check to not overwhelm the Geofabrik servers.
        if 'download.geofabrik.de' in args.config.REPLICATION_URL\
           and update_interval < 86400:
            LOG.fatal("Update interval too low for download.geofabrik.de.\n"
                      "Please check install documentation "
                      "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
                      "setting-up-the-update-process).")
            raise UsageError("Invalid replication update interval setting.")

        return update_interval


    @staticmethod
    def _should_index_pending(no_changes, catch_up, update_interval):
        """ Decide whether left-over places should be indexed right now.

            This is only the case when the last round brought no new data
            AND either catch-up mode is active or the update interval
            exceeds 40 minutes. The parentheses matter: without them `and`
            binds tighter than `or` and a long update interval alone would
            trigger indexing after every round, even with '--no-index'.
        """
        return bool(no_changes and (catch_up or update_interval > 40*60))


    @staticmethod
    def _update(args):
        """ Download and apply updates, either once, until the server has no
            more data (catch-up) or in an endless loop.

            Raises UsageError when '--no-index' is combined with continuous
            updates or when the update interval is rejected.
        """
        from ..tools import replication
        from ..indexer.indexer import Indexer
        from ..tokenizer import factory as tokenizer_factory

        update_interval = UpdateReplication._compute_update_interval(args)

        params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
        params.update(base_url=args.config.REPLICATION_URL,
                      update_interval=update_interval,
                      import_file=args.project_dir / 'osmosischange.osc',
                      max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
                      indexed_only=not args.once)

        if not args.once:
            if not args.do_index:
                LOG.fatal("Indexing cannot be disabled when running updates continuously.")
                raise UsageError("Bad argument '--no-index'.")
            # Only needed (and only read) when the loop below keeps running.
            recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')

        tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
        indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)

        while True:
            # A fresh connection per step; none is held open while indexing
            # or sleeping.
            with connect(args.config.get_libpq_dsn()) as conn:
                start = dt.datetime.now(dt.timezone.utc)
                state = replication.update(conn, params)
                if state is not replication.UpdateState.NO_CHANGES:
                    status.log_status(conn, start, 'import')
                batchdate, _, _ = status.get_status(conn)
                # NOTE(review): presumably the connection does not commit on
                # close - confirm against nominatim.db.connection.
                conn.commit()

            no_changes = state is replication.UpdateState.NO_CHANGES

            if not no_changes and args.do_index:
                index_start = dt.datetime.now(dt.timezone.utc)
                indexer.index_full(analyse=False)

                with connect(args.config.get_libpq_dsn()) as conn:
                    status.set_indexed(conn, True)
                    status.log_status(conn, index_start, 'index')
                    conn.commit()
            else:
                # Tells _report_update() that no indexing took place.
                index_start = None

            if UpdateReplication._should_index_pending(no_changes, args.catch_up,
                                                       update_interval):
                while indexer.has_pending():
                    indexer.index_full(analyse=False)

            if LOG.isEnabledFor(logging.WARNING):
                UpdateReplication._report_update(batchdate, start, index_start)

            if args.once or (args.catch_up and no_changes):
                break

            if no_changes:
                LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
                time.sleep(recheck_interval)


    @staticmethod
    def run(args):
        """ Entry point of the sub-command: dispatch to initialisation,
            update check or the update loop depending on the arguments.

            Returns the result of the selected action, 0 after updating.
        """
        # Applies process-wide to all sockets created from here on.
        socket.setdefaulttimeout(args.socket_timeout)

        if args.init:
            return UpdateReplication._init_replication(args)

        if args.check_for_updates:
            return UpdateReplication._check_for_updates(args)

        UpdateReplication._update(args)
        return 0