2 Implementation of the 'replication' sub-command.
9 from nominatim.db import status
10 from nominatim.db.connection import connect
11 from nominatim.errors import UsageError
13 LOG = logging.getLogger()
15 # Do not repeat documentation of subcommand classes.
16 # pylint: disable=C0111
17 # Using non-top-level imports to make pyosmium optional for replication only.
18 # pylint: disable=E0012,C0415
class UpdateReplication:
    """\
    Update the database using an online replication service.

    An OSM replication service is an online service that provides regular
    updates (OSM diff files) for the planet or extract they provide. The OSMF
    provides the primary replication service for the full planet at
    https://planet.osm.org/replication/ but there are other providers of
    extracts of OSM data who provide such a service as well.

    This sub-command allows you to set up such a replication service and download
    and import updates at regular intervals. You need to call '--init' once to
    set up the process or whenever you change the replication configuration
    parameters. Without any arguments, the sub-command will go into a loop and
    continuously apply updates as they become available. Giving `--once` just
    downloads and imports the next batch of updates.
    """

    @staticmethod
    def add_args(parser):
        """Register the command-line options of this sub-command on `parser`.

        `parser` has to offer argparse's `add_argument_group()` interface.
        """
        group = parser.add_argument_group('Arguments for initialisation')
        group.add_argument('--init', action='store_true',
                           help='Initialise the update process')
        # 'store_false' turns the option into a plain switch: the destination
        # defaults to True and is only cleared when the flag is given.
        group.add_argument('--no-update-functions', dest='update_functions',
                           action='store_false',
                           help=("Do not update the trigger function to "
                                 "support differential updates."))

        group = parser.add_argument_group('Arguments for updates')
        group.add_argument('--check-for-updates', action='store_true',
                           help='Check if new updates are available and exit')
        group.add_argument('--once', action='store_true',
                           help=("Download and apply updates only once. When "
                                 "not set, updates are continuously applied"))
        group.add_argument('--no-index', action='store_false', dest='do_index',
                           help=("Do not index the new data. Only usable "
                                 "together with --once"))
        group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
                           help='Size of cache to be used by osm2pgsql (in MB)')

        group = parser.add_argument_group('Download parameters')
        group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
                           help='Set timeout for file downloads')

    @staticmethod
    def _init_replication(args):
        """Initialise replication for the configured REPLICATION_URL.

        The database functions are re-created afterwards unless
        `args.update_functions` is false. Returns 0 as exit status.
        """
        from ..tools import replication, refresh

        LOG.warning("Initialising replication updates")
        with connect(args.config.get_libpq_dsn()) as conn:
            replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
            if args.update_functions:
                LOG.warning("Create functions")
                refresh.create_functions(conn, args.config, True, False)
        return 0

    @staticmethod
    def _check_for_updates(args):
        """Return the result of `replication.check_for_updates()` for the
        configured REPLICATION_URL.
        """
        from ..tools import replication

        with connect(args.config.get_libpq_dsn()) as conn:
            return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)

    @staticmethod
    def _report_update(batchdate, start_import, start_index):
        """Log a summary of how long the last update round took.

        `start_import` is the time the import started and `start_index` the
        time indexing started, or None when no indexing was done. The
        remaining backlog is the distance between now and `batchdate`.
        All times are compared against a timezone-aware UTC 'now', so they
        need to be timezone-aware as well.
        """
        def round_time(delta):
            # Cut off fractions of a second for readable output.
            return dt.timedelta(seconds=int(delta.total_seconds()))

        end = dt.datetime.now(dt.timezone.utc)
        if start_index:
            indexing_info = "Indexing: {} ".format(round_time(end - start_index))
        else:
            indexing_info = ''

        LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
                    # Without indexing, the import phase lasted until now.
                    round_time((start_index or end) - start_import),
                    indexing_info,
                    round_time(end - start_import),
                    round_time(end - batchdate))

    @staticmethod
    def _update(args):
        """Download and apply updates, either once or continuously.

        With `args.once` exactly one update round is processed. Otherwise
        the function loops indefinitely and sleeps for
        REPLICATION_RECHECK_INTERVAL seconds whenever there were no changes.

        Raises UsageError when the update interval is too low for
        download.geofabrik.de or when indexing is disabled in continuous mode.
        """
        from ..tools import replication
        from ..indexer.indexer import Indexer
        from ..tokenizer import factory as tokenizer_factory

        params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
        params.update(base_url=args.config.REPLICATION_URL,
                      update_interval=args.config.get_int('REPLICATION_UPDATE_INTERVAL'),
                      import_file=args.project_dir / 'osmosischange.osc',
                      max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
                      indexed_only=not args.once)

        # Sanity check to not overwhelm the Geofabrik servers.
        # (86400 is one day, presuming the interval is given in seconds.)
        if ('download.geofabrik.de' in params['base_url']
                and params['update_interval'] < 86400):
            LOG.fatal("Update interval too low for download.geofabrik.de.\n"
                      "Please check install documentation "
                      "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
                      "setting-up-the-update-process).")
            raise UsageError("Invalid replication update interval setting.")

        # The restrictions below only concern continuous mode;
        # '--no-index' remains usable together with '--once'.
        if not args.once:
            if not args.do_index:
                LOG.fatal("Indexing cannot be disabled when running updates continuously.")
                raise UsageError("Bad argument '--no-index'.")
            recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')

        tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)

        while True:
            with connect(args.config.get_libpq_dsn()) as conn:
                start = dt.datetime.now(dt.timezone.utc)
                state = replication.update(conn, params)
                has_changes = state is not replication.UpdateState.NO_CHANGES
                if has_changes:
                    status.log_status(conn, start, 'import')
                batchdate, _, _ = status.get_status(conn)
                # NOTE(review): explicit commit so that the status log is kept
                # when the connection closes - confirm that connect() does
                # not commit on its own.
                conn.commit()

            if has_changes and args.do_index:
                index_start = dt.datetime.now(dt.timezone.utc)
                # NOTE(review): the thread-count argument was cut off in the
                # reviewed text; 'args.threads or 1' needs to be confirmed.
                indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
                                  args.threads or 1)
                indexer.index_boundaries(0, 30)
                indexer.index_by_rank(0, 30)

                with connect(args.config.get_libpq_dsn()) as conn:
                    status.set_indexed(conn, True)
                    status.log_status(conn, index_start, 'index')
                    conn.commit()
            else:
                # Tells _report_update() that there was no indexing phase and
                # avoids reusing the value of a previous round.
                index_start = None

            # Skip collecting the timing information when nobody would see it.
            if LOG.isEnabledFor(logging.WARNING):
                UpdateReplication._report_update(batchdate, start, index_start)

            if args.once:
                break

            if not has_changes:
                LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
                time.sleep(recheck_interval)

    @staticmethod
    def run(args):
        """Execute the sub-command and return its exit status.

        Dispatches to initialisation ('--init'), the update check
        ('--check-for-updates') or the update process itself, in this order
        of precedence.
        """
        # Applies to all sockets created from here on, i.e. to the downloads.
        socket.setdefaulttimeout(args.socket_timeout)

        if args.init:
            return UpdateReplication._init_replication(args)

        if args.check_for_updates:
            return UpdateReplication._check_for_updates(args)

        UpdateReplication._update(args)
        return 0