]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/refresh.py
dc98fe4140b1b552abce67609dda467690a9772d
[nominatim.git] / src / nominatim_db / tools / refresh.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for bringing auxiliary data in the database up-to-date.
9 """
10 from typing import MutableSequence, Tuple, Any, Type, Mapping, Sequence, List, cast
11 import csv
12 import gzip
13 import logging
14 from pathlib import Path
15
16 from psycopg import sql as pysql
17
18 from ..config import Configuration
19 from ..db.connection import Connection, connect, postgis_version_tuple, \
20                             drop_tables
21 from ..db.utils import execute_file
22 from ..db.sql_preprocessor import SQLPreprocessor
23
24 LOG = logging.getLogger()
25
26 OSM_TYPE = {'N': 'node', 'W': 'way', 'R': 'relation'}
27
28
29 def _add_address_level_rows_from_entry(rows: MutableSequence[Tuple[Any, ...]],
30                                        entry: Mapping[str, Any]) -> None:
31     """ Converts a single entry from the JSON format for address rank
32         descriptions into a flat format suitable for inserting into a
33         PostgreSQL table and adds these lines to `rows`.
34     """
35     countries = entry.get('countries') or (None, )
36     for key, values in entry['tags'].items():
37         for value, ranks in values.items():
38             if isinstance(ranks, list):
39                 rank_search, rank_address = ranks
40             else:
41                 rank_search = rank_address = ranks
42             if not value:
43                 value = None
44             for country in countries:
45                 rows.append((country, key, value, rank_search, rank_address))
46
47
48 def load_address_levels(conn: Connection, table: str, levels: Sequence[Mapping[str, Any]]) -> None:
49     """ Replace the `address_levels` table with the contents of `levels'.
50
51         A new table is created any previously existing table is dropped.
52         The table has the following columns:
53             country, class, type, rank_search, rank_address
54     """
55     rows: List[Tuple[Any, ...]] = []
56     for entry in levels:
57         _add_address_level_rows_from_entry(rows, entry)
58
59     drop_tables(conn, table)
60
61     with conn.cursor() as cur:
62         cur.execute(pysql.SQL("""CREATE TABLE {} (
63                                         country_code varchar(2),
64                                         class TEXT,
65                                         type TEXT,
66                                         rank_search SMALLINT,
67                                         rank_address SMALLINT)
68                               """).format(pysql.Identifier(table)))
69
70         cur.executemany(pysql.SQL("INSERT INTO {} VALUES (%s, %s, %s, %s, %s)")
71                              .format(pysql.Identifier(table)), rows)
72
73         cur.execute(pysql.SQL('CREATE UNIQUE INDEX ON {} (country_code, class, type)')
74                     .format(pysql.Identifier(table)))
75
76     conn.commit()
77
78
79 def load_address_levels_from_config(conn: Connection, config: Configuration) -> None:
80     """ Replace the `address_levels` table with the content as
81         defined in the given configuration. Uses the parameter
82         NOMINATIM_ADDRESS_LEVEL_CONFIG to determine the location of the
83         configuration file.
84     """
85     cfg = config.load_sub_configuration('', config='ADDRESS_LEVEL_CONFIG')
86     load_address_levels(conn, 'address_levels', cfg)
87
88
89 def create_functions(conn: Connection, config: Configuration,
90                      enable_diff_updates: bool = True,
91                      enable_debug: bool = False) -> None:
92     """ (Re)create the PL/pgSQL functions.
93     """
94     sql = SQLPreprocessor(conn, config)
95
96     sql.run_sql_file(conn, 'functions.sql',
97                      disable_diff_updates=not enable_diff_updates,
98                      debug=enable_debug)
99
100
101 def import_wikipedia_articles(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
102     """ Replaces the wikipedia importance tables with new data.
103         The import is run in a single transaction so that the new data
104         is replace seamlessly.
105
106         Returns 0 if all was well and 1 if the importance file could not
107         be found. Throws an exception if there was an error reading the file.
108     """
109     if import_importance_csv(dsn, data_path / 'wikimedia-importance.csv.gz') == 0 \
110        or import_importance_sql(dsn, data_path / 'wikimedia-importance.sql.gz',
111                                 ignore_errors) == 0:
112         return 0
113
114     return 1
115
116
117 def import_importance_csv(dsn: str, data_file: Path) -> int:
118     """ Replace wikipedia importance table with data from a
119         single CSV file.
120
121         The file must be a gzipped CSV and have the following columns:
122         language, title, importance, wikidata_id
123
124         Other columns may be present but will be ignored.
125     """
126     if not data_file.exists():
127         return 1
128
129     # Only import the first occurrence of a wikidata ID.
130     # This keeps indexes and table small.
131     wd_done = set()
132
133     with connect(dsn) as conn:
134         drop_tables(conn, 'wikipedia_article', 'wikipedia_redirect', 'wikimedia_importance')
135         with conn.cursor() as cur:
136             cur.execute("""CREATE TABLE wikimedia_importance (
137                              language TEXT NOT NULL,
138                              title TEXT NOT NULL,
139                              importance double precision NOT NULL,
140                              wikidata TEXT
141                            ) """)
142
143             copy_cmd = """COPY wikimedia_importance(language, title, importance, wikidata)
144                           FROM STDIN"""
145             with gzip.open(str(data_file), 'rt') as fd, cur.copy(copy_cmd) as copy:
146                 for row in csv.DictReader(fd, delimiter='\t', quotechar='|'):
147                     wd_id = int(row['wikidata_id'][1:])
148                     copy.write_row((row['language'],
149                                     row['title'],
150                                     row['importance'],
151                                     None if wd_id in wd_done else row['wikidata_id']))
152                     wd_done.add(wd_id)
153
154             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_title
155                            ON wikimedia_importance (title)""")
156             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_wikidata
157                            ON wikimedia_importance (wikidata)
158                            WHERE wikidata is not null""")
159
160         conn.commit()
161
162     return 0
163
164
165 def import_importance_sql(dsn: str, data_file: Path, ignore_errors: bool) -> int:
166     """ Replace wikipedia importance table with data from an SQL file.
167     """
168     if not data_file.exists():
169         return 1
170
171     pre_code = """BEGIN;
172                   DROP TABLE IF EXISTS "wikipedia_article";
173                   DROP TABLE IF EXISTS "wikipedia_redirect";
174                   DROP TABLE IF EXISTS "wikipedia_importance";
175                """
176     post_code = "COMMIT"
177     execute_file(dsn, data_file, ignore_errors=ignore_errors,
178                  pre_code=pre_code, post_code=post_code)
179
180     return 0
181
182
183 def import_secondary_importance(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
184     """ Replaces the secondary importance raster data table with new data.
185
186         Returns 0 if all was well and 1 if the raster SQL file could not
187         be found. Throws an exception if there was an error reading the file.
188     """
189     datafile = data_path / 'secondary_importance.sql.gz'
190     if not datafile.exists():
191         return 1
192
193     with connect(dsn) as conn:
194         postgis_version = postgis_version_tuple(conn)
195         if postgis_version[0] < 3:
196             LOG.error('PostGIS version is too old for using OSM raster data.')
197             return 2
198
199     execute_file(dsn, datafile, ignore_errors=ignore_errors)
200
201     return 0
202
203
204 def recompute_importance(conn: Connection) -> None:
205     """ Recompute wikipedia links and importance for all entries in placex.
206         This is a long-running operations that must not be executed in
207         parallel with updates.
208     """
209     with conn.cursor() as cur:
210         cur.execute('ALTER TABLE placex DISABLE TRIGGER ALL')
211         cur.execute("""
212             UPDATE placex SET (wikipedia, importance) =
213                (SELECT wikipedia, importance
214                 FROM compute_importance(extratags, country_code, rank_search, centroid))
215             """)
216         cur.execute("""
217             UPDATE placex s SET wikipedia = d.wikipedia, importance = d.importance
218              FROM placex d
219              WHERE s.place_id = d.linked_place_id and d.wikipedia is not null
220                    and (s.wikipedia is null or s.importance < d.importance);
221             """)
222
223         cur.execute('ALTER TABLE placex ENABLE TRIGGER ALL')
224     conn.commit()
225
226
227 def _quote_php_variable(var_type: Type[Any], config: Configuration,
228                         conf_name: str) -> str:
229     if var_type == bool:
230         return 'true' if config.get_bool(conf_name) else 'false'
231
232     if var_type == int:
233         return cast(str, getattr(config, conf_name))
234
235     if not getattr(config, conf_name):
236         return 'false'
237
238     if var_type == Path:
239         value = str(config.get_path(conf_name) or '')
240     else:
241         value = getattr(config, conf_name)
242
243     quoted = value.replace("'", "\\'")
244     return f"'{quoted}'"
245
246
247 def invalidate_osm_object(osm_type: str, osm_id: int, conn: Connection,
248                           recursive: bool = True) -> None:
249     """ Mark the given OSM object for reindexing. When 'recursive' is set
250         to True (the default), then all dependent objects are marked for
251         reindexing as well.
252
253         'osm_type' must be on of 'N' (node), 'W' (way) or 'R' (relation).
254         If the given object does not exist, then nothing happens.
255     """
256     assert osm_type in ('N', 'R', 'W')
257
258     LOG.warning("Invalidating OSM %s %s%s.",
259                 OSM_TYPE[osm_type], osm_id,
260                 ' and its dependent places' if recursive else '')
261
262     with conn.cursor() as cur:
263         if recursive:
264             sql = """SELECT place_force_update(place_id)
265                      FROM placex WHERE osm_type = %s and osm_id = %s"""
266         else:
267             sql = """UPDATE placex SET indexed_status = 2
268                      WHERE osm_type = %s and osm_id = %s"""
269
270         cur.execute(sql, (osm_type, osm_id))