]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/database_import.py
8a83ad0cd710db322d8615f5dbd668d1c02e2bf1
[nominatim.git] / nominatim / tools / database_import.py
1 """
2 Functions for setting up and importing a new Nominatim database.
3 """
4 import logging
5 import os
6 import selectors
7 import subprocess
8 from pathlib import Path
9
10 import psutil
11 import psycopg2.extras
12 from psycopg2 import sql as pysql
13
14 from nominatim.db.connection import connect, get_pg_env
15 from nominatim.db.async_connection import DBConnection
16 from nominatim.db.sql_preprocessor import SQLPreprocessor
17 from nominatim.tools.exec_utils import run_osm2pgsql
18 from nominatim.errors import UsageError
19 from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
20
21 LOG = logging.getLogger()
22
23 def _require_version(module, actual, expected):
24     """ Compares the version for the given module and raises an exception
25         if the actual version is too old.
26     """
27     if actual < expected:
28         LOG.fatal('Minimum supported version of %s is %d.%d. '
29                   'Found version %d.%d.',
30                   module, expected[0], expected[1], actual[0], actual[1])
31         raise UsageError(f'{module} is too old.')
32
33
34 def setup_database_skeleton(dsn, rouser=None):
35     """ Create a new database for Nominatim and populate it with the
36         essential extensions.
37
38         The function fails when the database already exists or Postgresql or
39         PostGIS versions are too old.
40
41         Uses `createdb` to create the database.
42
43         If 'rouser' is given, then the function also checks that the user
44         with that given name exists.
45
46         Requires superuser rights by the caller.
47     """
48     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
49
50     if proc.returncode != 0:
51         raise UsageError('Creating new database failed.')
52
53     with connect(dsn) as conn:
54         _require_version('PostgreSQL server',
55                          conn.server_version_tuple(),
56                          POSTGRESQL_REQUIRED_VERSION)
57
58         if rouser is not None:
59             with conn.cursor() as cur:
60                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
61                                  (rouser, ))
62                 if cnt == 0:
63                     LOG.fatal("Web user '%s' does not exists. Create it with:\n"
64                               "\n      createuser %s", rouser, rouser)
65                     raise UsageError('Missing read-only user.')
66
67         # Create extensions.
68         with conn.cursor() as cur:
69             cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
70             cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
71         conn.commit()
72
73         _require_version('PostGIS',
74                          conn.postgis_version_tuple(),
75                          POSTGIS_REQUIRED_VERSION)
76
77
78 def import_osm_data(osm_files, options, drop=False, ignore_errors=False):
79     """ Import the given OSM files. 'options' contains the list of
80         default settings for osm2pgsql.
81     """
82     options['import_file'] = osm_files
83     options['append'] = False
84     options['threads'] = 1
85
86     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
87         # Make some educated guesses about cache size based on the size
88         # of the import file and the available memory.
89         mem = psutil.virtual_memory()
90         fsize = 0
91         if isinstance(osm_files, list):
92             for fname in osm_files:
93                 fsize += os.stat(str(fname)).st_size
94         else:
95             fsize = os.stat(str(osm_files)).st_size
96         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
97                                              fsize * 2) / 1024 / 1024) + 1
98
99     run_osm2pgsql(options)
100
101     with connect(options['dsn']) as conn:
102         if not ignore_errors:
103             with conn.cursor() as cur:
104                 cur.execute('SELECT * FROM place LIMIT 1')
105                 if cur.rowcount == 0:
106                     raise UsageError('No data imported by osm2pgsql.')
107
108         if drop:
109             conn.drop_table('planet_osm_nodes')
110
111     if drop and options['flatnode_file']:
112         Path(options['flatnode_file']).unlink()
113
114
115 def create_tables(conn, config, reverse_only=False):
116     """ Create the set of basic tables.
117         When `reverse_only` is True, then the main table for searching will
118         be skipped and only reverse search is possible.
119     """
120     sql = SQLPreprocessor(conn, config)
121     sql.env.globals['db']['reverse_only'] = reverse_only
122
123     sql.run_sql_file(conn, 'tables.sql')
124
125
126 def create_table_triggers(conn, config):
127     """ Create the triggers for the tables. The trigger functions must already
128         have been imported with refresh.create_functions().
129     """
130     sql = SQLPreprocessor(conn, config)
131     sql.run_sql_file(conn, 'table-triggers.sql')
132
133
134 def create_partition_tables(conn, config):
135     """ Create tables that have explicit partitioning.
136     """
137     sql = SQLPreprocessor(conn, config)
138     sql.run_sql_file(conn, 'partition-tables.src.sql')
139
140
141 def truncate_data_tables(conn):
142     """ Truncate all data tables to prepare for a fresh load.
143     """
144     with conn.cursor() as cur:
145         cur.execute('TRUNCATE placex')
146         cur.execute('TRUNCATE place_addressline')
147         cur.execute('TRUNCATE location_area')
148         cur.execute('TRUNCATE location_area_country')
149         cur.execute('TRUNCATE location_property_tiger')
150         cur.execute('TRUNCATE location_property_osmline')
151         cur.execute('TRUNCATE location_postcode')
152         if conn.table_exists('search_name'):
153             cur.execute('TRUNCATE search_name')
154         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
155         cur.execute('CREATE SEQUENCE seq_place start 100000')
156
157         cur.execute("""SELECT tablename FROM pg_tables
158                        WHERE tablename LIKE 'location_road_%'""")
159
160         for table in [r[0] for r in list(cur)]:
161             cur.execute('TRUNCATE ' + table)
162
163     conn.commit()
164
165
166 _COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier,
167                                         ('osm_type', 'osm_id', 'class', 'type',
168                                          'name', 'admin_level', 'address',
169                                          'extratags', 'geometry')))
170
171
172 def load_data(dsn, threads):
173     """ Copy data into the word and placex table.
174     """
175     sel = selectors.DefaultSelector()
176     # Then copy data from place to placex in <threads - 1> chunks.
177     place_threads = max(1, threads - 1)
178     for imod in range(place_threads):
179         conn = DBConnection(dsn)
180         conn.connect()
181         conn.perform(
182             pysql.SQL("""INSERT INTO placex ({columns})
183                            SELECT {columns} FROM place
184                            WHERE osm_id % {total} = {mod}
185                              AND NOT (class='place' and (type='houses' or type='postcode'))
186                              AND ST_IsValid(geometry)
187                       """).format(columns=_COPY_COLUMNS,
188                                   total=pysql.Literal(place_threads),
189                                   mod=pysql.Literal(imod)))
190         sel.register(conn, selectors.EVENT_READ, conn)
191
192     # Address interpolations go into another table.
193     conn = DBConnection(dsn)
194     conn.connect()
195     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
196                       SELECT osm_id, address, geometry FROM place
197                       WHERE class='place' and type='houses' and osm_type='W'
198                             and ST_GeometryType(geometry) = 'ST_LineString'
199                  """)
200     sel.register(conn, selectors.EVENT_READ, conn)
201
202     # Now wait for all of them to finish.
203     todo = place_threads + 1
204     while todo > 0:
205         for key, _ in sel.select(1):
206             conn = key.data
207             sel.unregister(conn)
208             conn.wait()
209             conn.close()
210             todo -= 1
211         print('.', end='', flush=True)
212     print('\n')
213
214     with connect(dsn) as conn:
215         with conn.cursor() as cur:
216             cur.execute('ANALYSE')
217
218
219 def create_search_indices(conn, config, drop=False):
220     """ Create tables that have explicit partitioning.
221     """
222
223     # If index creation failed and left an index invalid, they need to be
224     # cleaned out first, so that the script recreates them.
225     with conn.cursor() as cur:
226         cur.execute("""SELECT relname FROM pg_class, pg_index
227                        WHERE pg_index.indisvalid = false
228                              AND pg_index.indexrelid = pg_class.oid""")
229         bad_indices = [row[0] for row in list(cur)]
230         for idx in bad_indices:
231             LOG.info("Drop invalid index %s.", idx)
232             cur.execute('DROP INDEX "{}"'.format(idx))
233     conn.commit()
234
235     sql = SQLPreprocessor(conn, config)
236
237     sql.run_sql_file(conn, 'indices.sql', drop=drop)
238
239
240 def create_country_names(conn, tokenizer, languages=None):
241     """ Add default country names to search index. `languages` is a comma-
242         separated list of language codes as used in OSM. If `languages` is not
243         empty then only name translations for the given languages are added
244         to the index.
245     """
246     if languages:
247         languages = languages.split(',')
248
249     def _include_key(key):
250         return key == 'name' or \
251                (key.startswith('name:') and (not languages or key[5:] in languages))
252
253     with conn.cursor() as cur:
254         psycopg2.extras.register_hstore(cur)
255         cur.execute("""SELECT country_code, name FROM country_name
256                        WHERE country_code is not null""")
257
258         with tokenizer.name_analyzer() as analyzer:
259             for code, name in cur:
260                 names = {'countrycode': code}
261                 if code == 'gb':
262                     names['short_name'] = 'UK'
263                 if code == 'us':
264                     names['short_name'] = 'United States'
265
266                 # country names (only in languages as provided)
267                 if name:
268                     names.update(((k, v) for k, v in name.items() if _include_key(k)))
269
270                 analyzer.add_country_names(code, names)
271
272     conn.commit()