1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Exporting a Nominatim database to SQlite.
10 from typing import Set, Any
13 from pathlib import Path
15 import sqlalchemy as sa
17 from nominatim.typing import SaSelect, SaRow
18 from nominatim.db.sqlalchemy_types import Geometry, IntArray
19 from nominatim.api.search.query_analyzer_factory import make_query_analyzer
20 import nominatim.api as napi
22 LOG = logging.getLogger()
24 async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
25 """ Export an existing database to sqlite. The resulting database
26 will be usable against the Python frontend of Nominatim.
28 api = napi.NominatimAPIAsync(project_dir)
31 outapi = napi.NominatimAPIAsync(project_dir,
32 {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
33 'NOMINATIM_DATABASE_RW': '1'})
36 async with api.begin() as src, outapi.begin() as dest:
37 writer = SqliteWriter(src, dest, options)
46 """ Worker class which creates a new SQLite database.
49 def __init__(self, src: napi.SearchConnection,
50 dest: napi.SearchConnection, options: Set[str]) -> None:
53 self.options = options
56 async def write(self) -> None:
57 """ Create the database structure and copy the data from
58 the source database to the destination.
60 LOG.warning('Setting up spatialite')
61 await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
63 await self.create_tables()
64 await self.copy_data()
65 if 'search' in self.options:
66 await self.create_word_table()
67 await self.create_indexes()
70 async def create_tables(self) -> None:
71 """ Set up the database tables.
73 LOG.warning('Setting up tables')
74 if 'search' not in self.options:
75 self.dest.t.meta.remove(self.dest.t.search_name)
77 await self.create_class_tables()
79 await self.dest.connection.run_sync(self.dest.t.meta.create_all)
81 # Convert all Geometry columns to Spatialite geometries
82 for table in self.dest.t.meta.sorted_tables:
84 if isinstance(col.type, Geometry):
85 await self.dest.execute(sa.select(
86 sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
87 col.type.subtype.upper(), 'XY')))
90 async def create_class_tables(self) -> None:
91 """ Set up the table that serve class/type-specific geometries.
93 sql = sa.text("""SELECT tablename FROM pg_tables
94 WHERE tablename LIKE 'place_classtype_%'""")
95 for res in await self.src.execute(sql):
96 for db in (self.src, self.dest):
97 sa.Table(res[0], db.t.meta,
98 sa.Column('place_id', sa.BigInteger),
99 sa.Column('centroid', Geometry))
102 async def create_word_table(self) -> None:
103 """ Create the word table.
104 This table needs the property information to determine the
105 correct format. Therefore needs to be done after all other
106 data has been copied.
108 await make_query_analyzer(self.src)
109 await make_query_analyzer(self.dest)
110 src = self.src.t.meta.tables['word']
111 dest = self.dest.t.meta.tables['word']
113 await self.dest.connection.run_sync(dest.create)
115 LOG.warning("Copying word table")
116 async_result = await self.src.connection.stream(sa.select(src))
118 async for partition in async_result.partitions(10000):
119 data = [{k: getattr(r, k) for k in r._fields} for r in partition]
120 await self.dest.execute(dest.insert(), data)
122 await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
125 async def copy_data(self) -> None:
126 """ Copy data for all registered tables.
128 def _getfield(row: SaRow, key: str) -> Any:
129 value = getattr(row, key)
130 if isinstance(value, dt.datetime):
131 if value.tzinfo is not None:
132 value = value.astimezone(dt.timezone.utc)
135 for table in self.dest.t.meta.sorted_tables:
136 LOG.warning("Copying '%s'", table.name)
137 async_result = await self.src.connection.stream(self.select_from(table.name))
139 async for partition in async_result.partitions(10000):
140 data = [{('class_' if k == 'class' else k): _getfield(r, k)
143 await self.dest.execute(table.insert(), data)
145 # Set up a minimal copy of pg_tables used to look up the class tables later.
146 pg_tables = sa.Table('pg_tables', self.dest.t.meta,
147 sa.Column('schemaname', sa.Text, default='public'),
148 sa.Column('tablename', sa.Text))
149 await self.dest.connection.run_sync(pg_tables.create)
150 data = [{'tablename': t} for t in self.dest.t.meta.tables]
151 await self.dest.execute(pg_tables.insert().values(data))
154 async def create_indexes(self) -> None:
155 """ Add indexes necessary for the frontend.
157 # reverse place node lookup needs an extra table to simulate a
158 # partial index with adaptive buffering.
159 await self.dest.execute(sa.text(
160 """ CREATE TABLE placex_place_node_areas AS
161 SELECT place_id, ST_Expand(geometry,
162 14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
164 WHERE rank_address between 5 and 25
166 and linked_place_id is NULL """))
167 await self.dest.execute(sa.select(
168 sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
169 4326, 'GEOMETRY', 'XY')))
170 await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
171 'placex_place_node_areas', 'geometry')))
174 await self.create_spatial_index('country_grid', 'geometry')
175 await self.create_spatial_index('placex', 'geometry')
176 await self.create_spatial_index('osmline', 'linegeo')
177 await self.create_spatial_index('tiger', 'linegeo')
178 await self.create_index('placex', 'place_id')
179 await self.create_index('placex', 'parent_place_id')
180 await self.create_index('placex', 'rank_address')
181 await self.create_index('addressline', 'place_id')
182 await self.create_index('postcode', 'place_id')
183 await self.create_index('osmline', 'place_id')
184 await self.create_index('tiger', 'place_id')
186 if 'search' in self.options:
187 await self.create_spatial_index('postcode', 'geometry')
188 await self.create_spatial_index('search_name', 'centroid')
189 await self.create_index('search_name', 'place_id')
190 await self.create_index('osmline', 'parent_place_id')
191 await self.create_index('tiger', 'parent_place_id')
192 await self.create_search_index()
194 for t in self.dest.t.meta.tables:
195 if t.startswith('place_classtype_'):
196 await self.dest.execute(sa.select(
197 sa.func.CreateSpatialIndex(t, 'centroid')))
200 async def create_spatial_index(self, table: str, column: str) -> None:
201 """ Create a spatial index on the given table and column.
203 await self.dest.execute(sa.select(
204 sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))
207 async def create_index(self, table_name: str, column: str) -> None:
208 """ Create a simple index on the given table and column.
210 table = getattr(self.dest.t, table_name)
211 await self.dest.connection.run_sync(
212 sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
215 async def create_search_index(self) -> None:
216 """ Create the tables and indexes needed for word lookup.
218 LOG.warning("Creating reverse search table")
219 rsn = sa.Table('reverse_search_name', self.dest.t.meta,
220 sa.Column('word', sa.Integer()),
221 sa.Column('column', sa.Text()),
222 sa.Column('places', IntArray))
223 await self.dest.connection.run_sync(rsn.create)
225 tsrc = self.src.t.search_name
226 for column in ('name_vector', 'nameaddress_vector'):
227 sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
228 sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
231 async_result = await self.src.connection.stream(sql)
232 async for partition in async_result.partitions(100):
234 for row in partition:
236 data.append({'word': row.word,
238 'places': row.places})
239 await self.dest.execute(rsn.insert(), data)
241 await self.dest.connection.run_sync(
242 sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
245 def select_from(self, table: str) -> SaSelect:
246 """ Create the SQL statement to select the source columns and rows.
248 columns = self.src.t.meta.tables[table].c
250 if table == 'placex':
251 # SQLite struggles with Geometries that are larger than 5MB,
253 return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
254 sa.func.ST_AsText(columns.centroid).label('centroid'),
256 sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
258 else_=sa.func.ST_SimplifyPreserveTopology(
259 columns.geometry, 0.0001)
260 )).label('geometry'))
262 sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
263 if isinstance(c.type, Geometry) else c for c in columns))