]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/convert_sqlite.py
Merge pull request #3487 from lonvia/port-to-psycopg3
[nominatim.git] / src / nominatim_db / tools / convert_sqlite.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Exporting a Nominatim database to SQLite.
9 """
10 from typing import Set, Any
11 import datetime as dt
12 import logging
13 from pathlib import Path
14
15 import sqlalchemy as sa
16
17 import nominatim_api as napi
18 from nominatim_api.search.query_analyzer_factory import make_query_analyzer
19 from nominatim_api.typing import SaSelect, SaRow
20 from nominatim_api.sql.sqlalchemy_types import Geometry, IntArray
21
22 LOG = logging.getLogger()
23
async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
    """ Copy the database of the given project into an SQLite file.

        A second API object is opened on `outfile` in read-write mode and
        a SqliteWriter transfers the data between the two connections.
        Both API objects are closed again, even when the export fails.
        The resulting database will be usable against the Python frontend
        of Nominatim.
    """
    source_api = napi.NominatimAPIAsync(project_dir)

    try:
        # Same project directory, but with the DSN redirected to the
        # SQLite output file and write access switched on.
        sqlite_config = {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
                         'NOMINATIM_DATABASE_RW': '1'}
        target_api = napi.NominatimAPIAsync(project_dir, sqlite_config)

        try:
            async with source_api.begin() as src, target_api.begin() as dest:
                await SqliteWriter(src, dest, options).write()
        finally:
            await target_api.close()
    finally:
        await source_api.close()
43
44
class SqliteWriter:
    """ Worker that fills a new SQLite database with the content of
        a source database.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        # src: connection the data is read from
        # dest: connection to the SQLite database being written
        # options: export options; only 'search' is checked in this class
        self.src, self.dest, self.options = src, dest, options


    async def write(self) -> None:
        """ Run the complete export: initialise spatialite, create the
            tables, copy the data and finally build the indexes.
            The word table is only exported when the 'search' option
            is set.
        """
        LOG.warning('Setting up spatialite')
        init_metadata = sa.func.InitSpatialMetaData(True, 'WGS84')
        await self.dest.execute(sa.select(init_metadata))

        await self.create_tables()
        await self.copy_data()
        if 'search' in self.options:
            await self.create_word_table()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Create all tables registered in the destination metadata
            and register their geometry columns with spatialite.
        """
        LOG.warning('Setting up tables')
        meta = self.dest.t.meta
        if 'search' in self.options:
            await self.create_class_tables()
        else:
            # Without search support the search_name table is not exported.
            meta.remove(self.dest.t.search_name)

        await self.dest.connection.run_sync(meta.create_all)

        # Turn every Geometry column into a Spatialite geometry.
        for table in meta.sorted_tables:
            for col in table.c:
                if not isinstance(col.type, Geometry):
                    continue
                recover = sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                        col.type.subtype.upper(), 'XY')
                await self.dest.execute(sa.select(recover))


    async def create_class_tables(self) -> None:
        """ Register the 'place_classtype_*' tables found in the source
            database with the metadata of both connections.
        """
        sql = sa.text("""SELECT tablename FROM pg_tables
                         WHERE tablename LIKE 'place_classtype_%'""")
        for row in await self.src.execute(sql):
            tablename = row[0]
            for meta in (self.src.t.meta, self.dest.t.meta):
                sa.Table(tablename, meta,
                         sa.Column('place_id', sa.BigInteger),
                         sa.Column('centroid', Geometry))


    async def create_word_table(self) -> None:
        """ Create and fill the word table.
            The layout of the table depends on the property information.
            The function must therefore only be called after all other
            data has been copied.
        """
        await make_query_analyzer(self.src)
        await make_query_analyzer(self.dest)
        word_src = self.src.t.meta.tables['word']
        word_dest = self.dest.t.meta.tables['word']

        await self.dest.connection.run_sync(word_dest.create)

        LOG.warning("Copying word table")
        stream = await self.src.connection.stream(sa.select(word_src))

        async for chunk in stream.partitions(10000):
            records = [{field: getattr(row, field) for field in row._fields}
                       for row in chunk]
            await self.dest.execute(word_dest.insert(), records)

        token_index = sa.Index('idx_word_woken', word_dest.c.word_token)
        await self.dest.connection.run_sync(token_index.create)


    async def copy_data(self) -> None:
        """ Copy the rows of every table registered in the destination
            metadata and add a 'pg_tables' table listing those tables.
        """
        def _getfield(row: SaRow, key: str) -> Any:
            # Timezone-aware timestamps are converted to UTC,
            # everything else is passed through unchanged.
            value = getattr(row, key)
            if isinstance(value, dt.datetime) and value.tzinfo is not None:
                return value.astimezone(dt.timezone.utc)
            return value

        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            stream = await self.src.connection.stream(self.select_from(table.name))

            async for chunk in stream.partitions(10000):
                records = []
                for row in chunk:
                    # The source field 'class' is written as 'class_'.
                    records.append({('class_' if field == 'class' else field):
                                    _getfield(row, field)
                                    for field in row._fields})
                await self.dest.execute(table.insert(), records)

        # Set up a minimal copy of pg_tables used to look up the class tables later.
        pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                             sa.Column('schemaname', sa.Text, default='public'),
                             sa.Column('tablename', sa.Text))
        await self.dest.connection.run_sync(pg_tables.create)
        table_names = [{'tablename': name} for name in self.dest.t.meta.tables]
        await self.dest.execute(pg_tables.insert().values(table_names))


    async def create_indexes(self) -> None:
        """ Create the helper table and the indexes that the frontend
            needs. Search-related indexes are only created when the
            'search' option is set.
        """
        # Reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        node_areas_sql = sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """)
        await self.dest.execute(node_areas_sql)
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(
            sa.func.CreateSpatialIndex('placex_place_node_areas', 'geometry')))

        # Remaining indexes, created in the listed order.
        for table, column in (('country_grid', 'geometry'),
                              ('placex', 'geometry'),
                              ('osmline', 'linegeo'),
                              ('tiger', 'linegeo')):
            await self.create_spatial_index(table, column)

        for table, column in (('placex', 'place_id'),
                              ('placex', 'parent_place_id'),
                              ('placex', 'rank_address'),
                              ('addressline', 'place_id'),
                              ('postcode', 'place_id'),
                              ('osmline', 'place_id'),
                              ('tiger', 'place_id')):
            await self.create_index(table, column)

        if 'search' not in self.options:
            return

        await self.create_spatial_index('postcode', 'geometry')
        await self.create_spatial_index('search_name', 'centroid')
        await self.create_index('search_name', 'place_id')
        await self.create_index('osmline', 'parent_place_id')
        await self.create_index('tiger', 'parent_place_id')
        await self.create_search_index()

        for name in self.dest.t.meta.tables:
            if name.startswith('place_classtype_'):
                spatial_index = sa.func.CreateSpatialIndex(name, 'centroid')
                await self.dest.execute(sa.select(spatial_index))


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index for the given column. `table` is the
            attribute name of the table in the destination table
            collection.
        """
        real_name = getattr(self.dest.t, table).name
        spatial_index = sa.func.CreateSpatialIndex(real_name, column)
        await self.dest.execute(sa.select(spatial_index))


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index for the given column. `table_name` is
            the attribute name of the table in the destination table
            collection.
        """
        table = getattr(self.dest.t, table_name)
        index = sa.Index(f"idx_{table}_{column}", getattr(table.c, column))
        await self.dest.connection.run_sync(index.create)


    async def create_search_index(self) -> None:
        """ Create and fill the 'reverse_search_name' table, which maps
            each word of the name and address vectors of search_name
            to the sorted list of places containing it.
        """
        LOG.warning("Creating reverse search table")
        rsn = sa.Table('reverse_search_name', self.dest.t.meta,
                       sa.Column('word', sa.Integer()),
                       sa.Column('column', sa.Text()),
                       sa.Column('places', IntArray))
        await self.dest.connection.run_sync(rsn.create)

        tsrc = self.src.t.search_name
        for column in ('name_vector', 'nameaddress_vector'):
            word = sa.func.unnest(getattr(tsrc.c, column)).label('word')
            places = sa.func.ArrayAgg(tsrc.c.place_id).label('places')
            sql = sa.select(word, places).group_by('word')

            stream = await self.src.connection.stream(sql)
            async for chunk in stream.partitions(100):
                records = []
                for row in chunk:
                    place_ids = row.places
                    place_ids.sort()
                    records.append(dict(word=row.word, column=column, places=place_ids))
                await self.dest.execute(rsn.insert(), records)

        word_index = sa.Index('idx_reverse_search_name_word', rsn.c.word)
        await self.dest.connection.run_sync(word_index.create)


    def select_from(self, table: str) -> SaSelect:
        """ Build the SELECT statement that reads the given table from the
            source database. Geometry columns are selected in their
            text representation.
        """
        columns = self.src.t.meta.tables[table].c

        if table != 'placex':
            return sa.select(*(sa.func.ST_AsText(c).label(c.name)
                               if isinstance(c.type, Geometry) else c
                               for c in columns))

        # SQLite struggles with Geometries that are larger than 5MB,
        # so simplify those.
        plain_columns = (c for c in columns if not isinstance(c.type, Geometry))
        geometry = sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                            columns.geometry),
                           else_=sa.func.ST_SimplifyPreserveTopology(
                               columns.geometry, 0.0001))

        return sa.select(*plain_columns,
                         sa.func.ST_AsText(columns.centroid).label('centroid'),
                         sa.func.ST_AsText(geometry).label('geometry'))
264
265         return sql