]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/refresh.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / src / nominatim_db / tools / refresh.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for bringing auxiliary data in the database up-to-date.
9 """
10 from typing import MutableSequence, Tuple, Any, Type, Mapping, Sequence, List, cast
11 import csv
12 import gzip
13 import logging
14 from pathlib import Path
15
16 from psycopg import sql as pysql
17
18 from ..config import Configuration
19 from ..db.connection import Connection, connect, drop_tables
20 from ..db.utils import execute_file
21 from ..db.sql_preprocessor import SQLPreprocessor
22
23 LOG = logging.getLogger()
24
25 OSM_TYPE = {'N': 'node', 'W': 'way', 'R': 'relation'}
26
27
28 def _add_address_level_rows_from_entry(rows: MutableSequence[Tuple[Any, ...]],
29                                        entry: Mapping[str, Any]) -> None:
30     """ Converts a single entry from the JSON format for address rank
31         descriptions into a flat format suitable for inserting into a
32         PostgreSQL table and adds these lines to `rows`.
33     """
34     countries = entry.get('countries') or (None, )
35     for key, values in entry['tags'].items():
36         for value, ranks in values.items():
37             if isinstance(ranks, list):
38                 rank_search, rank_address = ranks
39             else:
40                 rank_search = rank_address = ranks
41             if not value:
42                 value = None
43             for country in countries:
44                 rows.append((country, key, value, rank_search, rank_address))
45
46
47 def load_address_levels(conn: Connection, table: str, levels: Sequence[Mapping[str, Any]]) -> None:
48     """ Replace the `address_levels` table with the contents of `levels'.
49
50         A new table is created any previously existing table is dropped.
51         The table has the following columns:
52             country, class, type, rank_search, rank_address
53     """
54     rows: List[Tuple[Any, ...]] = []
55     for entry in levels:
56         _add_address_level_rows_from_entry(rows, entry)
57
58     drop_tables(conn, table)
59
60     with conn.cursor() as cur:
61         cur.execute(pysql.SQL("""CREATE TABLE {} (
62                                         country_code varchar(2),
63                                         class TEXT,
64                                         type TEXT,
65                                         rank_search SMALLINT,
66                                         rank_address SMALLINT)
67                               """).format(pysql.Identifier(table)))
68
69         cur.executemany(pysql.SQL("INSERT INTO {} VALUES (%s, %s, %s, %s, %s)")
70                              .format(pysql.Identifier(table)), rows)
71
72         cur.execute(pysql.SQL('CREATE UNIQUE INDEX ON {} (country_code, class, type)')
73                     .format(pysql.Identifier(table)))
74
75     conn.commit()
76
77
78 def load_address_levels_from_config(conn: Connection, config: Configuration) -> None:
79     """ Replace the `address_levels` table with the content as
80         defined in the given configuration. Uses the parameter
81         NOMINATIM_ADDRESS_LEVEL_CONFIG to determine the location of the
82         configuration file.
83     """
84     cfg = config.load_sub_configuration('', config='ADDRESS_LEVEL_CONFIG')
85     load_address_levels(conn, 'address_levels', cfg)
86
87
88 def create_functions(conn: Connection, config: Configuration,
89                      enable_diff_updates: bool = True,
90                      enable_debug: bool = False) -> None:
91     """ (Re)create the PL/pgSQL functions.
92     """
93     sql = SQLPreprocessor(conn, config)
94
95     sql.run_sql_file(conn, 'functions.sql',
96                      disable_diff_updates=not enable_diff_updates,
97                      debug=enable_debug)
98
99
100 def import_wikipedia_articles(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
101     """ Replaces the wikipedia importance tables with new data.
102         The import is run in a single transaction so that the new data
103         is replace seamlessly.
104
105         Returns 0 if all was well and 1 if the importance file could not
106         be found. Throws an exception if there was an error reading the file.
107     """
108     if import_importance_csv(dsn, data_path / 'wikimedia-importance.csv.gz') == 0 \
109        or import_importance_sql(dsn, data_path / 'wikimedia-importance.sql.gz',
110                                 ignore_errors) == 0:
111         return 0
112
113     return 1
114
115
116 def import_importance_csv(dsn: str, data_file: Path) -> int:
117     """ Replace wikipedia importance table with data from a
118         single CSV file.
119
120         The file must be a gzipped CSV and have the following columns:
121         language, title, importance, wikidata_id
122
123         Other columns may be present but will be ignored.
124     """
125     if not data_file.exists():
126         return 1
127
128     # Only import the first occurrence of a wikidata ID.
129     # This keeps indexes and table small.
130     wd_done = set()
131
132     with connect(dsn) as conn:
133         drop_tables(conn, 'wikipedia_article', 'wikipedia_redirect', 'wikimedia_importance')
134         with conn.cursor() as cur:
135             cur.execute("""CREATE TABLE wikimedia_importance (
136                              language TEXT NOT NULL,
137                              title TEXT NOT NULL,
138                              importance double precision NOT NULL,
139                              wikidata TEXT
140                            ) """)
141
142             copy_cmd = """COPY wikimedia_importance(language, title, importance, wikidata)
143                           FROM STDIN"""
144             with gzip.open(str(data_file), 'rt') as fd, cur.copy(copy_cmd) as copy:
145                 for row in csv.DictReader(fd, delimiter='\t', quotechar='|'):
146                     wd_id = int(row['wikidata_id'][1:])
147                     copy.write_row((row['language'],
148                                     row['title'],
149                                     row['importance'],
150                                     None if wd_id in wd_done else row['wikidata_id']))
151                     wd_done.add(wd_id)
152
153             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_title
154                            ON wikimedia_importance (title)""")
155             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_wikidata
156                            ON wikimedia_importance (wikidata)
157                            WHERE wikidata is not null""")
158
159         conn.commit()
160
161     return 0
162
163
164 def import_importance_sql(dsn: str, data_file: Path, ignore_errors: bool) -> int:
165     """ Replace wikipedia importance table with data from an SQL file.
166     """
167     if not data_file.exists():
168         return 1
169
170     pre_code = """BEGIN;
171                   DROP TABLE IF EXISTS "wikipedia_article";
172                   DROP TABLE IF EXISTS "wikipedia_redirect";
173                   DROP TABLE IF EXISTS "wikipedia_importance";
174                """
175     post_code = "COMMIT"
176     execute_file(dsn, data_file, ignore_errors=ignore_errors,
177                  pre_code=pre_code, post_code=post_code)
178
179     return 0
180
181
182 def import_secondary_importance(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
183     """ Replaces the secondary importance raster data table with new data.
184
185         Returns 0 if all was well and 1 if the raster SQL file could not
186         be found. Throws an exception if there was an error reading the file.
187     """
188     datafile = data_path / 'secondary_importance.sql.gz'
189     if not datafile.exists():
190         return 1
191
192     execute_file(dsn, datafile, ignore_errors=ignore_errors)
193
194     return 0
195
196
197 def recompute_importance(conn: Connection) -> None:
198     """ Recompute wikipedia links and importance for all entries in placex.
199         This is a long-running operations that must not be executed in
200         parallel with updates.
201     """
202     with conn.cursor() as cur:
203         cur.execute('ALTER TABLE placex DISABLE TRIGGER ALL')
204         cur.execute("""
205             UPDATE placex SET (wikipedia, importance) =
206                (SELECT wikipedia, importance
207                 FROM compute_importance(extratags, country_code, rank_search, centroid))
208             """)
209         cur.execute("""
210             UPDATE placex s SET wikipedia = d.wikipedia, importance = d.importance
211              FROM placex d
212              WHERE s.place_id = d.linked_place_id and d.wikipedia is not null
213                    and (s.wikipedia is null or s.importance < d.importance);
214             """)
215
216         cur.execute('ALTER TABLE placex ENABLE TRIGGER ALL')
217     conn.commit()
218
219
220 def _quote_php_variable(var_type: Type[Any], config: Configuration,
221                         conf_name: str) -> str:
222     if var_type == bool:
223         return 'true' if config.get_bool(conf_name) else 'false'
224
225     if var_type == int:
226         return cast(str, getattr(config, conf_name))
227
228     if not getattr(config, conf_name):
229         return 'false'
230
231     if var_type == Path:
232         value = str(config.get_path(conf_name) or '')
233     else:
234         value = getattr(config, conf_name)
235
236     quoted = value.replace("'", "\\'")
237     return f"'{quoted}'"
238
239
240 def invalidate_osm_object(osm_type: str, osm_id: int, conn: Connection,
241                           recursive: bool = True) -> None:
242     """ Mark the given OSM object for reindexing. When 'recursive' is set
243         to True (the default), then all dependent objects are marked for
244         reindexing as well.
245
246         'osm_type' must be on of 'N' (node), 'W' (way) or 'R' (relation).
247         If the given object does not exist, then nothing happens.
248     """
249     assert osm_type in ('N', 'R', 'W')
250
251     LOG.warning("Invalidating OSM %s %s%s.",
252                 OSM_TYPE[osm_type], osm_id,
253                 ' and its dependent places' if recursive else '')
254
255     with conn.cursor() as cur:
256         if recursive:
257             sql = """SELECT place_force_update(place_id)
258                      FROM placex WHERE osm_type = %s and osm_id = %s"""
259         else:
260             sql = """UPDATE placex SET indexed_status = 2
261                      WHERE osm_type = %s and osm_id = %s"""
262
263         cur.execute(sql, (osm_type, osm_id))