"""
Functions for setting up and importing a new Nominatim database.
"""
import logging
import os
import selectors
import subprocess
from pathlib import Path

import psutil
import psycopg2.extras
from psycopg2 import sql as pysql

from nominatim.db.connection import connect, get_pg_env
from nominatim.db.async_connection import DBConnection
from nominatim.db.sql_preprocessor import SQLPreprocessor
from nominatim.tools.exec_utils import run_osm2pgsql
from nominatim.errors import UsageError
from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION

LOG = logging.getLogger()

def _require_version(module, actual, expected):
    """ Compares the version for the given module and raises an exception
        if the actual version is too old.
    """
    if actual < expected:
        LOG.fatal('Minimum supported version of %s is %d.%d. '
                  'Found version %d.%d.',
                  module, expected[0], expected[1], actual[0], actual[1])
        raise UsageError(f'{module} is too old.')
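
# For illustration (hypothetical values): actual=(2, 5) and expected=(3, 0)
# takes the error path above, because Python compares tuples element-wise
# and (2, 5) < (3, 0).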


def setup_database_skeleton(dsn, rouser=None):
    """ Create a new database for Nominatim and populate it with the
        essential extensions.

        The function fails when the database already exists or the PostgreSQL
        or PostGIS version is too old.

        Uses `createdb` to create the database.

        If 'rouser' is given, the function also checks that a user of that
        name exists.

        Requires superuser rights by the caller.
    """
    proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)

    if proc.returncode != 0:
        raise UsageError('Creating new database failed.')

    with connect(dsn) as conn:
        _require_version('PostgreSQL server',
                         conn.server_version_tuple(),
                         POSTGRESQL_REQUIRED_VERSION)

        if rouser is not None:
            with conn.cursor() as cur:
                cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
                                 (rouser, ))
                if cnt == 0:
                    LOG.fatal("Web user '%s' does not exist. Create it with:\n"
                              "\n      createuser %s", rouser, rouser)
                    raise UsageError('Missing read-only user.')

        # Create the required extensions.
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
            cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
        conn.commit()

        _require_version('PostGIS',
                         conn.postgis_version_tuple(),
                         POSTGIS_REQUIRED_VERSION)


def import_osm_data(osm_files, options, drop=False, ignore_errors=False):
    """ Import the given OSM files. 'options' contains the default settings
        for osm2pgsql.
    """
    options['import_file'] = osm_files
    options['append'] = False
    options['threads'] = 1

    if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
        # Make some educated guesses about cache size based on the size
        # of the import file and the available memory.
        mem = psutil.virtual_memory()
        fsize = 0
        if isinstance(osm_files, list):
            for fname in osm_files:
                fsize += os.stat(str(fname)).st_size
        else:
            fsize = os.stat(str(osm_files)).st_size
        options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
                                             fsize * 2) / 1024 / 1024) + 1
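        # The cache (in MB) is thus capped at 75% of free memory (available
        # plus cached) or at twice the input size, whichever is smaller.
        # Hypothetical example: a 10 GiB PBF with 40 GiB free yields
        # min(30 GiB, 20 GiB) -> 20480 MB + 1.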

    run_osm2pgsql(options)

    with connect(options['dsn']) as conn:
        if not ignore_errors:
            with conn.cursor() as cur:
                cur.execute('SELECT * FROM place LIMIT 1')
                if cur.rowcount == 0:
                    raise UsageError('No data imported by osm2pgsql.')
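
        # With drop=True the import is one-shot: the osm2pgsql node cache
        # table and the flatnode file are only needed for applying updates
        # later, so they can be removed right away.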
        if drop:
            conn.drop_table('planet_osm_nodes')

    if drop and options['flatnode_file']:
        Path(options['flatnode_file']).unlink()


def create_tables(conn, config, reverse_only=False):
    """ Create the set of basic tables.
        When `reverse_only` is True, then the main table for searching will
        be skipped and only reverse search is possible.
    """
    sql = SQLPreprocessor(conn, config)
    sql.env.globals['db']['reverse_only'] = reverse_only

    sql.run_sql_file(conn, 'tables.sql')


def create_table_triggers(conn, config):
    """ Create the triggers for the tables. The trigger functions must already
        have been imported with refresh.create_functions().
    """
    sql = SQLPreprocessor(conn, config)
    sql.run_sql_file(conn, 'table-triggers.sql')


def create_partition_tables(conn, config):
    """ Create tables that have explicit partitioning.
    """
    sql = SQLPreprocessor(conn, config)
    sql.run_sql_file(conn, 'partition-tables.src.sql')


def truncate_data_tables(conn):
    """ Truncate all data tables to prepare for a fresh load.
    """
    with conn.cursor() as cur:
        cur.execute('TRUNCATE placex')
        cur.execute('TRUNCATE place_addressline')
        cur.execute('TRUNCATE location_area')
        cur.execute('TRUNCATE location_area_country')
        cur.execute('TRUNCATE location_property_tiger')
        cur.execute('TRUNCATE location_property_osmline')
        cur.execute('TRUNCATE location_postcode')
        if conn.table_exists('search_name'):
            cur.execute('TRUNCATE search_name')
        cur.execute('DROP SEQUENCE IF EXISTS seq_place')
        cur.execute('CREATE SEQUENCE seq_place start 100000')

        cur.execute("""SELECT tablename FROM pg_tables
                       WHERE tablename LIKE 'location_road_%'""")

        for table in [r[0] for r in list(cur)]:
            cur.execute('TRUNCATE ' + table)

    conn.commit()
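

# Columns copied verbatim from place into placex. psycopg2's sql.Identifier
# quotes every column name, which matters here because 'class' is a reserved
# word in SQL.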
_COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier,
                                        ('osm_type', 'osm_id', 'class', 'type',
                                         'name', 'admin_level', 'address',
                                         'extratags', 'geometry')))


def load_data(dsn, threads):
    """ Copy data from the place table into the search tables placex and
        location_property_osmline.
    """
    sel = selectors.DefaultSelector()
    # Copy data from place to placex in <threads - 1> chunks.
    place_threads = max(1, threads - 1)
    for imod in range(place_threads):
        conn = DBConnection(dsn)
        conn.connect()
        conn.perform(
            pysql.SQL("""INSERT INTO placex ({columns})
                           SELECT {columns} FROM place
                           WHERE osm_id % {total} = {mod}
                             AND NOT (class='place' and (type='houses' or type='postcode'))
                             AND ST_IsValid(geometry)
                      """).format(columns=_COPY_COLUMNS,
                                  total=pysql.Literal(place_threads),
                                  mod=pysql.Literal(imod)))
        sel.register(conn, selectors.EVENT_READ, conn)
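
    # Each connection above handles the shard of rows where
    # osm_id % place_threads == imod, so the workers partition the place
    # table between them without overlap.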

    # Address interpolations go into another table.
    conn = DBConnection(dsn)
    conn.connect()
    conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                      SELECT osm_id, address, geometry FROM place
                      WHERE class='place' and type='houses' and osm_type='W'
                            and ST_GeometryType(geometry) = 'ST_LineString'
                 """)
    sel.register(conn, selectors.EVENT_READ, conn)

    # Now wait for all of them to finish.
    todo = place_threads + 1
    while todo > 0:
        for key, _ in sel.select(1):
            conn = key.data
            sel.unregister(conn)
            conn.wait()
            conn.close()
            todo -= 1
        print('.', end='', flush=True)
    print('\n')
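
    # Finally, refresh the planner statistics after the bulk load.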
    with connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('ANALYSE')


def create_search_indices(conn, config, drop=False):
    """ Create the indices needed for searching.
    """
    # If index creation failed and left invalid indices behind, they need to
    # be cleaned out first so that the script recreates them.
    with conn.cursor() as cur:
        cur.execute("""SELECT relname FROM pg_class, pg_index
                       WHERE pg_index.indisvalid = false
                         AND pg_index.indexrelid = pg_class.oid""")
        bad_indices = [row[0] for row in list(cur)]
        for idx in bad_indices:
            LOG.info("Drop invalid index %s.", idx)
            cur.execute('DROP INDEX "{}"'.format(idx))
    conn.commit()

    sql = SQLPreprocessor(conn, config)

    sql.run_sql_file(conn, 'indices.sql', drop=drop)


def create_country_names(conn, tokenizer, languages=None):
    """ Add default country names to the search index. `languages` is a
        comma-separated list of language codes as used in OSM. If `languages`
        is not empty, only name translations for the given languages are
        added to the index.
    """
    if languages:
        languages = languages.split(',')

    def _include_key(key):
        return key == 'name' or \
               (key.startswith('name:') and (not languages or key[5:] in languages))
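
    # 'name' is always taken; a translated key 'name:xx' only when xx is one
    # of the requested languages or no language filter was given (key[5:]
    # strips the 'name:' prefix).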

    with conn.cursor() as cur:
        psycopg2.extras.register_hstore(cur)
        cur.execute("""SELECT country_code, name FROM country_name
                       WHERE country_code is not null""")

        with tokenizer.name_analyzer() as analyzer:
            for code, name in cur:
                names = {'countrycode': code}
                if code == 'gb':
                    names['short_name'] = 'UK'
                if code == 'us':
                    names['short_name'] = 'United States'

                # country names (only in languages as provided)
                if name:
                    names.update(((k, v) for k, v in name.items() if _include_key(k)))

                analyzer.add_country_names(code, names)

    conn.commit()
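

# Rough order of a fresh import, pieced together from the docstrings above
# (a sketch, not the authoritative pipeline; the rouser name is hypothetical):
#
#   setup_database_skeleton(dsn, rouser='www-data')
#   import_osm_data(osm_files, options)
#   create_tables(conn, config)
#   create_table_triggers(conn, config)   # needs refresh.create_functions() first
#   create_partition_tables(conn, config)
#   load_data(dsn, threads)
#   create_search_indices(conn, config)
#   create_country_names(conn, tokenizer)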