2 Functions for setting up and importing a new Nominatim database.
9 from pathlib import Path
14 from ..db.connection import connect, get_pg_env
15 from ..db import utils as db_utils
16 from ..db.async_connection import DBConnection
17 from .exec_utils import run_osm2pgsql
18 from ..errors import UsageError
19 from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
# Logger shared by all functions in this module. getLogger() without a
# name returns the root logger.
LOG = logging.getLogger()
def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
    """ Bootstrap a fresh Nominatim database in three steps: create the
        database itself, install the required extensions and load the
        static base data.

        'dsn' describes the database to create and 'data_dir' is the
        directory with the SQL files for the base data. 'rouser' is handed
        on to create_db() and 'no_partitions' to import_base_data().
    """
    # Step 1: the empty database (also validates the server version).
    LOG.warning('Creating database')
    create_db(dsn, rouser=rouser)

    # Step 2: extensions, using a connection that lives only for this step.
    LOG.warning('Setting up database')
    with connect(dsn) as db_conn:
        setup_extensions(db_conn)

    # Step 3: static country data.
    LOG.warning('Loading basic data')
    import_base_data(dsn, data_dir, ignore_partitions=no_partitions)
def create_db(dsn, rouser=None):
    """ Create the database described by 'dsn' by running the `createdb`
        command line tool. The caller needs superuser rights.

        Raises a UsageError when `createdb` fails (for example because the
        database already exists), when the PostgreSQL server is older than
        POSTGRESQL_REQUIRED_VERSION or, if 'rouser' is given, when no
        database user with that name exists.
    """
    # check=False: the exit code is inspected here so that the failure is
    # reported as a UsageError instead of a CalledProcessError.
    createdb = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
    if createdb.returncode != 0:
        raise UsageError('Creating new database failed.')

    with connect(dsn) as conn:
        required = POSTGRESQL_REQUIRED_VERSION
        found = conn.server_version_tuple()
        if found < required:
            LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
                      'Found version %d.%d.',
                      required[0], required[1],
                      found[0], found[1])
            raise UsageError('PostgreSQL server is too old.')

        # Without a read-only user to verify there is nothing left to do.
        if rouser is None:
            return

        with conn.cursor() as cur:
            user_count = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
                                    (rouser, ))
            if user_count == 0:
                LOG.fatal("Web user '%s' does not exists. Create it with:\n"
                          "\n createuser %s", rouser, rouser)
                raise UsageError('Missing read-only user.')
def setup_extensions(conn):
    """ Make sure that the hstore and postgis extensions exist in the
        database behind 'conn' and verify the installed PostGIS version.

        Raises a UsageError when PostGIS is older than
        POSTGIS_REQUIRED_VERSION.
    """
    with conn.cursor() as cur:
        for statement in ('CREATE EXTENSION IF NOT EXISTS hstore',
                          'CREATE EXTENSION IF NOT EXISTS postgis'):
            cur.execute(statement)
    # Persist the extensions before querying their version.
    conn.commit()

    required = POSTGIS_REQUIRED_VERSION
    found = conn.postgis_version_tuple()
    if found < required:
        LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
                  'Found version %d.%d.',
                  required[0], required[1],
                  found[0], found[1])
        raise UsageError('PostGIS version is too old.')
def install_module(src_dir, project_dir, module_dir, conn=None):
    """ Make the normalization module 'nominatim.so' available.

        When 'module_dir' is set, the module in that directory is used
        as is. Otherwise the module is copied from 'src_dir' into the
        'module' directory below 'project_dir', unless both paths refer
        to the same directory, which is the case when running from the
        build directory.

        If 'conn' is given, the function additionally checks that the
        database can load the module by creating and immediately dropping
        a test function that references it. A UsageError is raised when
        that fails.
    """
    if module_dir:
        LOG.info("Using custom path for database module at '%s'", module_dir)
    else:
        module_dir = project_dir / 'module'

        # samefile() is only evaluated for an existing directory.
        in_build_dir = module_dir.exists() and src_dir.samefile(module_dir)
        if in_build_dir:
            LOG.info('Running from build directory. Leaving database module as is.')
        else:
            if not module_dir.exists():
                module_dir.mkdir()

            target = module_dir / 'nominatim.so'
            shutil.copy(str(src_dir / 'nominatim.so'), str(target))
            target.chmod(0o755)

            LOG.info('Database module installed at %s', str(target))

    if conn is None:
        return

    with conn.cursor() as cur:
        try:
            cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
                           RETURNS text AS '{}/nominatim.so', 'transliteration'
                           LANGUAGE c IMMUTABLE STRICT;
                           DROP FUNCTION nominatim_test_import_func(text)
                        """.format(module_dir))
        except psycopg2.DatabaseError as err:
            LOG.fatal("Error accessing database module: %s", err)
            raise UsageError("Database module cannot be accessed.") from err
def import_base_data(dsn, sql_dir, ignore_partitions=False):
    """ Load the static country tables from the SQL files in 'sql_dir'
        into the database given by 'dsn'. The tables are expected not to
        exist yet.

        With 'ignore_partitions' set, the partition of every row in
        country_name is reset to 0 afterwards.
    """
    for sql_file in ('country_name.sql', 'country_osm_grid.sql.gz'):
        db_utils.execute_file(dsn, sql_dir / sql_file)

    if not ignore_partitions:
        return

    with connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('UPDATE country_name SET partition = 0')
        conn.commit()
def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
    """ Import the given OSM file with osm2pgsql.

        'options' contains the default settings for osm2pgsql. The
        dictionary is modified in place: 'import_file', 'append' and
        'threads' are always overwritten, and 'osm2pgsql_cache' is filled
        in when neither a flatnode file nor a cache size is configured.

        Unless 'ignore_errors' is set, a UsageError is raised when the
        'place' table contains no rows after the import.

        With 'drop' set, the table 'planet_osm_nodes' and the flatnode
        file (if one is configured) are removed after the import.
    """
    options['import_file'] = osm_file
    options['append'] = False
    options['threads'] = 1

    if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
        # Make some educated guesses about cache size based on the size
        # of the import file and the available memory.
        mem = psutil.virtual_memory()
        # psutil reports 'cached' only on Linux and BSD. Count it as 0 on
        # other platforms instead of failing with an AttributeError.
        usable_mem = mem.available + getattr(mem, 'cached', 0)
        fsize = os.stat(str(osm_file)).st_size
        # Cache size in MB: 75% of the usable memory, capped at twice the
        # file size, and never less than 1.
        options['osm2pgsql_cache'] = int(min(usable_mem * 0.75,
                                             fsize * 2) / 1024 / 1024) + 1

    run_osm2pgsql(options)

    with connect(options['dsn']) as conn:
        if not ignore_errors:
            with conn.cursor() as cur:
                cur.execute('SELECT * FROM place LIMIT 1')
                if cur.rowcount == 0:
                    raise UsageError('No data imported by osm2pgsql.')

        if drop:
            conn.drop_table('planet_osm_nodes')

    # The flatnode file is removed only after the connection is released.
    if drop and options['flatnode_file']:
        Path(options['flatnode_file']).unlink()
def truncate_data_tables(conn, max_word_frequency=None):
    """ Empty all data tables so that a fresh load can start and reset
        the seq_place sequence to start at 100000.

        If 'max_word_frequency' is given, the SQL function
        get_maxwordfreq() is (re)created to return that value.
    """
    fixed_tables = ('word', 'placex', 'place_addressline', 'location_area',
                    'location_area_country', 'location_property',
                    'location_property_tiger', 'location_property_osmline',
                    'location_postcode', 'search_name')

    with conn.cursor() as cur:
        for table in fixed_tables:
            cur.execute('TRUNCATE ' + table)

        cur.execute('DROP SEQUENCE IF EXISTS seq_place')
        cur.execute('CREATE SEQUENCE seq_place start 100000')

        cur.execute("""SELECT tablename FROM pg_tables
                       WHERE tablename LIKE 'location_road_%'""")
        # Collect the names first: the cursor is reused for the TRUNCATEs.
        road_tables = [row[0] for row in cur]
        for table in road_tables:
            cur.execute('TRUNCATE ' + table)

        if max_word_frequency is not None:
            # Used by getorcreate_word_id to ignore frequent partial words.
            cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
                           RETURNS integer AS $$
                             SELECT {} as maxwordfreq;
                           $$ LANGUAGE SQL IMMUTABLE
                        """.format(max_word_frequency))
        conn.commit()
# Column list shared by the INSERT and the SELECT when load_data() copies
# rows from the 'place' table into 'placex'.
_COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
def load_data(dsn, data_dir, threads):
    """ Fill the word table from 'words.sql' in 'data_dir' and copy the
        content of the place table into placex and
        location_property_osmline.

        The copy into placex is split over max(1, threads - 1) database
        connections, with one more connection for the address
        interpolations. A dot is printed while waiting for them to
        finish. Finally ANALYSE is run on the database.
    """
    # Pre-calculate the most important terms in the word list.
    db_utils.execute_file(dsn, data_dir / 'words.sql')

    # One statement per connection: place is divided by osm_id modulo the
    # number of placex workers.
    place_threads = max(1, threads - 1)
    statements = ["""INSERT INTO placex ({0})
                      SELECT {0} FROM place
                      WHERE osm_id % {1} = {2}
                        AND NOT (class='place' and type='houses')
                        AND ST_IsValid(geometry)
                  """.format(_COPY_COLUMNS, place_threads, imod)
                  for imod in range(place_threads)]

    # Address interpolations go into another table.
    statements.append("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                          SELECT osm_id, address, geometry FROM place
                          WHERE class='place' and type='houses' and osm_type='W'
                                and ST_GeometryType(geometry) = 'ST_LineString'
                      """)

    # NOTE(review): perform() is assumed to return before the statement
    # has finished, with completion signalled through the selector.
    sel = selectors.DefaultSelector()
    for sql in statements:
        worker = DBConnection(dsn)
        worker.connect()
        worker.perform(sql)
        sel.register(worker, selectors.EVENT_READ, worker)

    # Now wait for all of them to finish.
    pending = len(statements)
    while pending > 0:
        for key, _ in sel.select(1):
            worker = key.data
            sel.unregister(worker)
            worker.wait()
            worker.close()
            pending -= 1
        print('.', end='', flush=True)
    print('\n')

    with connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('ANALYSE')