2 Functions for setting up and importing a new Nominatim database.
9 from pathlib import Path
14 from ..db.connection import connect, get_pg_env
15 from ..db import utils as db_utils
16 from ..db.async_connection import DBConnection
17 from .exec_utils import run_osm2pgsql
18 from ..errors import UsageError
19 from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
21 LOG = logging.getLogger()
23 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
24 """ Create a new database for Nominatim and populate it with the
25 essential extensions and data.
27 LOG.warning('Creating database')
28 create_db(dsn, rouser)
30 LOG.warning('Setting up database')
31 with connect(dsn) as conn:
32 setup_extensions(conn)
34 LOG.warning('Loading basic data')
35 import_base_data(dsn, data_dir, no_partitions)
38 def create_db(dsn, rouser=None):
39 """ Create a new database for the given DSN. Fails when the database
40 already exists or the PostgreSQL version is too old.
41 Uses `createdb` to create the database.
43 If 'rouser' is given, then the function also checks that the user
44 with that given name exists.
46 Requires superuser rights by the caller.
48 proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
50 if proc.returncode != 0:
51 raise UsageError('Creating new database failed.')
53 with connect(dsn) as conn:
54 postgres_version = conn.server_version_tuple()
55 if postgres_version < POSTGRESQL_REQUIRED_VERSION:
56 LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
57 'Found version %d.%d.',
58 POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
59 postgres_version[0], postgres_version[1])
60 raise UsageError('PostgreSQL server is too old.')
62 if rouser is not None:
63 with conn.cursor() as cur:
64 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
67 LOG.fatal("Web user '%s' does not exists. Create it with:\n"
68 "\n createuser %s", rouser, rouser)
69 raise UsageError('Missing read-only user.')
73 def setup_extensions(conn):
74 """ Set up all extensions needed for Nominatim. Also checks that the
75 versions of the extensions are sufficient.
77 with conn.cursor() as cur:
78 cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
79 cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
82 postgis_version = conn.postgis_version_tuple()
83 if postgis_version < POSTGIS_REQUIRED_VERSION:
84 LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
85 'Found version %d.%d.',
86 POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
87 postgis_version[0], postgis_version[1])
88 raise UsageError('PostGIS version is too old.')
91 def install_module(src_dir, project_dir, module_dir, conn=None):
92 """ Copy the normalization module from src_dir into the project
93 directory under the '/module' directory. If 'module_dir' is set, then
94 use the module from there instead and check that it is accessible
97 The function detects when the installation is run from the
98 build directory. It doesn't touch the module in that case.
100 If 'conn' is given, then the function also tests if the module
101 can be access via the given database.
104 module_dir = project_dir / 'module'
106 if not module_dir.exists() or not src_dir.samefile(module_dir):
108 if not module_dir.exists():
111 destfile = module_dir / 'nominatim.so'
112 shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
113 destfile.chmod(0o755)
115 LOG.info('Database module installed at %s', str(destfile))
117 LOG.info('Running from build directory. Leaving database module as is.')
119 LOG.info("Using custom path for database module at '%s'", module_dir)
122 with conn.cursor() as cur:
124 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
125 RETURNS text AS '{}/nominatim.so', 'transliteration'
126 LANGUAGE c IMMUTABLE STRICT;
127 DROP FUNCTION nominatim_test_import_func(text)
128 """.format(module_dir))
129 except psycopg2.DatabaseError as err:
130 LOG.fatal("Error accessing database module: %s", err)
131 raise UsageError("Database module cannot be accessed.") from err
134 def import_base_data(dsn, sql_dir, ignore_partitions=False):
135 """ Create and populate the tables with basic static data that provides
136 the background for geocoding. Data is assumed to not yet exist.
138 db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
139 db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
141 if ignore_partitions:
142 with connect(dsn) as conn:
143 with conn.cursor() as cur:
144 cur.execute('UPDATE country_name SET partition = 0')
148 def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
149 """ Import the given OSM file. 'options' contains the list of
150 default settings for osm2pgsql.
152 options['import_file'] = osm_file
153 options['append'] = False
154 options['threads'] = 1
156 if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
157 # Make some educated guesses about cache size based on the size
158 # of the import file and the available memory.
159 mem = psutil.virtual_memory()
160 fsize = os.stat(str(osm_file)).st_size
161 options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
162 fsize * 2) / 1024 / 1024) + 1
164 run_osm2pgsql(options)
166 with connect(options['dsn']) as conn:
167 if not ignore_errors:
168 with conn.cursor() as cur:
169 cur.execute('SELECT * FROM place LIMIT 1')
170 if cur.rowcount == 0:
171 raise UsageError('No data imported by osm2pgsql.')
174 conn.drop_table('planet_osm_nodes')
177 if options['flatnode_file']:
178 Path(options['flatnode_file']).unlink()
181 def truncate_data_tables(conn, max_word_frequency=None):
182 """ Truncate all data tables to prepare for a fresh load.
184 with conn.cursor() as cur:
185 cur.execute('TRUNCATE word')
186 cur.execute('TRUNCATE placex')
187 cur.execute('TRUNCATE place_addressline')
188 cur.execute('TRUNCATE location_area')
189 cur.execute('TRUNCATE location_area_country')
190 cur.execute('TRUNCATE location_property')
191 cur.execute('TRUNCATE location_property_tiger')
192 cur.execute('TRUNCATE location_property_osmline')
193 cur.execute('TRUNCATE location_postcode')
194 if conn.table_exists('search_name'):
195 cur.execute('TRUNCATE search_name')
196 cur.execute('DROP SEQUENCE IF EXISTS seq_place')
197 cur.execute('CREATE SEQUENCE seq_place start 100000')
199 cur.execute("""SELECT tablename FROM pg_tables
200 WHERE tablename LIKE 'location_road_%'""")
202 for table in [r[0] for r in list(cur)]:
203 cur.execute('TRUNCATE ' + table)
205 if max_word_frequency is not None:
206 # Used by getorcreate_word_id to ignore frequent partial words.
207 cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
208 RETURNS integer AS $$
209 SELECT {} as maxwordfreq;
210 $$ LANGUAGE SQL IMMUTABLE
211 """.format(max_word_frequency))
214 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
216 def load_data(dsn, data_dir, threads):
217 """ Copy data into the word and placex table.
219 # Pre-calculate the most important terms in the word list.
220 db_utils.execute_file(dsn, data_dir / 'words.sql')
222 sel = selectors.DefaultSelector()
223 # Then copy data from place to placex in <threads - 1> chunks.
224 place_threads = max(1, threads - 1)
225 for imod in range(place_threads):
226 conn = DBConnection(dsn)
228 conn.perform("""INSERT INTO placex ({0})
229 SELECT {0} FROM place
230 WHERE osm_id % {1} = {2}
231 AND NOT (class='place' and type='houses')
232 AND ST_IsValid(geometry)
233 """.format(_COPY_COLUMNS, place_threads, imod))
234 sel.register(conn, selectors.EVENT_READ, conn)
236 # Address interpolations go into another table.
237 conn = DBConnection(dsn)
239 conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
240 SELECT osm_id, address, geometry FROM place
241 WHERE class='place' and type='houses' and osm_type='W'
242 and ST_GeometryType(geometry) = 'ST_LineString'
244 sel.register(conn, selectors.EVENT_READ, conn)
246 # Now wait for all of them to finish.
247 todo = place_threads + 1
249 for key, _ in sel.select(1):
255 print('.', end='', flush=True)
258 with connect(dsn) as conn:
259 with conn.cursor() as cur:
260 cur.execute('ANALYSE')