]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/refresh.py
actions: add test for database migration
[nominatim.git] / src / nominatim_db / tools / refresh.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for bringing auxiliary data in the database up-to-date.
9 """
10 from typing import MutableSequence, Tuple, Any, Type, Mapping, Sequence, List, cast
11 import csv
12 import gzip
13 import logging
14 from pathlib import Path
15
16 from psycopg import sql as pysql
17
18 from ..config import Configuration
19 from ..db.connection import Connection, connect, postgis_version_tuple,\
20                             drop_tables
21 from ..db.utils import execute_file
22 from ..db.sql_preprocessor import SQLPreprocessor
23
24 LOG = logging.getLogger()
25
26 OSM_TYPE = {'N': 'node', 'W': 'way', 'R': 'relation'}
27
28 def _add_address_level_rows_from_entry(rows: MutableSequence[Tuple[Any, ...]],
29                                        entry: Mapping[str, Any]) -> None:
30     """ Converts a single entry from the JSON format for address rank
31         descriptions into a flat format suitable for inserting into a
32         PostgreSQL table and adds these lines to `rows`.
33     """
34     countries = entry.get('countries') or (None, )
35     for key, values in entry['tags'].items():
36         for value, ranks in values.items():
37             if isinstance(ranks, list):
38                 rank_search, rank_address = ranks
39             else:
40                 rank_search = rank_address = ranks
41             if not value:
42                 value = None
43             for country in countries:
44                 rows.append((country, key, value, rank_search, rank_address))
45
46
47 def load_address_levels(conn: Connection, table: str, levels: Sequence[Mapping[str, Any]]) -> None:
48     """ Replace the `address_levels` table with the contents of `levels'.
49
50         A new table is created any previously existing table is dropped.
51         The table has the following columns:
52             country, class, type, rank_search, rank_address
53     """
54     rows: List[Tuple[Any, ...]]  = []
55     for entry in levels:
56         _add_address_level_rows_from_entry(rows, entry)
57
58     drop_tables(conn, table)
59
60     with conn.cursor() as cur:
61         cur.execute(pysql.SQL("""CREATE TABLE {} (
62                                         country_code varchar(2),
63                                         class TEXT,
64                                         type TEXT,
65                                         rank_search SMALLINT,
66                                         rank_address SMALLINT)
67                               """).format(pysql.Identifier(table)))
68
69         cur.executemany(pysql.SQL("INSERT INTO {} VALUES (%s, %s, %s, %s, %s)")
70                              .format(pysql.Identifier(table)), rows)
71
72         cur.execute(pysql.SQL('CREATE UNIQUE INDEX ON {} (country_code, class, type)')
73                     .format(pysql.Identifier(table)))
74
75     conn.commit()
76
77
78 def load_address_levels_from_config(conn: Connection, config: Configuration) -> None:
79     """ Replace the `address_levels` table with the content as
80         defined in the given configuration. Uses the parameter
81         NOMINATIM_ADDRESS_LEVEL_CONFIG to determine the location of the
82         configuration file.
83     """
84     cfg = config.load_sub_configuration('', config='ADDRESS_LEVEL_CONFIG')
85     load_address_levels(conn, 'address_levels', cfg)
86
87
88 def create_functions(conn: Connection, config: Configuration,
89                      enable_diff_updates: bool = True,
90                      enable_debug: bool = False) -> None:
91     """ (Re)create the PL/pgSQL functions.
92     """
93     sql = SQLPreprocessor(conn, config)
94
95     sql.run_sql_file(conn, 'functions.sql',
96                      disable_diff_updates=not enable_diff_updates,
97                      debug=enable_debug)
98
99
100 def import_wikipedia_articles(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
101     """ Replaces the wikipedia importance tables with new data.
102         The import is run in a single transaction so that the new data
103         is replace seamlessly.
104
105         Returns 0 if all was well and 1 if the importance file could not
106         be found. Throws an exception if there was an error reading the file.
107     """
108     if import_importance_csv(dsn, data_path / 'wikimedia-importance.csv.gz') == 0 \
109        or import_importance_sql(dsn, data_path / 'wikimedia-importance.sql.gz',
110                                 ignore_errors) == 0:
111         return 0
112
113     return 1
114
115
116 def import_importance_csv(dsn: str, data_file: Path) -> int:
117     """ Replace wikipedia importance table with data from a
118         single CSV file.
119
120         The file must be a gzipped CSV and have the following columns:
121         language, title, importance, wikidata_id
122
123         Other columns may be present but will be ignored.
124     """
125     if not data_file.exists():
126         return 1
127
128     # Only import the first occurrence of a wikidata ID.
129     # This keeps indexes and table small.
130     wd_done = set()
131
132     with connect(dsn) as conn:
133         drop_tables(conn, 'wikipedia_article', 'wikipedia_redirect', 'wikimedia_importance')
134         with conn.cursor() as cur:
135             cur.execute("""CREATE TABLE wikimedia_importance (
136                              language TEXT NOT NULL,
137                              title TEXT NOT NULL,
138                              importance double precision NOT NULL,
139                              wikidata TEXT
140                            ) """)
141
142             copy_cmd = """COPY wikimedia_importance(language, title, importance, wikidata)
143                           FROM STDIN"""
144             with gzip.open(str(data_file), 'rt') as fd, cur.copy(copy_cmd) as copy:
145                 for row in csv.DictReader(fd, delimiter='\t', quotechar='|'):
146                     wd_id = int(row['wikidata_id'][1:])
147                     copy.write_row((row['language'],
148                                     row['title'],
149                                     row['importance'],
150                                     None if wd_id in wd_done else row['wikidata_id']))
151                     wd_done.add(wd_id)
152
153             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_title
154                            ON wikimedia_importance (title)""")
155             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_wikidata
156                            ON wikimedia_importance (wikidata)
157                            WHERE wikidata is not null""")
158
159         conn.commit()
160
161     return 0
162
163
164 def import_importance_sql(dsn: str, data_file: Path, ignore_errors: bool) -> int:
165     """ Replace wikipedia importance table with data from an SQL file.
166     """
167     if not data_file.exists():
168         return 1
169
170     pre_code = """BEGIN;
171                   DROP TABLE IF EXISTS "wikipedia_article";
172                   DROP TABLE IF EXISTS "wikipedia_redirect";
173                   DROP TABLE IF EXISTS "wikipedia_importance";
174                """
175     post_code = "COMMIT"
176     execute_file(dsn, data_file, ignore_errors=ignore_errors,
177                  pre_code=pre_code, post_code=post_code)
178
179     return 0
180
181
182 def import_secondary_importance(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
183     """ Replaces the secondary importance raster data table with new data.
184
185         Returns 0 if all was well and 1 if the raster SQL file could not
186         be found. Throws an exception if there was an error reading the file.
187     """
188     datafile = data_path / 'secondary_importance.sql.gz'
189     if not datafile.exists():
190         return 1
191
192     with connect(dsn) as conn:
193         postgis_version = postgis_version_tuple(conn)
194         if postgis_version[0] < 3:
195             LOG.error('PostGIS version is too old for using OSM raster data.')
196             return 2
197
198     execute_file(dsn, datafile, ignore_errors=ignore_errors)
199
200     return 0
201
202 def recompute_importance(conn: Connection) -> None:
203     """ Recompute wikipedia links and importance for all entries in placex.
204         This is a long-running operations that must not be executed in
205         parallel with updates.
206     """
207     with conn.cursor() as cur:
208         cur.execute('ALTER TABLE placex DISABLE TRIGGER ALL')
209         cur.execute("""
210             UPDATE placex SET (wikipedia, importance) =
211                (SELECT wikipedia, importance
212                 FROM compute_importance(extratags, country_code, rank_search, centroid))
213             """)
214         cur.execute("""
215             UPDATE placex s SET wikipedia = d.wikipedia, importance = d.importance
216              FROM placex d
217              WHERE s.place_id = d.linked_place_id and d.wikipedia is not null
218                    and (s.wikipedia is null or s.importance < d.importance);
219             """)
220
221         cur.execute('ALTER TABLE placex ENABLE TRIGGER ALL')
222     conn.commit()
223
224
225 def _quote_php_variable(var_type: Type[Any], config: Configuration,
226                         conf_name: str) -> str:
227     if var_type == bool:
228         return 'true' if config.get_bool(conf_name) else 'false'
229
230     if var_type == int:
231         return cast(str, getattr(config, conf_name))
232
233     if not getattr(config, conf_name):
234         return 'false'
235
236     if var_type == Path:
237         value = str(config.get_path(conf_name) or '')
238     else:
239         value = getattr(config, conf_name)
240
241     quoted = value.replace("'", "\\'")
242     return f"'{quoted}'"
243
244
245 def invalidate_osm_object(osm_type: str, osm_id: int, conn: Connection,
246                           recursive: bool = True) -> None:
247     """ Mark the given OSM object for reindexing. When 'recursive' is set
248         to True (the default), then all dependent objects are marked for
249         reindexing as well.
250
251         'osm_type' must be on of 'N' (node), 'W' (way) or 'R' (relation).
252         If the given object does not exist, then nothing happens.
253     """
254     assert osm_type in ('N', 'R', 'W')
255
256     LOG.warning("Invalidating OSM %s %s%s.",
257                 OSM_TYPE[osm_type], osm_id,
258                 ' and its dependent places' if recursive else '')
259
260     with conn.cursor() as cur:
261         if recursive:
262             sql = """SELECT place_force_update(place_id)
263                      FROM placex WHERE osm_type = %s and osm_id = %s"""
264         else:
265             sql = """UPDATE placex SET indexed_status = 2
266                      WHERE osm_type = %s and osm_id = %s"""
267
268         cur.execute(sql, (osm_type, osm_id))