# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Functions for setting up and importing a new Nominatim database.
"""
from typing import Tuple, Optional, Union, Sequence, MutableMapping, Any
import asyncio
import logging
import os
import subprocess
from pathlib import Path

import psutil
import psycopg
from psycopg import sql as pysql

from ..errors import UsageError
from ..config import Configuration
from ..db.connection import connect, get_pg_env, Connection, server_version_tuple,\
                            postgis_version_tuple, drop_tables, table_exists, execute_scalar
from ..db.sql_preprocessor import SQLPreprocessor
from ..db.query_pool import QueryPool
from .exec_utils import run_osm2pgsql
from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION

LOG = logging.getLogger()


def _require_version(module: str, actual: Tuple[int, int], expected: Tuple[int, int]) -> None:
    """ Compare the version of the given module and raise an exception
        if the actual version is too old.
    """
    if actual < expected:
        LOG.fatal('Minimum supported version of %s is %d.%d. '
                  'Found version %d.%d.',
                  module, expected[0], expected[1], actual[0], actual[1])
        raise UsageError(f'{module} is too old.')


def _require_loaded(extension_name: str, conn: Connection) -> None:
    """ Check that the given extension is loaded. """
    with conn.cursor() as cur:
        cur.execute('SELECT * FROM pg_extension WHERE extname = %s', (extension_name, ))
        if cur.rowcount <= 0:
            LOG.fatal('Required module %s is not loaded.', extension_name)
            raise UsageError(f'{extension_name} is not loaded.')


def check_existing_database_plugins(dsn: str) -> None:
    """ Check that the database has the required plugins installed. """
    with connect(dsn) as conn:
        _require_version('PostgreSQL server',
                         server_version_tuple(conn),
                         POSTGRESQL_REQUIRED_VERSION)
        _require_version('PostGIS',
                         postgis_version_tuple(conn),
                         POSTGIS_REQUIRED_VERSION)
        _require_loaded('hstore', conn)
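
# Usage sketch (the DSN value is a hypothetical example; any libpq connection
# string accepted by connect() works the same way):
#
#   check_existing_database_plugins('dbname=nominatim')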


def setup_database_skeleton(dsn: str, rouser: Optional[str] = None) -> None:
    """ Create a new database for Nominatim and populate it with the
        essential extensions.

        The function fails when the database already exists or the PostgreSQL
        or PostGIS versions are too old.

        Uses `createdb` to create the database.

        If 'rouser' is given, then the function also checks that a user
        with that name exists.

        Requires superuser rights by the caller.
    """
    proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)

    if proc.returncode != 0:
        raise UsageError('Creating new database failed.')

    with connect(dsn) as conn:
        _require_version('PostgreSQL server',
                         server_version_tuple(conn),
                         POSTGRESQL_REQUIRED_VERSION)

        if rouser is not None:
            cnt = execute_scalar(conn, 'SELECT count(*) FROM pg_user where usename = %s',
                                 (rouser, ))
            if cnt == 0:
                LOG.fatal("Web user '%s' does not exist. Create it with:\n"
                          "\n      createuser %s", rouser, rouser)
                raise UsageError('Missing read-only user.')

        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
            cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')

            postgis_version = postgis_version_tuple(conn)
            if postgis_version[0] >= 3:
                cur.execute('CREATE EXTENSION IF NOT EXISTS postgis_raster')

        conn.commit()

        _require_version('PostGIS',
                         postgis_version_tuple(conn),
                         POSTGIS_REQUIRED_VERSION)
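
# Usage sketch (hypothetical values; must run with database superuser rights):
#
#   setup_database_skeleton('dbname=nominatim', rouser='www-data')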


def import_osm_data(osm_files: Union[Path, Sequence[Path]],
                    options: MutableMapping[str, Any],
                    drop: bool = False, ignore_errors: bool = False) -> None:
    """ Import the given OSM files. 'options' contains the set of
        default settings for osm2pgsql.
    """
    options['import_file'] = osm_files
    options['append'] = False
    options['threads'] = 1

    if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
        # Make some educated guesses about cache size based on the size
        # of the import file and the available memory.
        mem = psutil.virtual_memory()
        fsize = 0
        if isinstance(osm_files, list):
            for fname in osm_files:
                fsize += os.stat(str(fname)).st_size
        else:
            fsize = os.stat(str(osm_files)).st_size
        options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
                                             fsize * 2) / 1024 / 1024) + 1
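        # Worked example with hypothetical numbers: 16 GiB of available and
        # cached memory and a 4 GiB import file give bounds of
        # 16 GiB * 0.75 = 12 GiB and 4 GiB * 2 = 8 GiB; the smaller bound
        # wins, so the cache is set to 8 * 1024 + 1 = 8193 MB.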

    run_osm2pgsql(options)

    with connect(options['dsn']) as conn:
        if not ignore_errors:
            with conn.cursor() as cur:
                cur.execute('SELECT true FROM place LIMIT 1')
                if cur.rowcount == 0:
                    raise UsageError('No data imported by osm2pgsql.')

        if drop:
            drop_tables(conn, 'planet_osm_nodes')
            conn.commit()

    if drop and options['flatnode_file']:
        Path(options['flatnode_file']).unlink()
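
# Usage sketch. The option keys shown are only the ones this function reads
# itself; in a real import the mapping arrives preassembled with the full set
# of osm2pgsql settings (file name and values are hypothetical):
#
#   import_osm_data(Path('monaco.osm.pbf'),
#                   {'dsn': 'dbname=nominatim', 'flatnode_file': '',
#                    'osm2pgsql_cache': 0, ...},
#                   drop=True)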


def create_tables(conn: Connection, config: Configuration, reverse_only: bool = False) -> None:
    """ Create the set of basic tables.
        When `reverse_only` is True, then the main table for searching will
        be skipped and only reverse search is possible.
    """
    sql = SQLPreprocessor(conn, config)
    sql.env.globals['db']['reverse_only'] = reverse_only

    sql.run_sql_file(conn, 'tables.sql')


def create_table_triggers(conn: Connection, config: Configuration) -> None:
    """ Create the triggers for the tables. The trigger functions must already
        have been imported with refresh.create_functions().
    """
    sql = SQLPreprocessor(conn, config)
    sql.run_sql_file(conn, 'table-triggers.sql')


def create_partition_tables(conn: Connection, config: Configuration) -> None:
    """ Create tables that have explicit partitioning.
    """
    sql = SQLPreprocessor(conn, config)
    sql.run_sql_file(conn, 'partition-tables.src.sql')
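
# A minimal sketch of how the schema helpers above fit together (the exact
# sequence is driven by the import command; shown here only for orientation):
#
#   create_tables(conn, config)
#   # ... install the PL/pgSQL functions, e.g. refresh.create_functions() ...
#   create_table_triggers(conn, config)
#   create_partition_tables(conn, config)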


def truncate_data_tables(conn: Connection) -> None:
    """ Truncate all data tables to prepare for a fresh load.
    """
    with conn.cursor() as cur:
        cur.execute('TRUNCATE placex')
        cur.execute('TRUNCATE place_addressline')
        cur.execute('TRUNCATE location_area')
        cur.execute('TRUNCATE location_area_country')
        cur.execute('TRUNCATE location_property_tiger')
        cur.execute('TRUNCATE location_property_osmline')
        cur.execute('TRUNCATE location_postcode')
        if table_exists(conn, 'search_name'):
            cur.execute('TRUNCATE search_name')
        cur.execute('DROP SEQUENCE IF EXISTS seq_place')
        cur.execute('CREATE SEQUENCE seq_place start 100000')

        cur.execute("""SELECT tablename FROM pg_tables
                       WHERE tablename LIKE 'location_road_%'""")

        for table in [r[0] for r in list(cur)]:
            cur.execute('TRUNCATE ' + table)

    conn.commit()


_COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier,
                                        ('osm_type', 'osm_id', 'class', 'type',
                                         'name', 'admin_level', 'address',
                                         'extratags', 'geometry')))
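
# When composed into a statement, _COPY_COLUMNS renders as the quoted column
# list "osm_type","osm_id","class","type",... pysql.Identifier handles the
# quoting, so column names that double as SQL keywords stay unambiguous.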


async def load_data(dsn: str, threads: int) -> None:
    """ Copy data from the place table into the placex and
        location_property_osmline tables.
    """
    placex_threads = max(1, threads - 1)

    progress = asyncio.create_task(_progress_print())

    async with QueryPool(dsn, placex_threads + 1) as pool:
        # Copy data from place to placex in <placex_threads> chunks.
        for imod in range(placex_threads):
            await pool.put_query(
                pysql.SQL("""INSERT INTO placex ({columns})
                             SELECT {columns} FROM place
                             WHERE osm_id % {total} = {mod}
                               AND NOT (class='place'
                                        and (type='houses' or type='postcode'))
                               AND ST_IsValid(geometry)
                          """).format(columns=_COPY_COLUMNS,
                                      total=pysql.Literal(placex_threads),
                                      mod=pysql.Literal(imod)), None)
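
        # Sharding example with a hypothetical thread count: placex_threads=3
        # issues three INSERTs filtering on osm_id % 3 = 0, 1 and 2, splitting
        # place into disjoint chunks that the pool processes in parallel.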

        # Interpolations need to be copied separately.
        await pool.put_query("""
            INSERT INTO location_property_osmline (osm_id, address, linegeo)
              SELECT osm_id, address, geometry FROM place
              WHERE class='place' and type='houses' and osm_type='W'
                    and ST_GeometryType(geometry) = 'ST_LineString' """, None)

    progress.cancel()

    async with await psycopg.AsyncConnection.connect(dsn) as aconn:
        await aconn.execute('ANALYSE')
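
# Usage sketch (hypothetical DSN and thread count):
#
#   asyncio.run(load_data('dbname=nominatim', threads=4))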


async def _progress_print() -> None:
    """ Print a dot to stdout about once a second until cancelled. """
    while True:
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('', flush=True)
            break
        print('.', end='', flush=True)


async def create_search_indices(conn: Connection, config: Configuration,
                                drop: bool = False, threads: int = 1) -> None:
    """ Create the indices needed for searching.
    """
    # If a previous index creation failed and left invalid indices behind,
    # they need to be dropped first, so that they get recreated.
    with conn.cursor() as cur:
        cur.execute("""SELECT relname FROM pg_class, pg_index
                       WHERE pg_index.indisvalid = false
                         AND pg_index.indexrelid = pg_class.oid""")
        bad_indices = [row[0] for row in list(cur)]
        for idx in bad_indices:
            LOG.info("Drop invalid index %s.", idx)
            cur.execute(pysql.SQL('DROP INDEX {}').format(pysql.Identifier(idx)))
    conn.commit()

    sql = SQLPreprocessor(conn, config)

    await sql.run_parallel_sql_file(config.get_libpq_dsn(),
                                    'indices.sql', min(8, threads), drop=drop)
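
# Usage sketch (hypothetical values; conn and config as used throughout this
# module):
#
#   asyncio.run(create_search_indices(conn, config, drop=False, threads=4))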