]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/database_import.py
document new status fields
[nominatim.git] / nominatim / tools / database_import.py
1 """
2 Functions for setting up and importing a new Nominatim database.
3 """
4 import logging
5 import os
6 import selectors
7 import subprocess
8 import shutil
9 from pathlib import Path
10
11 import psutil
12 import psycopg2
13
14 from ..db.connection import connect, get_pg_env
15 from ..db import utils as db_utils
16 from ..db.async_connection import DBConnection
17 from .exec_utils import run_osm2pgsql
18 from ..errors import UsageError
19 from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
20
21 LOG = logging.getLogger()
22
23 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
24     """ Create a new database for Nominatim and populate it with the
25         essential extensions and data.
26     """
27     LOG.warning('Creating database')
28     create_db(dsn, rouser)
29
30     LOG.warning('Setting up database')
31     with connect(dsn) as conn:
32         setup_extensions(conn)
33
34     LOG.warning('Loading basic data')
35     import_base_data(dsn, data_dir, no_partitions)
36
37
38 def create_db(dsn, rouser=None):
39     """ Create a new database for the given DSN. Fails when the database
40         already exists or the PostgreSQL version is too old.
41         Uses `createdb` to create the database.
42
43         If 'rouser' is given, then the function also checks that the user
44         with that given name exists.
45
46         Requires superuser rights by the caller.
47     """
48     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
49
50     if proc.returncode != 0:
51         raise UsageError('Creating new database failed.')
52
53     with connect(dsn) as conn:
54         postgres_version = conn.server_version_tuple()
55         if postgres_version < POSTGRESQL_REQUIRED_VERSION:
56             LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
57                       'Found version %d.%d.',
58                       POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
59                       postgres_version[0], postgres_version[1])
60             raise UsageError('PostgreSQL server is too old.')
61
62         if rouser is not None:
63             with conn.cursor() as cur:
64                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
65                                  (rouser, ))
66                 if cnt == 0:
67                     LOG.fatal("Web user '%s' does not exists. Create it with:\n"
68                               "\n      createuser %s", rouser, rouser)
69                     raise UsageError('Missing read-only user.')
70
71
72
73 def setup_extensions(conn):
74     """ Set up all extensions needed for Nominatim. Also checks that the
75         versions of the extensions are sufficient.
76     """
77     with conn.cursor() as cur:
78         cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
79         cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
80     conn.commit()
81
82     postgis_version = conn.postgis_version_tuple()
83     if postgis_version < POSTGIS_REQUIRED_VERSION:
84         LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
85                   'Found version %d.%d.',
86                   POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
87                   postgis_version[0], postgis_version[1])
88         raise UsageError('PostGIS version is too old.')
89
90
91 def install_module(src_dir, project_dir, module_dir, conn=None):
92     """ Copy the normalization module from src_dir into the project
93         directory under the '/module' directory. If 'module_dir' is set, then
94         use the module from there instead and check that it is accessible
95         for Postgresql.
96
97         The function detects when the installation is run from the
98         build directory. It doesn't touch the module in that case.
99
100         If 'conn' is given, then the function also tests if the module
101         can be access via the given database.
102     """
103     if not module_dir:
104         module_dir = project_dir / 'module'
105
106         if not module_dir.exists() or not src_dir.samefile(module_dir):
107
108             if not module_dir.exists():
109                 module_dir.mkdir()
110
111             destfile = module_dir / 'nominatim.so'
112             shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
113             destfile.chmod(0o755)
114
115             LOG.info('Database module installed at %s', str(destfile))
116         else:
117             LOG.info('Running from build directory. Leaving database module as is.')
118     else:
119         LOG.info("Using custom path for database module at '%s'", module_dir)
120
121     if conn is not None:
122         with conn.cursor() as cur:
123             try:
124                 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
125                                RETURNS text AS '{}/nominatim.so', 'transliteration'
126                                LANGUAGE c IMMUTABLE STRICT;
127                                DROP FUNCTION nominatim_test_import_func(text)
128                             """.format(module_dir))
129             except psycopg2.DatabaseError as err:
130                 LOG.fatal("Error accessing database module: %s", err)
131                 raise UsageError("Database module cannot be accessed.") from err
132
133
134 def import_base_data(dsn, sql_dir, ignore_partitions=False):
135     """ Create and populate the tables with basic static data that provides
136         the background for geocoding. Data is assumed to not yet exist.
137     """
138     db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
139     db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
140
141     if ignore_partitions:
142         with connect(dsn) as conn:
143             with conn.cursor() as cur:
144                 cur.execute('UPDATE country_name SET partition = 0')
145             conn.commit()
146
147
148 def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
149     """ Import the given OSM file. 'options' contains the list of
150         default settings for osm2pgsql.
151     """
152     options['import_file'] = osm_file
153     options['append'] = False
154     options['threads'] = 1
155
156     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
157         # Make some educated guesses about cache size based on the size
158         # of the import file and the available memory.
159         mem = psutil.virtual_memory()
160         fsize = os.stat(str(osm_file)).st_size
161         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
162                                              fsize * 2) / 1024 / 1024) + 1
163
164     run_osm2pgsql(options)
165
166     with connect(options['dsn']) as conn:
167         if not ignore_errors:
168             with conn.cursor() as cur:
169                 cur.execute('SELECT * FROM place LIMIT 1')
170                 if cur.rowcount == 0:
171                     raise UsageError('No data imported by osm2pgsql.')
172
173         if drop:
174             conn.drop_table('planet_osm_nodes')
175
176     if drop:
177         if options['flatnode_file']:
178             Path(options['flatnode_file']).unlink()
179
180
181 def truncate_data_tables(conn, max_word_frequency=None):
182     """ Truncate all data tables to prepare for a fresh load.
183     """
184     with conn.cursor() as cur:
185         cur.execute('TRUNCATE word')
186         cur.execute('TRUNCATE placex')
187         cur.execute('TRUNCATE place_addressline')
188         cur.execute('TRUNCATE location_area')
189         cur.execute('TRUNCATE location_area_country')
190         cur.execute('TRUNCATE location_property')
191         cur.execute('TRUNCATE location_property_tiger')
192         cur.execute('TRUNCATE location_property_osmline')
193         cur.execute('TRUNCATE location_postcode')
194         if conn.table_exists('search_name'):
195             cur.execute('TRUNCATE search_name')
196         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
197         cur.execute('CREATE SEQUENCE seq_place start 100000')
198
199         cur.execute("""SELECT tablename FROM pg_tables
200                        WHERE tablename LIKE 'location_road_%'""")
201
202         for table in [r[0] for r in list(cur)]:
203             cur.execute('TRUNCATE ' + table)
204
205         if max_word_frequency is not None:
206             # Used by getorcreate_word_id to ignore frequent partial words.
207             cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
208                            RETURNS integer AS $$
209                              SELECT {} as maxwordfreq;
210                            $$ LANGUAGE SQL IMMUTABLE
211                         """.format(max_word_frequency))
212         conn.commit()
213
214 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
215
216 def load_data(dsn, data_dir, threads):
217     """ Copy data into the word and placex table.
218     """
219     # Pre-calculate the most important terms in the word list.
220     db_utils.execute_file(dsn, data_dir / 'words.sql')
221
222     sel = selectors.DefaultSelector()
223     # Then copy data from place to placex in <threads - 1> chunks.
224     place_threads = max(1, threads - 1)
225     for imod in range(place_threads):
226         conn = DBConnection(dsn)
227         conn.connect()
228         conn.perform("""INSERT INTO placex ({0})
229                          SELECT {0} FROM place
230                          WHERE osm_id % {1} = {2}
231                            AND NOT (class='place' and type='houses')
232                            AND ST_IsValid(geometry)
233                      """.format(_COPY_COLUMNS, place_threads, imod))
234         sel.register(conn, selectors.EVENT_READ, conn)
235
236     # Address interpolations go into another table.
237     conn = DBConnection(dsn)
238     conn.connect()
239     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
240                       SELECT osm_id, address, geometry FROM place
241                       WHERE class='place' and type='houses' and osm_type='W'
242                             and ST_GeometryType(geometry) = 'ST_LineString'
243                  """)
244     sel.register(conn, selectors.EVENT_READ, conn)
245
246     # Now wait for all of them to finish.
247     todo = place_threads + 1
248     while todo > 0:
249         for key, _ in sel.select(1):
250             conn = key.data
251             sel.unregister(conn)
252             conn.wait()
253             conn.close()
254             todo -= 1
255         print('.', end='', flush=True)
256     print('\n')
257
258     with connect(dsn) as conn:
259         with conn.cursor() as cur:
260             cur.execute('ANALYSE')