1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Exporting a Nominatim database to SQLite.
"""
from typing import Set, Any, Optional, Union
import datetime as dt
import logging
from pathlib import Path

import sqlalchemy as sa

import nominatim_api as napi
from nominatim_api.search.query_analyzer_factory import make_query_analyzer
from nominatim_api.typing import SaSelect, SaRow
from nominatim_api.sql.sqlalchemy_types import Geometry, IntArray
# Logger used for the progress messages of the export. getLogger() without
# a name returns the root logger.
LOG = logging.getLogger()
async def convert(project_dir: Optional[Union[str, Path]],
                  outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to sqlite. The resulting database
        will be usable against the Python frontend of Nominatim.

        `project_dir` is handed to both API objects. `outfile` is the
        path of the SQLite file to write to. `options` selects optional
        parts of the export; the writer checks for the 'search' option.
    """
    api = napi.NominatimAPIAsync(project_dir)

    try:
        # The destination uses the same project directory but overrides
        # the DSN so that it points to the output file.
        outapi = napi.NominatimAPIAsync(project_dir,
                                        {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
                                         'NOMINATIM_DATABASE_RW': '1'})

        try:
            async with api.begin() as src, outapi.begin() as dest:
                writer = SqliteWriter(src, dest, options)
                await writer.write()
        finally:
            # Release the destination even when the export failed.
            await outapi.close()
    finally:
        await api.close()
48 """ Worker class which creates a new SQLite database.
def __init__(self, src: napi.SearchConnection,
             dest: napi.SearchConnection, options: Set[str]) -> None:
    """ Remember the connections and the export options.

        `src` is the connection the data is read from, `dest` the
        connection the new database is written through.
    """
    # Both connections are read by all other methods of the writer.
    self.src = src
    self.dest = dest
    self.options = options
async def write(self) -> None:
    """ Build the complete destination database: initialise
        Spatialite, create the tables, copy the data over and add
        the indexes. The word table is only written when the
        'search' option is set.
    """
    LOG.warning('Setting up spatialite')
    init_metadata = sa.select(sa.func.InitSpatialMetaData(True, 'WGS84'))
    await self.dest.execute(init_metadata)

    await self.create_tables()
    await self.copy_data()

    # The word table has to come after the general data copy.
    with_search = 'search' in self.options
    if with_search:
        await self.create_word_table()

    await self.create_indexes()
async def create_tables(self) -> None:
    """ Set up the database tables.

        Creates all tables registered in the destination metadata and
        then registers each Geometry column with Spatialite. Without
        the 'search' option the search_name table is removed from the
        metadata first, so that it is never created.
    """
    LOG.warning('Setting up tables')
    if 'search' not in self.options:
        self.dest.t.meta.remove(self.dest.t.search_name)
    else:
        # NOTE(review): class tables are registered for search-enabled
        # exports only - presumably because only search uses them; confirm.
        await self.create_class_tables()

    await self.dest.connection.run_sync(self.dest.t.meta.create_all)

    # Convert all Geometry columns to Spatialite geometries
    for table in self.dest.t.meta.sorted_tables:
        for col in table.c:
            if isinstance(col.type, Geometry):
                # SRID 4326 with 2D coordinates; the geometry type is
                # taken from the subtype of the column type.
                await self.dest.execute(sa.select(
                    sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                  col.type.subtype.upper(), 'XY')))
async def create_class_tables(self) -> None:
    """ Register a table definition for each class/type-specific
        geometry table found in the source database. The definitions
        are added to the metadata of source and destination alike;
        no table is created here.
    """
    query = sa.text("""SELECT tablename FROM pg_tables
                       WHERE tablename LIKE 'place_classtype_%'""")
    found = await self.src.execute(query)
    for row in found:
        tablename = row[0]
        for conn in (self.src, self.dest):
            # Instantiating the table attaches it to the given metadata.
            sa.Table(tablename, conn.t.meta,
                     sa.Column('place_id', sa.BigInteger),
                     sa.Column('centroid', Geometry))
async def create_word_table(self) -> None:
    """ Create the word table and fill it with the source data.
        The table needs the property information to determine the
        correct format, so this must run after all other data has
        been copied.
    """
    # The 'word' table is looked up in the metadata only after a query
    # analyzer has been set up for each side.
    await make_query_analyzer(self.src)
    await make_query_analyzer(self.dest)
    word_in = self.src.t.meta.tables['word']
    word_out = self.dest.t.meta.tables['word']

    await self.dest.connection.run_sync(word_out.create)

    LOG.warning("Copying word table")
    stream = await self.src.connection.stream(sa.select(word_in))

    async for chunk in stream.partitions(10000):
        rows = [{field: getattr(row, field) for field in row._fields}
                for row in chunk]
        await self.dest.execute(word_out.insert(), rows)

    # NOTE(review): 'woken' looks like a typo for 'token'. The name is
    # kept because it ends up as the index name in the database.
    token_index = sa.Index('idx_word_woken', word_out.c.word_token)
    await self.dest.connection.run_sync(token_index.create)
async def copy_data(self) -> None:
    """ Copy data for all registered tables.

        Rows are streamed from the source in partitions of 10000 and
        inserted into the destination table of the same name. After
        that a minimal pg_tables table is written to the destination.
    """
    def _getfield(row: SaRow, key: str) -> Any:
        """ Return the value of column `key` in `row`. Datetimes
            that carry a timezone are converted to UTC.
        """
        value = getattr(row, key)
        if isinstance(value, dt.datetime) and value.tzinfo is not None:
            value = value.astimezone(dt.timezone.utc)
        return value

    for table in self.dest.t.meta.sorted_tables:
        LOG.warning("Copying '%s'", table.name)
        async_result = await self.src.connection.stream(self.select_from(table.name))

        async for partition in async_result.partitions(10000):
            # The result field 'class' is inserted under the key 'class_'
            # (presumably the key of that column in the table definition).
            data = [{('class_' if k == 'class' else k): _getfield(r, k)
                     for k in r._fields}
                    for r in partition]
            await self.dest.execute(table.insert(), data)

    # Set up a minimal copy of pg_tables used to look up the class tables later.
    pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                         sa.Column('schemaname', sa.Text, default='public'),
                         sa.Column('tablename', sa.Text))
    await self.dest.connection.run_sync(pg_tables.create)
    # Lists every table of the destination metadata, which at this point
    # includes pg_tables itself.
    data = [{'tablename': t} for t in self.dest.t.meta.tables]
    await self.dest.execute(pg_tables.insert().values(data))
async def create_indexes(self) -> None:
    """ Add indexes necessary for the frontend.

        Creates the helper table placex_place_node_areas, the spatial
        and plain indexes on the main tables and, with the 'search'
        option, the additional indexes and the reverse search table.
    """
    # reverse place node lookup needs an extra table to simulate a
    # partial index with adaptive buffering.
    # NOTE(review): the restriction to OSM nodes (osm_type = 'N') is
    # derived from the purpose of the table - confirm against the
    # frontend queries that read placex_place_node_areas.
    await self.dest.execute(sa.text(
        """ CREATE TABLE placex_place_node_areas AS
              SELECT place_id, ST_Expand(geometry,
                                         14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
              FROM placex
              WHERE rank_address between 5 and 25
                    and osm_type = 'N'
                    and linked_place_id is NULL """))
    await self.dest.execute(sa.select(
        sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                      4326, 'GEOMETRY', 'XY')))
    await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
        'placex_place_node_areas', 'geometry')))

    # Indexes that are always created.
    await self.create_spatial_index('country_grid', 'geometry')
    await self.create_spatial_index('placex', 'geometry')
    await self.create_spatial_index('osmline', 'linegeo')
    await self.create_spatial_index('tiger', 'linegeo')
    await self.create_index('placex', 'place_id')
    await self.create_index('placex', 'parent_place_id')
    await self.create_index('placex', 'rank_address')
    await self.create_index('addressline', 'place_id')
    await self.create_index('postcode', 'place_id')
    await self.create_index('osmline', 'place_id')
    await self.create_index('tiger', 'place_id')

    # Indexes that are only created for a search-enabled export.
    if 'search' in self.options:
        await self.create_spatial_index('postcode', 'geometry')
        await self.create_spatial_index('search_name', 'centroid')
        await self.create_index('search_name', 'place_id')
        await self.create_index('osmline', 'parent_place_id')
        await self.create_index('tiger', 'parent_place_id')
        await self.create_search_index()

    # Spatial index for every class table registered in the metadata.
    # Does nothing when no such table is registered.
    for t in self.dest.t.meta.tables:
        if t.startswith('place_classtype_'):
            await self.dest.execute(sa.select(
                sa.func.CreateSpatialIndex(t, 'centroid')))
async def create_spatial_index(self, table: str, column: str) -> None:
    """ Build a spatial index for a geometry column.
        `table` is the attribute name of the table in the table
        collection of the destination, `column` the column to index.
    """
    # The index function gets the table's name as recorded on the table object.
    real_name = getattr(self.dest.t, table).name
    statement = sa.select(sa.func.CreateSpatialIndex(real_name, column))
    await self.dest.execute(statement)
async def create_index(self, table_name: str, column: str) -> None:
    """ Build a plain index over a single column.
        `table_name` is the attribute name of the table in the table
        collection of the destination, `column` the column to index.
    """
    table = getattr(self.dest.t, table_name)
    # The index name uses the string form of the table object,
    # not `table_name`.
    index = sa.Index(f"idx_{table}_{column}", getattr(table.c, column))
    await self.dest.connection.run_sync(index.create)
async def create_search_index(self) -> None:
    """ Create the tables and indexes needed for word lookup.

        Fills the table reverse_search_name with one row per word and
        search_name vector column, listing the places that have the
        word in that column, and adds an index on the word.
    """
    LOG.warning("Creating reverse search table")
    rsn = sa.Table('reverse_search_name', self.dest.t.meta,
                   sa.Column('word', sa.Integer()),
                   sa.Column('column', sa.Text()),
                   sa.Column('places', IntArray))
    await self.dest.connection.run_sync(rsn.create)

    tsrc = self.src.t.search_name
    for column in ('name_vector', 'nameaddress_vector'):
        # Invert the vector column: one result row per word with the
        # ids of all places that have the word in the column.
        sql = (sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
                         sa.func.ArrayAgg(tsrc.c.place_id).label('places'))
               .group_by('word'))

        async_result = await self.src.connection.stream(sql)
        async for partition in async_result.partitions(100):
            # The aggregate makes no promise about the order of the ids,
            # so sort them to get a reproducible result.
            # NOTE(review): confirm whether the frontend relies on
            # sorted place lists.
            data = [{'word': row.word,
                     'column': column,
                     'places': sorted(row.places)}
                    for row in partition]
            await self.dest.execute(rsn.insert(), data)

    await self.dest.connection.run_sync(
        sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
237 def select_from(self, table: str) -> SaSelect:
238 """ Create the SQL statement to select the source columns and rows.
240 columns = self.src.t.meta.tables[table].c
242 if table == 'placex':
243 # SQLite struggles with Geometries that are larger than 5MB,
245 return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
246 sa.func.ST_AsText(columns.centroid).label('centroid'),
248 sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
250 else_=sa.func.ST_SimplifyPreserveTopology(
251 columns.geometry, 0.0001)
252 )).label('geometry'))
254 sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
255 if isinstance(c.type, Geometry) else c for c in columns))