1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Exporting a Nominatim database to SQlite.
10 from typing import Set
12 from pathlib import Path
14 import sqlalchemy as sa
16 from nominatim.typing import SaSelect
17 from nominatim.db.sqlalchemy_types import Geometry, IntArray
18 from nominatim.api.search.query_analyzer_factory import make_query_analyzer
19 import nominatim.api as napi
21 LOG = logging.getLogger()
23 async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
24 """ Export an existing database to sqlite. The resulting database
25 will be usable against the Python frontend of Nominatim.
27 api = napi.NominatimAPIAsync(project_dir)
30 outapi = napi.NominatimAPIAsync(project_dir,
31 {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"})
34 async with api.begin() as src, outapi.begin() as dest:
35 writer = SqliteWriter(src, dest, options)
44 """ Worker class which creates a new SQLite database.
47 def __init__(self, src: napi.SearchConnection,
48 dest: napi.SearchConnection, options: Set[str]) -> None:
51 self.options = options
54 async def write(self) -> None:
55 """ Create the database structure and copy the data from
56 the source database to the destination.
58 LOG.warning('Setting up spatialite')
59 await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
61 await self.create_tables()
62 await self.copy_data()
63 if 'search' in self.options:
64 await self.create_word_table()
65 await self.create_indexes()
68 async def create_tables(self) -> None:
69 """ Set up the database tables.
71 LOG.warning('Setting up tables')
72 if 'search' not in self.options:
73 self.dest.t.meta.remove(self.dest.t.search_name)
75 await self.create_class_tables()
77 await self.dest.connection.run_sync(self.dest.t.meta.create_all)
79 # Convert all Geometry columns to Spatialite geometries
80 for table in self.dest.t.meta.sorted_tables:
82 if isinstance(col.type, Geometry):
83 await self.dest.execute(sa.select(
84 sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
85 col.type.subtype.upper(), 'XY')))
88 async def create_class_tables(self) -> None:
89 """ Set up the table that serve class/type-specific geometries.
91 sql = sa.text("""SELECT tablename FROM pg_tables
92 WHERE tablename LIKE 'place_classtype_%'""")
93 for res in await self.src.execute(sql):
94 for db in (self.src, self.dest):
95 sa.Table(res[0], db.t.meta,
96 sa.Column('place_id', sa.BigInteger),
97 sa.Column('centroid', Geometry))
100 async def create_word_table(self) -> None:
101 """ Create the word table.
102 This table needs the property information to determine the
103 correct format. Therefore needs to be done after all other
104 data has been copied.
106 await make_query_analyzer(self.src)
107 await make_query_analyzer(self.dest)
108 src = self.src.t.meta.tables['word']
109 dest = self.dest.t.meta.tables['word']
111 await self.dest.connection.run_sync(dest.create)
113 LOG.warning("Copying word table")
114 async_result = await self.src.connection.stream(sa.select(src))
116 async for partition in async_result.partitions(10000):
117 data = [{k: getattr(r, k) for k in r._fields} for r in partition]
118 await self.dest.execute(dest.insert(), data)
120 await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
123 async def copy_data(self) -> None:
124 """ Copy data for all registered tables.
126 for table in self.dest.t.meta.sorted_tables:
127 LOG.warning("Copying '%s'", table.name)
128 async_result = await self.src.connection.stream(self.select_from(table.name))
130 async for partition in async_result.partitions(10000):
131 data = [{('class_' if k == 'class' else k): getattr(r, k) for k in r._fields}
133 await self.dest.execute(table.insert(), data)
135 # Set up a minimal copy of pg_tables used to look up the class tables later.
136 pg_tables = sa.Table('pg_tables', self.dest.t.meta,
137 sa.Column('schemaname', sa.Text, default='public'),
138 sa.Column('tablename', sa.Text))
139 await self.dest.connection.run_sync(pg_tables.create)
140 data = [{'tablename': t} for t in self.dest.t.meta.tables]
141 await self.dest.execute(pg_tables.insert().values(data))
144 async def create_indexes(self) -> None:
145 """ Add indexes necessary for the frontend.
147 # reverse place node lookup needs an extra table to simulate a
148 # partial index with adaptive buffering.
149 await self.dest.execute(sa.text(
150 """ CREATE TABLE placex_place_node_areas AS
151 SELECT place_id, ST_Expand(geometry,
152 14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
154 WHERE rank_address between 5 and 25
156 and linked_place_id is NULL """))
157 await self.dest.execute(sa.select(
158 sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
159 4326, 'GEOMETRY', 'XY')))
160 await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
161 'placex_place_node_areas', 'geometry')))
164 await self.create_spatial_index('country_grid', 'geometry')
165 await self.create_spatial_index('placex', 'geometry')
166 await self.create_spatial_index('osmline', 'linegeo')
167 await self.create_spatial_index('tiger', 'linegeo')
168 await self.create_index('placex', 'place_id')
169 await self.create_index('placex', 'parent_place_id')
170 await self.create_index('placex', 'rank_address')
171 await self.create_index('addressline', 'place_id')
172 await self.create_index('postcode', 'place_id')
173 await self.create_index('osmline', 'place_id')
174 await self.create_index('tiger', 'place_id')
176 if 'search' in self.options:
177 await self.create_spatial_index('postcode', 'geometry')
178 await self.create_spatial_index('search_name', 'centroid')
179 await self.create_index('search_name', 'place_id')
180 await self.create_index('osmline', 'parent_place_id')
181 await self.create_index('tiger', 'parent_place_id')
182 await self.create_search_index()
184 for t in self.dest.t.meta.tables:
185 if t.startswith('place_classtype_'):
186 await self.dest.execute(sa.select(
187 sa.func.CreateSpatialIndex(t, 'centroid')))
190 async def create_spatial_index(self, table: str, column: str) -> None:
191 """ Create a spatial index on the given table and column.
193 await self.dest.execute(sa.select(
194 sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))
197 async def create_index(self, table_name: str, column: str) -> None:
198 """ Create a simple index on the given table and column.
200 table = getattr(self.dest.t, table_name)
201 await self.dest.connection.run_sync(
202 sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
205 async def create_search_index(self) -> None:
206 """ Create the tables and indexes needed for word lookup.
208 tsrc = self.src.t.search_name
209 for column in ('name_vector', 'nameaddress_vector'):
210 table_name = f'reverse_search_{column}'
211 LOG.warning("Creating reverse search %s", table_name)
212 rsn = sa.Table(table_name, self.dest.t.meta,
213 sa.Column('word', sa.Integer()),
214 sa.Column('places', IntArray))
215 await self.dest.connection.run_sync(rsn.create)
217 sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
218 sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
221 async_result = await self.src.connection.stream(sql)
222 async for partition in async_result.partitions(100):
224 for row in partition:
226 data.append({'word': row.word,
227 'places': row.places})
228 await self.dest.execute(rsn.insert(), data)
230 await self.dest.connection.run_sync(
231 sa.Index(f'idx_reverse_search_{column}_word', rsn.c.word).create)
234 def select_from(self, table: str) -> SaSelect:
235 """ Create the SQL statement to select the source columns and rows.
237 columns = self.src.t.meta.tables[table].c
239 if table == 'placex':
240 # SQLite struggles with Geometries that are larger than 5MB,
242 return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
243 sa.func.ST_AsText(columns.centroid).label('centroid'),
245 sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
247 else_=sa.func.ST_SimplifyPreserveTopology(
248 columns.geometry, 0.0001)
249 )).label('geometry'))
251 sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
252 if isinstance(c.type, Geometry) else c for c in columns))