]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/clicmd/replication.py
fix style issue found by flake8
[nominatim.git] / src / nominatim_db / clicmd / replication.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of the 'replication' sub-command.
9 """
10 from typing import Optional
11 import argparse
12 import datetime as dt
13 import logging
14 import socket
15 import time
16 import asyncio
17
18 from ..db import status
19 from ..db.connection import connect
20 from ..errors import UsageError
21 from .args import NominatimArgs
22
23 LOG = logging.getLogger()
24
25
26 class UpdateReplication:
27     """\
28     Update the database using an online replication service.
29
30     An OSM replication service is an online service that provides regular
31     updates (OSM diff files) for the planet or update they provide. The OSMF
32     provides the primary replication service for the full planet at
33     https://planet.osm.org/replication/ but there are other providers of
34     extracts of OSM data who provide such a service as well.
35
36     This sub-command allows to set up such a replication service and download
37     and import updates at regular intervals. You need to call '--init' once to
38     set up the process or whenever you change the replication configuration
39     parameters. Without any arguments, the sub-command will go into a loop and
40     continuously apply updates as they become available. Giving `--once` just
41     downloads and imports the next batch of updates.
42     """
43
44     def add_args(self, parser: argparse.ArgumentParser) -> None:
45         group = parser.add_argument_group('Arguments for initialisation')
46         group.add_argument('--init', action='store_true',
47                            help='Initialise the update process')
48         group.add_argument('--no-update-functions', dest='update_functions',
49                            action='store_false',
50                            help="Do not update the trigger function to "
51                                 "support differential updates (EXPERT)")
52         group = parser.add_argument_group('Arguments for updates')
53         group.add_argument('--check-for-updates', action='store_true',
54                            help='Check if new updates are available and exit')
55         group.add_argument('--once', action='store_true',
56                            help="Download and apply updates only once. When "
57                                 "not set, updates are continuously applied")
58         group.add_argument('--catch-up', action='store_true',
59                            help="Download and apply updates until no new "
60                                 "data is available on the server")
61         group.add_argument('--no-index', action='store_false', dest='do_index',
62                            help=("Do not index the new data. Only usable "
63                                  "together with --once"))
64         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
65                            help='Size of cache to be used by osm2pgsql (in MB)')
66         group = parser.add_argument_group('Download parameters')
67         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
68                            help='Set timeout for file downloads')
69
70     def _init_replication(self, args: NominatimArgs) -> int:
71         from ..tools import replication, refresh
72
73         LOG.warning("Initialising replication updates")
74         with connect(args.config.get_libpq_dsn()) as conn:
75             replication.init_replication(conn, base_url=args.config.REPLICATION_URL,
76                                          socket_timeout=args.socket_timeout)
77             if args.update_functions:
78                 LOG.warning("Create functions")
79                 refresh.create_functions(conn, args.config, True, False)
80         return 0
81
82     def _check_for_updates(self, args: NominatimArgs) -> int:
83         from ..tools import replication
84
85         with connect(args.config.get_libpq_dsn()) as conn:
86             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
87                                                  socket_timeout=args.socket_timeout)
88
89     def _report_update(self, batchdate: dt.datetime,
90                        start_import: dt.datetime,
91                        start_index: Optional[dt.datetime]) -> None:
92         def round_time(delta: dt.timedelta) -> dt.timedelta:
93             return dt.timedelta(seconds=int(delta.total_seconds()))
94
95         end = dt.datetime.now(dt.timezone.utc)
96         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
97                     round_time((start_index or end) - start_import),
98                     f"Indexing: {round_time(end - start_index)} " if start_index else '',
99                     round_time(end - start_import),
100                     round_time(end - batchdate))
101
102     def _compute_update_interval(self, args: NominatimArgs) -> int:
103         if args.catch_up:
104             return 0
105
106         update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
107         # Sanity check to not overwhelm the Geofabrik servers.
108         if 'download.geofabrik.de' in args.config.REPLICATION_URL\
109            and update_interval < 86400:
110             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
111                       "Please check install documentation "
112                       "(https://nominatim.org/release-docs/latest/admin/Update/#"
113                       "setting-up-the-update-process).")
114             raise UsageError("Invalid replication update interval setting.")
115
116         return update_interval
117
118     async def _update(self, args: NominatimArgs) -> None:
119         # pylint: disable=too-many-locals
120         from ..tools import replication
121         from ..indexer.indexer import Indexer
122         from ..tokenizer import factory as tokenizer_factory
123
124         update_interval = self._compute_update_interval(args)
125
126         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
127         params.update(base_url=args.config.REPLICATION_URL,
128                       update_interval=update_interval,
129                       import_file=args.project_dir / 'osmosischange.osc',
130                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
131                       indexed_only=not args.once)
132
133         if not args.once:
134             if not args.do_index:
135                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
136                 raise UsageError("Bad argument '--no-index'.")
137         recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
138
139         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
140         indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
141
142         dsn = args.config.get_libpq_dsn()
143
144         while True:
145             start = dt.datetime.now(dt.timezone.utc)
146             state = replication.update(dsn, params, socket_timeout=args.socket_timeout)
147
148             with connect(dsn) as conn:
149                 if state is not replication.UpdateState.NO_CHANGES:
150                     status.log_status(conn, start, 'import')
151                 batchdate, _, _ = status.get_status(conn)
152                 conn.commit()
153
154             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
155                 index_start = dt.datetime.now(dt.timezone.utc)
156                 await indexer.index_full(analyse=False)
157
158                 with connect(dsn) as conn:
159                     status.set_indexed(conn, True)
160                     status.log_status(conn, index_start, 'index')
161                     conn.commit()
162             else:
163                 index_start = None
164
165             if state is replication.UpdateState.NO_CHANGES and \
166                args.catch_up or update_interval > 40*60:
167                 await indexer.index_full(analyse=False)
168
169             if LOG.isEnabledFor(logging.WARNING):
170                 assert batchdate is not None
171                 self._report_update(batchdate, start, index_start)
172
173             if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
174                 break
175
176             if state is replication.UpdateState.NO_CHANGES:
177                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
178                 time.sleep(recheck_interval)
179
180     def run(self, args: NominatimArgs) -> int:
181         socket.setdefaulttimeout(args.socket_timeout)
182
183         if args.init:
184             return self._init_replication(args)
185
186         if args.check_for_updates:
187             return self._check_for_updates(args)
188
189         asyncio.run(self._update(args))
190         return 0