]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/clicmd/replication.py
fix linting issues
[nominatim.git] / nominatim / clicmd / replication.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of the 'replication' sub-command.
9 """
10 import datetime as dt
11 import logging
12 import socket
13 import time
14
15 from nominatim.db import status
16 from nominatim.db.connection import connect
17 from nominatim.errors import UsageError
18
19 LOG = logging.getLogger()
20
21 # Do not repeat documentation of subcommand classes.
22 # pylint: disable=C0111
23 # Using non-top-level imports to make pyosmium optional for replication only.
24 # pylint: disable=E0012,C0415
25
26 class UpdateReplication:
27     """\
28     Update the database using an online replication service.
29
30     An OSM replication service is an online service that provides regular
31     updates (OSM diff files) for the planet or update they provide. The OSMF
32     provides the primary replication service for the full planet at
33     https://planet.osm.org/replication/ but there are other providers of
34     extracts of OSM data who provide such a service as well.
35
36     This sub-command allows to set up such a replication service and download
37     and import updates at regular intervals. You need to call '--init' once to
38     set up the process or whenever you change the replication configuration
39     parameters. Without any arguments, the sub-command will go into a loop and
40     continuously apply updates as they become available. Giving `--once` just
41     downloads and imports the next batch of updates.
42     """
43
44     @staticmethod
45     def add_args(parser):
46         group = parser.add_argument_group('Arguments for initialisation')
47         group.add_argument('--init', action='store_true',
48                            help='Initialise the update process')
49         group.add_argument('--no-update-functions', dest='update_functions',
50                            action='store_false',
51                            help="Do not update the trigger function to "
52                                 "support differential updates (EXPERT)")
53         group = parser.add_argument_group('Arguments for updates')
54         group.add_argument('--check-for-updates', action='store_true',
55                            help='Check if new updates are available and exit')
56         group.add_argument('--once', action='store_true',
57                            help="Download and apply updates only once. When "
58                                 "not set, updates are continuously applied")
59         group.add_argument('--catch-up', action='store_true',
60                            help="Download and apply updates until no new "
61                                 "data is available on the server")
62         group.add_argument('--no-index', action='store_false', dest='do_index',
63                            help=("Do not index the new data. Only usable "
64                                  "together with --once"))
65         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
66                            help='Size of cache to be used by osm2pgsql (in MB)')
67         group = parser.add_argument_group('Download parameters')
68         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
69                            help='Set timeout for file downloads')
70
71     @staticmethod
72     def _init_replication(args):
73         from ..tools import replication, refresh
74
75         LOG.warning("Initialising replication updates")
76         with connect(args.config.get_libpq_dsn()) as conn:
77             replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
78             if args.update_functions:
79                 LOG.warning("Create functions")
80                 refresh.create_functions(conn, args.config, True, False)
81         return 0
82
83
84     @staticmethod
85     def _check_for_updates(args):
86         from ..tools import replication
87
88         with connect(args.config.get_libpq_dsn()) as conn:
89             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
90
91     @staticmethod
92     def _report_update(batchdate, start_import, start_index):
93         def round_time(delta):
94             return dt.timedelta(seconds=int(delta.total_seconds()))
95
96         end = dt.datetime.now(dt.timezone.utc)
97         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
98                     round_time((start_index or end) - start_import),
99                     "Indexing: {} ".format(round_time(end - start_index))
100                     if start_index else '',
101                     round_time(end - start_import),
102                     round_time(end - batchdate))
103
104
105     @staticmethod
106     def _compute_update_interval(args):
107         if args.catch_up:
108             return 0
109
110         update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
111         # Sanity check to not overwhelm the Geofabrik servers.
112         if 'download.geofabrik.de' in args.config.REPLICATION_URL\
113            and update_interval < 86400:
114             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
115                       "Please check install documentation "
116                       "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
117                       "setting-up-the-update-process).")
118             raise UsageError("Invalid replication update interval setting.")
119
120         return update_interval
121
122
123     @staticmethod
124     def _update(args):
125         from ..tools import replication
126         from ..indexer.indexer import Indexer
127         from ..tokenizer import factory as tokenizer_factory
128
129         update_interval = UpdateReplication._compute_update_interval(args)
130
131         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
132         params.update(base_url=args.config.REPLICATION_URL,
133                       update_interval=update_interval,
134                       import_file=args.project_dir / 'osmosischange.osc',
135                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
136                       indexed_only=not args.once)
137
138         if not args.once:
139             if not args.do_index:
140                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
141                 raise UsageError("Bad argument '--no-index'.")
142             recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
143
144         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
145         indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
146
147         while True:
148             with connect(args.config.get_libpq_dsn()) as conn:
149                 start = dt.datetime.now(dt.timezone.utc)
150                 state = replication.update(conn, params)
151                 if state is not replication.UpdateState.NO_CHANGES:
152                     status.log_status(conn, start, 'import')
153                 batchdate, _, _ = status.get_status(conn)
154                 conn.commit()
155
156             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
157                 index_start = dt.datetime.now(dt.timezone.utc)
158                 indexer.index_full(analyse=False)
159
160                 with connect(args.config.get_libpq_dsn()) as conn:
161                     status.set_indexed(conn, True)
162                     status.log_status(conn, index_start, 'index')
163                     conn.commit()
164             else:
165                 index_start = None
166
167             if state is replication.UpdateState.NO_CHANGES and \
168                args.catch_up or update_interval > 40*60:
169                 while indexer.has_pending():
170                     indexer.index_full(analyse=False)
171
172             if LOG.isEnabledFor(logging.WARNING):
173                 UpdateReplication._report_update(batchdate, start, index_start)
174
175             if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
176                 break
177
178             if state is replication.UpdateState.NO_CHANGES:
179                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
180                 time.sleep(recheck_interval)
181
182
183     @staticmethod
184     def run(args):
185         socket.setdefaulttimeout(args.socket_timeout)
186
187         if args.init:
188             return UpdateReplication._init_replication(args)
189
190         if args.check_for_updates:
191             return UpdateReplication._check_for_updates(args)
192
193         UpdateReplication._update(args)
194         return 0