1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Exporting a Nominatim database to SQlite.
10 from typing import Set
12 from pathlib import Path
14 import sqlalchemy as sa
16 from nominatim.typing import SaSelect
17 from nominatim.db.sqlalchemy_types import Geometry, IntArray
18 from nominatim.api.search.query_analyzer_factory import make_query_analyzer
19 import nominatim.api as napi
21 LOG = logging.getLogger()
23 async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
24 """ Export an existing database to sqlite. The resulting database
25 will be usable against the Python frontend of Nominatim.
27 api = napi.NominatimAPIAsync(project_dir)
30 outapi = napi.NominatimAPIAsync(project_dir,
31 {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
32 'NOMINATIM_DATABASE_RW': '1'})
35 async with api.begin() as src, outapi.begin() as dest:
36 writer = SqliteWriter(src, dest, options)
45 """ Worker class which creates a new SQLite database.
48 def __init__(self, src: napi.SearchConnection,
49 dest: napi.SearchConnection, options: Set[str]) -> None:
52 self.options = options
55 async def write(self) -> None:
56 """ Create the database structure and copy the data from
57 the source database to the destination.
59 LOG.warning('Setting up spatialite')
60 await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
62 await self.create_tables()
63 await self.copy_data()
64 if 'search' in self.options:
65 await self.create_word_table()
66 await self.create_indexes()
69 async def create_tables(self) -> None:
70 """ Set up the database tables.
72 LOG.warning('Setting up tables')
73 if 'search' not in self.options:
74 self.dest.t.meta.remove(self.dest.t.search_name)
76 await self.create_class_tables()
78 await self.dest.connection.run_sync(self.dest.t.meta.create_all)
80 # Convert all Geometry columns to Spatialite geometries
81 for table in self.dest.t.meta.sorted_tables:
83 if isinstance(col.type, Geometry):
84 await self.dest.execute(sa.select(
85 sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
86 col.type.subtype.upper(), 'XY')))
89 async def create_class_tables(self) -> None:
90 """ Set up the table that serve class/type-specific geometries.
92 sql = sa.text("""SELECT tablename FROM pg_tables
93 WHERE tablename LIKE 'place_classtype_%'""")
94 for res in await self.src.execute(sql):
95 for db in (self.src, self.dest):
96 sa.Table(res[0], db.t.meta,
97 sa.Column('place_id', sa.BigInteger),
98 sa.Column('centroid', Geometry))
101 async def create_word_table(self) -> None:
102 """ Create the word table.
103 This table needs the property information to determine the
104 correct format. Therefore needs to be done after all other
105 data has been copied.
107 await make_query_analyzer(self.src)
108 await make_query_analyzer(self.dest)
109 src = self.src.t.meta.tables['word']
110 dest = self.dest.t.meta.tables['word']
112 await self.dest.connection.run_sync(dest.create)
114 LOG.warning("Copying word table")
115 async_result = await self.src.connection.stream(sa.select(src))
117 async for partition in async_result.partitions(10000):
118 data = [{k: getattr(r, k) for k in r._fields} for r in partition]
119 await self.dest.execute(dest.insert(), data)
121 await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
124 async def copy_data(self) -> None:
125 """ Copy data for all registered tables.
127 for table in self.dest.t.meta.sorted_tables:
128 LOG.warning("Copying '%s'", table.name)
129 async_result = await self.src.connection.stream(self.select_from(table.name))
131 async for partition in async_result.partitions(10000):
132 data = [{('class_' if k == 'class' else k): getattr(r, k) for k in r._fields}
134 await self.dest.execute(table.insert(), data)
136 # Set up a minimal copy of pg_tables used to look up the class tables later.
137 pg_tables = sa.Table('pg_tables', self.dest.t.meta,
138 sa.Column('schemaname', sa.Text, default='public'),
139 sa.Column('tablename', sa.Text))
140 await self.dest.connection.run_sync(pg_tables.create)
141 data = [{'tablename': t} for t in self.dest.t.meta.tables]
142 await self.dest.execute(pg_tables.insert().values(data))
145 async def create_indexes(self) -> None:
146 """ Add indexes necessary for the frontend.
148 # reverse place node lookup needs an extra table to simulate a
149 # partial index with adaptive buffering.
150 await self.dest.execute(sa.text(
151 """ CREATE TABLE placex_place_node_areas AS
152 SELECT place_id, ST_Expand(geometry,
153 14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
155 WHERE rank_address between 5 and 25
157 and linked_place_id is NULL """))
158 await self.dest.execute(sa.select(
159 sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
160 4326, 'GEOMETRY', 'XY')))
161 await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
162 'placex_place_node_areas', 'geometry')))
165 await self.create_spatial_index('country_grid', 'geometry')
166 await self.create_spatial_index('placex', 'geometry')
167 await self.create_spatial_index('osmline', 'linegeo')
168 await self.create_spatial_index('tiger', 'linegeo')
169 await self.create_index('placex', 'place_id')
170 await self.create_index('placex', 'parent_place_id')
171 await self.create_index('placex', 'rank_address')
172 await self.create_index('addressline', 'place_id')
173 await self.create_index('postcode', 'place_id')
174 await self.create_index('osmline', 'place_id')
175 await self.create_index('tiger', 'place_id')
177 if 'search' in self.options:
178 await self.create_spatial_index('postcode', 'geometry')
179 await self.create_spatial_index('search_name', 'centroid')
180 await self.create_index('search_name', 'place_id')
181 await self.create_index('osmline', 'parent_place_id')
182 await self.create_index('tiger', 'parent_place_id')
183 await self.create_search_index()
185 for t in self.dest.t.meta.tables:
186 if t.startswith('place_classtype_'):
187 await self.dest.execute(sa.select(
188 sa.func.CreateSpatialIndex(t, 'centroid')))
191 async def create_spatial_index(self, table: str, column: str) -> None:
192 """ Create a spatial index on the given table and column.
194 await self.dest.execute(sa.select(
195 sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))
198 async def create_index(self, table_name: str, column: str) -> None:
199 """ Create a simple index on the given table and column.
201 table = getattr(self.dest.t, table_name)
202 await self.dest.connection.run_sync(
203 sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
206 async def create_search_index(self) -> None:
207 """ Create the tables and indexes needed for word lookup.
209 LOG.warning("Creating reverse search table")
210 rsn = sa.Table('reverse_search_name', self.dest.t.meta,
211 sa.Column('word', sa.Integer()),
212 sa.Column('column', sa.Text()),
213 sa.Column('places', IntArray))
214 await self.dest.connection.run_sync(rsn.create)
216 tsrc = self.src.t.search_name
217 for column in ('name_vector', 'nameaddress_vector'):
218 sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
219 sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
222 async_result = await self.src.connection.stream(sql)
223 async for partition in async_result.partitions(100):
225 for row in partition:
227 data.append({'word': row.word,
229 'places': row.places})
230 await self.dest.execute(rsn.insert(), data)
232 await self.dest.connection.run_sync(
233 sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
236 def select_from(self, table: str) -> SaSelect:
237 """ Create the SQL statement to select the source columns and rows.
239 columns = self.src.t.meta.tables[table].c
241 if table == 'placex':
242 # SQLite struggles with Geometries that are larger than 5MB,
244 return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
245 sa.func.ST_AsText(columns.centroid).label('centroid'),
247 sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
249 else_=sa.func.ST_SimplifyPreserveTopology(
250 columns.geometry, 0.0001)
251 )).label('geometry'))
253 sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
254 if isinstance(c.type, Geometry) else c for c in columns))