1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for setting up and importing a new Nominatim database.
14 from pathlib import Path
17 from psycopg2 import sql as pysql
19 from nominatim.db.connection import connect, get_pg_env
20 from nominatim.db.async_connection import DBConnection
21 from nominatim.db.sql_preprocessor import SQLPreprocessor
22 from nominatim.tools.exec_utils import run_osm2pgsql
23 from nominatim.errors import UsageError
24 from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
# Module-wide logger. getLogger() without a name returns the root logger.
LOG = logging.getLogger()
def _require_version(module, actual, expected):
    """ Compares the version for the given module and raises an exception
        if the actual version is too old.

        Parameters:
          module: human-readable name of the checked component (used in messages)
          actual: installed version as a (major, minor) tuple
          expected: minimum required version as a (major, minor) tuple

        Raises:
          UsageError: when `actual` is older than `expected`.
    """
    # Tuple comparison handles major and minor in one go.
    if actual < expected:
        LOG.fatal('Minimum supported version of %s is %d.%d. '
                  'Found version %d.%d.',
                  module, expected[0], expected[1], actual[0], actual[1])
        raise UsageError(f'{module} is too old.')
def setup_database_skeleton(dsn, rouser=None):
    """ Create a new database for Nominatim and populate it with the
        essential extensions.

        The function fails when the database already exists or Postgresql or
        PostGIS versions are too old.

        Uses `createdb` to create the database.

        If 'rouser' is given, then the function also checks that the user
        with that given name exists.

        Requires superuser rights by the caller.
    """
    # createdb reads the connection parameters from the environment,
    # so hand the DSN over as PG* variables.
    proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)

    if proc.returncode != 0:
        raise UsageError('Creating new database failed.')

    with connect(dsn) as conn:
        _require_version('PostgreSQL server',
                         conn.server_version_tuple(),
                         POSTGRESQL_REQUIRED_VERSION)

        if rouser is not None:
            with conn.cursor() as cur:
                cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
                                 (rouser, ))
                if cnt == 0:
                    LOG.fatal("Web user '%s' does not exist. Create it with:\n"
                              "\n      createuser %s", rouser, rouser)
                    raise UsageError('Missing read-only user.')

        # Create the required extensions before checking the PostGIS version;
        # commit so the extensions are visible to later connections.
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
            cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
        conn.commit()

        _require_version('PostGIS',
                         conn.postgis_version_tuple(),
                         POSTGIS_REQUIRED_VERSION)
def import_osm_data(osm_files, options, drop=False, ignore_errors=False):
    """ Import the given OSM files. 'options' contains the list of
        default settings for osm2pgsql.

        Parameters:
          osm_files: a single file path or a list of file paths to import
          options: dict of osm2pgsql settings; mutated in place
          drop: when True, remove data only needed for updates after import
          ignore_errors: skip the check that osm2pgsql imported any data

        Raises:
          UsageError: when the import produced an empty place table.
    """
    options['import_file'] = osm_files
    options['append'] = False
    options['threads'] = 1

    if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
        # Make some educated guesses about cache size based on the size
        # of the import file and the available memory.
        mem = psutil.virtual_memory()
        fsize = 0
        if isinstance(osm_files, list):
            for fname in osm_files:
                fsize += os.stat(str(fname)).st_size
        else:
            fsize = os.stat(str(osm_files)).st_size
        options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
                                             fsize * 2) / 1024 / 1024) + 1

    run_osm2pgsql(options)

    with connect(options['dsn']) as conn:
        if not ignore_errors:
            with conn.cursor() as cur:
                cur.execute('SELECT * FROM place LIMIT 1')
                if cur.rowcount == 0:
                    raise UsageError('No data imported by osm2pgsql.')

        if drop:
            # The node cache table is only needed for updates.
            conn.drop_table('planet_osm_nodes')

    if drop and options['flatnode_file']:
        Path(options['flatnode_file']).unlink()
def create_tables(conn, config, reverse_only=False):
    """ Create the set of basic tables.

        When `reverse_only` is True, then the main table for searching will
        be skipped and only reverse search is possible.
    """
    preprocessor = SQLPreprocessor(conn, config)
    # Expose the flag to the SQL templates via the preprocessor environment.
    preprocessor.env.globals['db']['reverse_only'] = reverse_only
    preprocessor.run_sql_file(conn, 'tables.sql')
def create_table_triggers(conn, config):
    """ Create the triggers for the tables. The trigger functions must already
        have been imported with refresh.create_functions().
    """
    SQLPreprocessor(conn, config).run_sql_file(conn, 'table-triggers.sql')
def create_partition_tables(conn, config):
    """ Create tables that have explicit partitioning.
    """
    preprocessor = SQLPreprocessor(conn, config)
    preprocessor.run_sql_file(conn, 'partition-tables.src.sql')
def truncate_data_tables(conn):
    """ Truncate all data tables to prepare for a fresh load.
    """
    # Tables that always exist and can be truncated unconditionally.
    fixed_tables = ('placex', 'place_addressline', 'location_area',
                    'location_area_country', 'location_property_tiger',
                    'location_property_osmline', 'location_postcode')

    with conn.cursor() as cur:
        for table in fixed_tables:
            cur.execute('TRUNCATE ' + table)
        # search_name is absent in reverse-only installations.
        if conn.table_exists('search_name'):
            cur.execute('TRUNCATE search_name')
        cur.execute('DROP SEQUENCE IF EXISTS seq_place')
        cur.execute('CREATE SEQUENCE seq_place start 100000')

        # The road tables are partitioned; look up whatever partitions exist.
        cur.execute("""SELECT tablename FROM pg_tables
                       WHERE tablename LIKE 'location_road_%'""")

        for table in [r[0] for r in list(cur)]:
            cur.execute('TRUNCATE ' + table)
# Columns copied verbatim from the place table into placex by load_data(),
# pre-assembled as a safely quoted SQL identifier list.
_COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier,
                                        ('osm_type', 'osm_id', 'class', 'type',
                                         'name', 'admin_level', 'address',
                                         'extratags', 'geometry')))
def load_data(dsn, threads):
    """ Copy data into the word and placex table.

        Parameters:
          dsn: database connection string
          threads: total number of threads to use; one is reserved for
                   the interpolation copy, the rest copy placex chunks

        Runs the copy statements on asynchronous connections and waits
        for all of them to finish, then analyses the database.
    """
    sel = selectors.DefaultSelector()
    # Then copy data from place to placex in <threads - 1> chunks.
    place_threads = max(1, threads - 1)
    for imod in range(place_threads):
        conn = DBConnection(dsn)
        conn.connect()
        # Partition the rows by osm_id modulo so each connection gets
        # a disjoint chunk.
        conn.perform(
            pysql.SQL("""INSERT INTO placex ({columns})
                           SELECT {columns} FROM place
                           WHERE osm_id % {total} = {mod}
                             AND NOT (class='place' and (type='houses' or type='postcode'))
                             AND ST_IsValid(geometry)
                      """).format(columns=_COPY_COLUMNS,
                                  total=pysql.Literal(place_threads),
                                  mod=pysql.Literal(imod)))
        sel.register(conn, selectors.EVENT_READ, conn)

    # Address interpolations go into another table.
    conn = DBConnection(dsn)
    conn.connect()
    conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                      SELECT osm_id, address, geometry FROM place
                      WHERE class='place' and type='houses' and osm_type='W'
                            and ST_GeometryType(geometry) = 'ST_LineString'
                 """)
    sel.register(conn, selectors.EVENT_READ, conn)

    # Now wait for all of them to finish.
    todo = place_threads + 1
    while todo > 0:
        for key, _ in sel.select(1):
            # A connection became readable: its statement has finished.
            conn = key.data
            sel.unregister(conn)
            conn.wait()
            conn.close()
            todo -= 1
        print('.', end='', flush=True)
    print('\n')

    # Refresh planner statistics after the bulk load.
    with connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('ANALYSE')
def create_search_indices(conn, config, drop=False):
    """ Create the indices for search and reverse lookup.

        Invalid indices left behind by a previously failed index creation
        are dropped first, so that running 'indices.sql' recreates them.
        The `drop` flag is forwarded to the SQL template.
    """
    # If index creation failed and left an index invalid, they need to be
    # cleaned out first, so that the script recreates them.
    with conn.cursor() as cur:
        cur.execute("""SELECT relname FROM pg_class, pg_index
                       WHERE pg_index.indisvalid = false
                       AND pg_index.indexrelid = pg_class.oid""")
        bad_indices = [row[0] for row in list(cur)]
        for idx in bad_indices:
            LOG.info("Drop invalid index %s.", idx)
            # relname comes from the system catalog; quote it as an identifier.
            cur.execute('DROP INDEX "{}"'.format(idx))

    sql = SQLPreprocessor(conn, config)

    sql.run_sql_file(conn, 'indices.sql', drop=drop)