"""
Functions for setting up and importing a new Nominatim database.
"""
import logging
import os
import selectors
import shutil
import subprocess
from pathlib import Path

import psutil
import psycopg2

from ..db.connection import connect, get_pg_env
from ..db import utils as db_utils
from ..db.async_connection import DBConnection
from ..db.sql_preprocessor import SQLPreprocessor
from ..errors import UsageError
from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
from .exec_utils import run_osm2pgsql
LOG = logging.getLogger()

def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
    """ Create a new database for Nominatim and populate it with the
        essential extensions and data.

        Parameters:
          dsn            -- connection string of the database to create
          data_dir       -- directory containing the static SQL data files
          no_partitions  -- when True, the country data is loaded without
                            search partitioning
          rouser         -- optional name of a read-only database user whose
                            existence is verified
    """
    LOG.warning('Creating database')
    create_db(dsn, rouser)

    LOG.warning('Setting up database')
    with connect(dsn) as conn:
        setup_extensions(conn)

    LOG.warning('Loading basic data')
    import_base_data(dsn, data_dir, no_partitions)
def create_db(dsn, rouser=None):
    """ Create a new database for the given DSN. Fails when the database
        already exists or the PostgreSQL version is too old.
        Uses `createdb` to create the database.

        If 'rouser' is given, then the function also checks that the user
        with that given name exists.

        Requires superuser rights by the caller.

        Raises:
          UsageError -- when database creation fails, the server is too old
                        or the read-only user is missing.
    """
    # createdb picks up the target database from the PG* environment.
    proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)

    if proc.returncode != 0:
        raise UsageError('Creating new database failed.')

    with connect(dsn) as conn:
        postgres_version = conn.server_version_tuple()
        if postgres_version < POSTGRESQL_REQUIRED_VERSION:
            LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
                      'Found version %d.%d.',
                      POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
                      postgres_version[0], postgres_version[1])
            raise UsageError('PostgreSQL server is too old.')

        if rouser is not None:
            with conn.cursor() as cur:
                cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
                                 (rouser, ))
                if cnt == 0:
                    LOG.fatal("Web user '%s' does not exists. Create it with:\n"
                              "\n      createuser %s", rouser, rouser)
                    raise UsageError('Missing read-only user.')
def setup_extensions(conn):
    """ Set up all extensions needed for Nominatim. Also checks that the
        versions of the extensions are sufficient.

        Raises:
          UsageError -- when the installed PostGIS version is too old.
    """
    with conn.cursor() as cur:
        cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
        cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
    conn.commit()

    postgis_version = conn.postgis_version_tuple()
    if postgis_version < POSTGIS_REQUIRED_VERSION:
        LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
                  'Found version %d.%d.',
                  POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
                  postgis_version[0], postgis_version[1])
        raise UsageError('PostGIS version is too old.')
def install_module(src_dir, project_dir, module_dir, conn=None):
    """ Copy the normalization module from src_dir into the project
        directory under the '/module' directory. If 'module_dir' is set, then
        use the module from there instead and check that it is accessible
        for Postgresql.

        The function detects when the installation is run from the
        build directory. It doesn't touch the module in that case.

        If 'conn' is given, then the function also tests if the module
        can be accessed via the given database.
    """
    if not module_dir:
        module_dir = project_dir / 'module'

        if not module_dir.exists() or not src_dir.samefile(module_dir):
            if not module_dir.exists():
                module_dir.mkdir()

            destfile = module_dir / 'nominatim.so'
            shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
            # The module must be world-readable for the postgres server user.
            destfile.chmod(0o755)

            LOG.info('Database module installed at %s', str(destfile))
        else:
            LOG.info('Running from build directory. Leaving database module as is.')
    else:
        LOG.info("Using custom path for database module at '%s'", module_dir)

    if conn is not None:
        with conn.cursor() as cur:
            try:
                # Create and immediately drop a throw-away function to verify
                # that the server can load the shared object.
                cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
                               RETURNS text AS '{}/nominatim.so', 'transliteration'
                               LANGUAGE c IMMUTABLE STRICT;
                               DROP FUNCTION nominatim_test_import_func(text)
                            """.format(module_dir))
            except psycopg2.DatabaseError as err:
                LOG.fatal("Error accessing database module: %s", err)
                raise UsageError("Database module cannot be accessed.") from err
def import_base_data(dsn, sql_dir, ignore_partitions=False):
    """ Create and populate the tables with basic static data that provides
        the background for geocoding. Data is assumed to not yet exist.

        When 'ignore_partitions' is True, all countries are put into
        partition 0, effectively disabling search partitioning.
    """
    db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
    db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')

    if ignore_partitions:
        with connect(dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE country_name SET partition = 0')
            conn.commit()
def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
    """ Import the given OSM file. 'options' contains the list of
        default settings for osm2pgsql.

        When 'drop' is True, intermediate data (node cache table and
        flatnode file) is removed after the import. When 'ignore_errors'
        is True, the check that data actually arrived in the place table
        is skipped.

        Raises:
          UsageError -- when osm2pgsql did not import any data.
    """
    options['import_file'] = osm_file
    options['append'] = False
    options['threads'] = 1

    if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
        # Make some educated guesses about cache size based on the size
        # of the import file and the available memory.
        mem = psutil.virtual_memory()
        fsize = os.stat(str(osm_file)).st_size
        options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
                                             fsize * 2) / 1024 / 1024) + 1

    run_osm2pgsql(options)

    with connect(options['dsn']) as conn:
        if not ignore_errors:
            with conn.cursor() as cur:
                cur.execute('SELECT * FROM place LIMIT 1')
                if cur.rowcount == 0:
                    raise UsageError('No data imported by osm2pgsql.')

        if drop:
            conn.drop_table('planet_osm_nodes')

    if drop:
        if options['flatnode_file']:
            Path(options['flatnode_file']).unlink()
def create_tables(conn, config, sqllib_dir, reverse_only=False):
    """ Create the set of basic tables.
        When `reverse_only` is True, then the main table for searching will
        be skipped and only reverse search is possible.
    """
    sql = SQLPreprocessor(conn, config, sqllib_dir)
    # Expose the flag to the SQL templates so they can skip search tables.
    sql.env.globals['db']['reverse_only'] = reverse_only

    sql.run_sql_file(conn, 'tables.sql')
def create_table_triggers(conn, config, sqllib_dir):
    """ Create the triggers for the tables. The trigger functions must already
        have been imported with refresh.create_functions().
    """
    sql = SQLPreprocessor(conn, config, sqllib_dir)
    sql.run_sql_file(conn, 'table-triggers.sql')
def create_partition_tables(conn, config, sqllib_dir):
    """ Create tables that have explicit partitioning.
    """
    sql = SQLPreprocessor(conn, config, sqllib_dir)
    sql.run_sql_file(conn, 'partition-tables.src.sql')
def truncate_data_tables(conn, max_word_frequency=None):
    """ Truncate all data tables to prepare for a fresh load.

        When 'max_word_frequency' is given, the get_maxwordfreq() SQL
        function is (re)created with that value.
    """
    with conn.cursor() as cur:
        cur.execute('TRUNCATE word')
        cur.execute('TRUNCATE placex')
        cur.execute('TRUNCATE place_addressline')
        cur.execute('TRUNCATE location_area')
        cur.execute('TRUNCATE location_area_country')
        cur.execute('TRUNCATE location_property')
        cur.execute('TRUNCATE location_property_tiger')
        cur.execute('TRUNCATE location_property_osmline')
        cur.execute('TRUNCATE location_postcode')
        if conn.table_exists('search_name'):
            cur.execute('TRUNCATE search_name')
        cur.execute('DROP SEQUENCE IF EXISTS seq_place')
        cur.execute('CREATE SEQUENCE seq_place start 100000')

        cur.execute("""SELECT tablename FROM pg_tables
                       WHERE tablename LIKE 'location_road_%'""")

        for table in [r[0] for r in list(cur)]:
            cur.execute('TRUNCATE ' + table)

        if max_word_frequency is not None:
            # Used by getorcreate_word_id to ignore frequent partial words.
            cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
                           RETURNS integer AS $$
                             SELECT {} as maxwordfreq;
                           $$ LANGUAGE SQL IMMUTABLE
                        """.format(max_word_frequency))
        conn.commit()
_COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'

def load_data(dsn, data_dir, threads):
    """ Copy data into the word and placex table.

        The placex data is copied from the place table in parallel over
        'threads' asynchronous connections; address interpolation lines
        go into location_property_osmline instead.
    """
    # Pre-calculate the most important terms in the word list.
    db_utils.execute_file(dsn, data_dir / 'words.sql')

    sel = selectors.DefaultSelector()
    # Then copy data from place to placex in <threads - 1> chunks.
    place_threads = max(1, threads - 1)
    for imod in range(place_threads):
        conn = DBConnection(dsn)
        conn.connect()
        conn.perform("""INSERT INTO placex ({0})
                         SELECT {0} FROM place
                          WHERE osm_id % {1} = {2}
                            AND NOT (class='place' and type='houses')
                            AND ST_IsValid(geometry)
                     """.format(_COPY_COLUMNS, place_threads, imod))
        sel.register(conn, selectors.EVENT_READ, conn)

    # Address interpolations go into another table.
    conn = DBConnection(dsn)
    conn.connect()
    conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                     SELECT osm_id, address, geometry FROM place
                      WHERE class='place' and type='houses' and osm_type='W'
                            and ST_GeometryType(geometry) = 'ST_LineString'
                 """)
    sel.register(conn, selectors.EVENT_READ, conn)

    # Now wait for all of them to finish.
    todo = place_threads + 1
    while todo > 0:
        for key, _ in sel.select(1):
            conn = key.data
            sel.unregister(conn)
            conn.wait()
            conn.close()
            todo -= 1
        print('.', end='', flush=True)
    print('\n')

    with connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('ANALYSE')
def create_search_indices(conn, config, sqllib_dir, drop=False):
    """ Create the indices used for searching.

        Invalid leftovers from earlier failed runs are dropped first so
        that the SQL script can recreate them.
    """
    # If index creation failed and left an index invalid, they need to be
    # cleaned out first, so that the script recreates them.
    with conn.cursor() as cur:
        cur.execute("""SELECT relname FROM pg_class, pg_index
                       WHERE pg_index.indisvalid = false
                       AND pg_index.indexrelid = pg_class.oid""")
        bad_indices = [row[0] for row in list(cur)]
        for idx in bad_indices:
            LOG.info("Drop invalid index %s.", idx)
            cur.execute('DROP INDEX "{}"'.format(idx))
    conn.commit()

    sql = SQLPreprocessor(conn, config, sqllib_dir)

    sql.run_sql_file(conn, 'indices.sql', drop=drop)