# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Exporting a Nominatim database to SQLite.
"""
from typing import Set, Any, Optional, Union
import datetime as dt
import logging
from pathlib import Path

import sqlalchemy as sa

import nominatim_api as napi
from nominatim_api.search.query_analyzer_factory import make_query_analyzer
from nominatim_api.typing import SaSelect, SaRow
from nominatim_api.sql.sqlalchemy_types import Geometry, IntArray

LOG = logging.getLogger()


async def convert(project_dir: Optional[Union[str, Path]],
                  outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to SQLite. The resulting database
        will be usable against the Python frontend of Nominatim.
    """
    api = napi.NominatimAPIAsync(project_dir)

    try:
        outapi = napi.NominatimAPIAsync(project_dir,
                                        {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
                                         'NOMINATIM_DATABASE_RW': '1'})

        try:
            async with api.begin() as src, outapi.begin() as dest:
                writer = SqliteWriter(src, dest, options)
                await writer.write()
        finally:
            await outapi.close()
    finally:
        await api.close()

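# A minimal usage sketch (illustrative, not part of this module): running
# the export from a script against a finished import in the current
# project directory. The output file name is an arbitrary choice; passing
# the 'search' option also exports the tables needed for forward search.
#
#     import asyncio
#     from pathlib import Path
#
#     asyncio.run(convert('.', Path('nominatim.sqlite'), {'search'}))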

class SqliteWriter:
    """ Worker class which creates a new SQLite database.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        self.src = src
        self.dest = dest
        self.options = options

    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.
        """
        LOG.warning('Setting up spatialite')
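        # InitSpatialMetaData creates Spatialite's internal metadata tables.
        # TRUE wraps the setup in a single transaction and 'WGS84' loads only
        # the WGS84 spatial reference systems instead of the full EPSG set,
        # which is considerably faster.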
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        if 'search' in self.options:
            await self.create_word_table()
        await self.create_indexes()

    async def create_tables(self) -> None:
        """ Set up the database tables.
        """
        LOG.warning('Setting up tables')
        if 'search' not in self.options:
            self.dest.t.meta.remove(self.dest.t.search_name)
        else:
            await self.create_class_tables()

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Convert all Geometry columns to Spatialite geometries
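        # (RecoverGeometryColumn registers an already existing column in
        # Spatialite's geometry_columns metadata, making it usable with
        # spatial functions and spatial indexes.)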
        for table in self.dest.t.meta.sorted_tables:
            for col in table.c:
                if isinstance(col.type, Geometry):
                    await self.dest.execute(sa.select(
                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                      col.type.subtype.upper(), 'XY')))

    async def create_class_tables(self) -> None:
        """ Set up the tables that serve class/type-specific geometries.
        """
        sql = sa.text("""SELECT tablename FROM pg_tables
                         WHERE tablename LIKE 'place_classtype_%'""")
        for res in await self.src.execute(sql):
            for db in (self.src, self.dest):
                sa.Table(res[0], db.t.meta,
                         sa.Column('place_id', sa.BigInteger),
                         sa.Column('centroid', Geometry))

    async def create_word_table(self) -> None:
        """ Create the word table.
            This table needs the property information to determine the
            correct format. It therefore needs to be created after all
            other data has been copied.
        """
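        # Creating the query analyzers has the side effect of registering
        # the tokenizer-specific 'word' table in each connection's table
        # metadata, which is why it can be looked up by name below.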
        await make_query_analyzer(self.src)
        await make_query_analyzer(self.dest)
        src = self.src.t.meta.tables['word']
        dest = self.dest.t.meta.tables['word']

        await self.dest.connection.run_sync(dest.create)

        LOG.warning("Copying word table")
        async_result = await self.src.connection.stream(sa.select(src))

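        # Stream the source rows in chunks of 10000 so that the whole table
        # never needs to be held in memory at once.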
        async for partition in async_result.partitions(10000):
            data = [{k: getattr(r, k) for k in r._fields} for r in partition]
            await self.dest.execute(dest.insert(), data)

        await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)

    async def copy_data(self) -> None:
        """ Copy data for all registered tables.
        """
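        # SQLite has no timezone support, so normalise timezone-aware
        # datetimes to UTC before writing them out.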
        def _getfield(row: SaRow, key: str) -> Any:
            value = getattr(row, key)
            if isinstance(value, dt.datetime):
                if value.tzinfo is not None:
                    value = value.astimezone(dt.timezone.utc)
            return value

        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            async_result = await self.src.connection.stream(self.select_from(table.name))

            async for partition in async_result.partitions(10000):
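                # 'class' is a reserved word in Python; the destination
                # table addresses this column under the key 'class_'.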
                data = [{('class_' if k == 'class' else k): _getfield(r, k)
                         for k in r._fields}
                        for r in partition]
                await self.dest.execute(table.insert(), data)

        # Set up a minimal copy of pg_tables used to look up the class tables later.
        pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                             sa.Column('schemaname', sa.Text, default='public'),
                             sa.Column('tablename', sa.Text))
        await self.dest.connection.run_sync(pg_tables.create)
        data = [{'tablename': t} for t in self.dest.t.meta.tables]
        await self.dest.execute(pg_tables.insert().values(data))

    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # Reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
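        # The buffer radius shrinks exponentially with rank_search, so
        # higher-ranked (i.e. smaller) places get a smaller search area.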
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
            'placex_place_node_areas', 'geometry')))

        # Remaining indexes.
        await self.create_spatial_index('country_grid', 'geometry')
        await self.create_spatial_index('placex', 'geometry')
        await self.create_spatial_index('osmline', 'linegeo')
        await self.create_spatial_index('tiger', 'linegeo')
        await self.create_index('placex', 'place_id')
        await self.create_index('placex', 'parent_place_id')
        await self.create_index('placex', 'rank_address')
        await self.create_index('addressline', 'place_id')
        await self.create_index('postcode', 'place_id')
        await self.create_index('osmline', 'place_id')
        await self.create_index('tiger', 'place_id')

        if 'search' in self.options:
            await self.create_spatial_index('postcode', 'geometry')
            await self.create_spatial_index('search_name', 'centroid')
            await self.create_index('search_name', 'place_id')
            await self.create_index('osmline', 'parent_place_id')
            await self.create_index('tiger', 'parent_place_id')
            await self.create_search_index()

            for t in self.dest.t.meta.tables:
                if t.startswith('place_classtype_'):
                    await self.dest.execute(sa.select(
                        sa.func.CreateSpatialIndex(t, 'centroid')))

    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.
        """
        await self.dest.execute(sa.select(
            sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))

    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.
        """
        table = getattr(self.dest.t, table_name)
        await self.dest.connection.run_sync(
            sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)

    async def create_search_index(self) -> None:
        """ Create the tables and indexes needed for word lookup.
        """
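        # SQLite cannot use indexes on the integer-array columns of
        # search_name, so build an inverted table that maps each word id
        # to the (sorted) list of places containing it.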
        LOG.warning("Creating reverse search table")
        rsn = sa.Table('reverse_search_name', self.dest.t.meta,
                       sa.Column('word', sa.Integer()),
                       sa.Column('column', sa.Text()),
                       sa.Column('places', IntArray))
        await self.dest.connection.run_sync(rsn.create)

        tsrc = self.src.t.search_name
        for column in ('name_vector', 'nameaddress_vector'):
            sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
                            sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
                    .group_by('word')

            async_result = await self.src.connection.stream(sql)
            async for partition in async_result.partitions(100):
                data = []
                for row in partition:
                    row.places.sort()
                    data.append({'word': row.word,
                                 'column': column,
                                 'places': row.places})
                await self.dest.execute(rsn.insert(), data)

        await self.dest.connection.run_sync(
            sa.Index('idx_reverse_search_name_word', rsn.c.word).create)

    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.
        """
        columns = self.src.t.meta.tables[table].c

        if table == 'placex':
            # SQLite struggles with Geometries that are larger than 5MB,
            # so simplify those.
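            # (ST_MemSize reports the in-memory size of a geometry in bytes;
            # the 0.0001 degree tolerance corresponds to roughly 10m at the
            # equator.)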
            return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
                             sa.func.ST_AsText(columns.centroid).label('centroid'),
                             sa.func.ST_AsText(
                               sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                                        columns.geometry),
                                       else_=sa.func.ST_SimplifyPreserveTopology(
                                                columns.geometry, 0.0001)
                                       )).label('geometry'))

        sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
                          if isinstance(c.type, Geometry) else c for c in columns))

        return sql