1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
Exporting a Nominatim database to SQLite.
from typing import Set, Any, Optional, Union
import datetime as dt
import logging
from pathlib import Path

import sqlalchemy as sa

import nominatim_api as napi
from nominatim_api.search.query_analyzer_factory import make_query_analyzer
from nominatim_api.typing import SaSelect, SaRow
from nominatim_api.sql.sqlalchemy_types import Geometry, IntArray
# Logger used for the progress messages of the export. getLogger() is
# called without a name, so this is the root logger.
LOG = logging.getLogger()
async def convert(project_dir: Optional[Union[str, Path]],
                  outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to sqlite. The resulting database
        will be usable against the Python frontend of Nominatim.

        `project_dir` is handed to both API objects. The API object for
        the output gets two configuration overrides: the DSN points to
        `outfile` as an sqlite database and NOMINATIM_DATABASE_RW is
        set to '1'. `options` is passed on to the writer unchanged.
    """
    api = napi.NominatimAPIAsync(project_dir)

    try:
        outapi = napi.NominatimAPIAsync(project_dir,
                                        {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
                                         'NOMINATIM_DATABASE_RW': '1'})

        try:
            async with api.begin() as src, outapi.begin() as dest:
                writer = SqliteWriter(src, dest, options)
                # Constructing the writer does no work by itself, the
                # export only happens when write() is awaited.
                await writer.write()
        finally:
            # Release the output API even when the export fails.
            await outapi.close()
    finally:
        await api.close()
47 """ Worker class which creates a new SQLite database.
def __init__(self, src: napi.SearchConnection,
             dest: napi.SearchConnection, options: Set[str]) -> None:
    """ Remember the connections and the export options.

        `src` is the connection the data is read from, `dest` the
        connection to the database that is being created. `options`
        is a set of feature names; the methods of this class check it
        for the entry 'search'.
    """
    # All other methods access the connections through these attributes.
    self.src = src
    self.dest = dest
    self.options = options
async def write(self) -> None:
    """ Run the complete export: initialise Spatialite in the
        destination, create the tables, copy the data and finally
        build the indexes. The word table is only produced when the
        'search' option is set.
    """
    LOG.warning('Setting up spatialite')
    init_spatialite = sa.select(sa.func.InitSpatialMetaData(True, 'WGS84'))
    await self.dest.execute(init_spatialite)

    await self.create_tables()
    await self.copy_data()

    with_search = 'search' in self.options
    if with_search:
        # The word table is created only after the data copy has finished.
        await self.create_word_table()

    await self.create_indexes()
async def create_tables(self) -> None:
    """ Set up the database tables.

        Removes the search_name table from the destination metadata
        when 'search' is not requested, creates every table that is
        registered in that metadata and then registers each Geometry
        column with Spatialite.
    """
    LOG.warning('Setting up tables')
    if 'search' not in self.options:
        self.dest.t.meta.remove(self.dest.t.search_name)
    else:
        # NOTE(review): the class/type tables are treated as part of the
        # search feature here - confirm that reverse-only exports are
        # not expected to contain them.
        await self.create_class_tables()

    await self.dest.connection.run_sync(self.dest.t.meta.create_all)

    # Convert all Geometry columns to Spatialite geometries
    for table in self.dest.t.meta.sorted_tables:
        for col in table.c:
            if isinstance(col.type, Geometry):
                # 4326 is the SRID, the geometry type name is taken from
                # the column type and handed over in upper case.
                await self.dest.execute(sa.select(
                    sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                  col.type.subtype.upper(), 'XY')))
async def create_class_tables(self) -> None:
    """ Register the tables that serve class/type-specific geometries.

        Asks the source database for all tables whose name starts with
        'place_classtype_' and adds a definition with a place_id and a
        centroid column to the metadata of the source as well as the
        destination connection. Nothing is created in the database here.
    """
    lookup = sa.text("""SELECT tablename FROM pg_tables
                        WHERE tablename LIKE 'place_classtype_%'""")
    found = await self.src.execute(lookup)
    for row in found:
        tablename = row[0]
        for conn in (self.src, self.dest):
            # Each definition gets its own pair of Column objects.
            place_id = sa.Column('place_id', sa.BigInteger)
            centroid = sa.Column('centroid', Geometry)
            sa.Table(tablename, conn.t.meta, place_id, centroid)
async def create_word_table(self) -> None:
    """ Create and fill the word table in the destination database.

        The format of the table depends on the property information.
        This step therefore has to run after all other data has been
        copied.
    """
    # The 'word' table is looked up in the metadata right after the
    # analyzers have been set up - presumably that is what registers it.
    await make_query_analyzer(self.src)
    await make_query_analyzer(self.dest)
    word_src = self.src.t.meta.tables['word']
    word_dest = self.dest.t.meta.tables['word']

    await self.dest.connection.run_sync(word_dest.create)

    LOG.warning("Copying word table")
    stream = await self.src.connection.stream(sa.select(word_src))

    # Transfer the rows in batches of 10000.
    async for chunk in stream.partitions(10000):
        rows = []
        for record in chunk:
            rows.append({field: getattr(record, field) for field in record._fields})
        await self.dest.execute(word_dest.insert(), rows)

    token_index = sa.Index('idx_word_woken', word_dest.c.word_token)
    await self.dest.connection.run_sync(token_index.create)
async def copy_data(self) -> None:
    """ Copy data for all registered tables.

        Every table in the destination metadata is read from the source
        with the statement built by select_from() and written to the
        destination in batches of 10000 rows. Afterwards a pg_tables
        table listing the names of the registered tables is created in
        the destination.
    """
    def _getfield(row: SaRow, key: str) -> Any:
        """ Return the value of column `key`, with timezone-aware
            datetimes converted to UTC. All other values, including
            naive datetimes, are returned unchanged.
        """
        value = getattr(row, key)
        if isinstance(value, dt.datetime) and value.tzinfo is not None:
            value = value.astimezone(dt.timezone.utc)
        return value

    for table in self.dest.t.meta.sorted_tables:
        LOG.warning("Copying '%s'", table.name)
        async_result = await self.src.connection.stream(self.select_from(table.name))

        async for partition in async_result.partitions(10000):
            # A result field called 'class' is written under the
            # key 'class_', all other fields keep their name.
            data = [{('class_' if k == 'class' else k): _getfield(r, k)
                     for k in r._fields}
                    for r in partition]
            await self.dest.execute(table.insert(), data)

    # Set up a minimal copy of pg_tables used to look up the class tables later.
    pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                         sa.Column('schemaname', sa.Text, default='public'),
                         sa.Column('tablename', sa.Text))
    await self.dest.connection.run_sync(pg_tables.create)
    # pg_tables is already part of the metadata at this point, so its
    # own name ends up in the list as well.
    data = [{'tablename': t} for t in self.dest.t.meta.tables]
    await self.dest.execute(pg_tables.insert().values(data))
async def create_indexes(self) -> None:
    """ Add indexes necessary for the frontend.

        Creates the helper table for reverse place node lookup, the
        spatial and plain indexes that are always needed and, when the
        'search' option is set, the additional indexes and the reverse
        search table used for searching.
    """
    # reverse place node lookup needs an extra table to simulate a
    # partial index with adaptive buffering.
    # The buffer around each geometry is computed from rank_search.
    await self.dest.execute(sa.text(
        """ CREATE TABLE placex_place_node_areas AS
              SELECT place_id, ST_Expand(geometry,
                                         14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
              FROM placex
              WHERE rank_address between 5 and 25
                    and osm_type = 'N'
                    and linked_place_id is NULL """))
    await self.dest.execute(sa.select(
        sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                      4326, 'GEOMETRY', 'XY')))
    await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
        'placex_place_node_areas', 'geometry')))

    # Indexes that are required independently of the export options.
    await self.create_spatial_index('country_grid', 'geometry')
    await self.create_spatial_index('placex', 'geometry')
    await self.create_spatial_index('osmline', 'linegeo')
    await self.create_spatial_index('tiger', 'linegeo')
    await self.create_index('placex', 'place_id')
    await self.create_index('placex', 'parent_place_id')
    await self.create_index('placex', 'rank_address')
    await self.create_index('addressline', 'place_id')
    await self.create_index('postcode', 'place_id')
    await self.create_index('osmline', 'place_id')
    await self.create_index('tiger', 'place_id')

    if 'search' in self.options:
        await self.create_spatial_index('postcode', 'geometry')
        await self.create_spatial_index('search_name', 'centroid')
        await self.create_index('search_name', 'place_id')
        await self.create_index('osmline', 'parent_place_id')
        await self.create_index('tiger', 'parent_place_id')
        await self.create_search_index()

        # The class/type tables are registered under their real table
        # name, so the name can be handed to Spatialite directly.
        for t in self.dest.t.meta.tables:
            if t.startswith('place_classtype_'):
                await self.dest.execute(sa.select(
                    sa.func.CreateSpatialIndex(t, 'centroid')))
async def create_spatial_index(self, table: str, column: str) -> None:
    """ Build a Spatialite spatial index over `column`.

        `table` is the attribute name under which the table can be
        found in the table collection of the destination connection.
        The index is created for the name of the table found there.
    """
    real_name = getattr(self.dest.t, table).name
    statement = sa.select(sa.func.CreateSpatialIndex(real_name, column))
    await self.dest.execute(statement)
async def create_index(self, table_name: str, column: str) -> None:
    """ Build a plain index over a single column.

        `table_name` is the attribute name under which the table can
        be found in the table collection of the destination connection,
        `column` the name of the column to index.
    """
    table = getattr(self.dest.t, table_name)
    # The table object itself is interpolated here, so the index name
    # is built from its string form and not from `table_name`.
    index_name = f"idx_{table}_{column}"
    index = sa.Index(index_name, getattr(table.c, column))
    await self.dest.connection.run_sync(index.create)
async def create_search_index(self) -> None:
    """ Create the tables and indexes needed for word lookup.

        Builds the table reverse_search_name in the destination. For
        each of the columns name_vector and nameaddress_vector of the
        source search_name table it gets one row per word with the
        name of the column and the list of place ids that have the
        word in this column. The table is indexed by word at the end.
    """
    LOG.warning("Creating reverse search table")
    rsn = sa.Table('reverse_search_name', self.dest.t.meta,
                   sa.Column('word', sa.Integer()),
                   sa.Column('column', sa.Text()),
                   sa.Column('places', IntArray))
    await self.dest.connection.run_sync(rsn.create)

    tsrc = self.src.t.search_name
    for column in ('name_vector', 'nameaddress_vector'):
        # The aggregate needs the grouping by the unnested word.
        sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
                        sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
                .group_by('word')

        async_result = await self.src.connection.stream(sql)
        async for partition in async_result.partitions(100):
            # NOTE(review): the aggregate has no defined order, so the
            # ids are sorted to make the output reproducible - confirm
            # whether the frontend depends on the order.
            data = [{'word': row.word,
                     'column': column,
                     'places': sorted(row.places)}
                    for row in partition]
            await self.dest.execute(rsn.insert(), data)

    await self.dest.connection.run_sync(
        sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
def select_from(self, table: str) -> SaSelect:
    """ Create the SQL statement to select the source columns and rows.

        `table` is the key of the table in the source metadata. The
        returned statement selects all columns of that table, with
        Geometry columns converted through ST_AsText and labelled with
        the column name. For 'placex' the geometry column is simplified
        first when it is too large.
    """
    columns = self.src.t.meta.tables[table].c

    if table == 'placex':
        # SQLite struggles with Geometries that are larger than 5MB,
        # so those are simplified before the conversion to text.
        return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
                         sa.func.ST_AsText(columns.centroid).label('centroid'),
                         sa.func.ST_AsText(
                             sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                                      columns.geometry),
                                     else_=sa.func.ST_SimplifyPreserveTopology(
                                         columns.geometry, 0.0001)
                                     )).label('geometry'))

    sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
                      if isinstance(c.type, Geometry) else c for c in columns))

    return sql