git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/convert_sqlite.py
work round typing bug in pyosmium 4.0
[nominatim.git] / src / nominatim_db / tools / convert_sqlite.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Exporting a Nominatim database to SQLite.
9 """
10 from typing import Set, Any, Optional, Union
11 import datetime as dt
12 import logging
13 from pathlib import Path
14
15 import sqlalchemy as sa
16
17 import nominatim_api as napi
18 from nominatim_api.search.query_analyzer_factory import make_query_analyzer
19 from nominatim_api.typing import SaSelect, SaRow
20 from nominatim_api.sql.sqlalchemy_types import Geometry, IntArray
21
22 LOG = logging.getLogger()
23
async def convert(project_dir: Optional[Union[str, Path]],
                  outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to sqlite. The resulting database
        will be usable against the Python frontend of Nominatim.

        Two API handles are opened for `project_dir`: one with the
        project's own settings to read from, and one whose DSN is
        overridden to point at `outfile` with the read-write flag set.
        `options` is handed unchanged to the SqliteWriter doing the work.
        Both handles are closed again, even when the export fails.
    """
    source_api = napi.NominatimAPIAsync(project_dir)

    try:
        # Settings that redirect the second handle to the SQLite output file.
        sqlite_settings = {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
                           'NOMINATIM_DATABASE_RW': '1'}
        target_api = napi.NominatimAPIAsync(project_dir, sqlite_settings)

        try:
            async with source_api.begin() as src:
                async with target_api.begin() as dest:
                    await SqliteWriter(src, dest, options).write()
        finally:
            await target_api.close()
    finally:
        await source_api.close()
44
45
class SqliteWriter:
    """ Worker class which creates a new SQLite database.

        Data is read through the `src` connection and written through
        the `dest` connection. `options` is a set of feature names; the
        only one evaluated here is 'search', which additionally exports
        the tables used for searching (search_name, the class/type
        tables, word and reverse_search_name).
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        self.src, self.dest, self.options = src, dest, options


    async def write(self) -> None:
        """ Run the complete export: initialise spatialite, create the
            tables, copy the data and finally build the indexes.
        """
        LOG.warning('Setting up spatialite')
        init_metadata = sa.func.InitSpatialMetaData(True, 'WGS84')
        await self.dest.execute(sa.select(init_metadata))

        await self.create_tables()
        await self.copy_data()
        # The word table can only be created once the other data is there.
        if 'search' in self.options:
            await self.create_word_table()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Create all tables in the destination database and register
            their geometry columns with spatialite.
        """
        LOG.warning('Setting up tables')
        meta = self.dest.t.meta

        if 'search' in self.options:
            await self.create_class_tables()
        else:
            # Without search support the search_name table is not exported.
            meta.remove(self.dest.t.search_name)

        await self.dest.connection.run_sync(meta.create_all)

        # Turn every Geometry column into a Spatialite geometry.
        for table in meta.sorted_tables:
            geometry_columns = [c for c in table.c if isinstance(c.type, Geometry)]
            for col in geometry_columns:
                recover = sa.func.RecoverGeometryColumn(
                    table.name, col.name, 4326, col.type.subtype.upper(), 'XY')
                await self.dest.execute(sa.select(recover))


    async def create_class_tables(self) -> None:
        """ Register the tables that serve class/type-specific geometries.

            The table names are looked up in the source database. Each
            one is added to the table metadata of the source as well as
            of the destination connection.
        """
        query = sa.text("""SELECT tablename FROM pg_tables
                         WHERE tablename LIKE 'place_classtype_%'""")
        found = await self.src.execute(query)

        for row in found:
            table_name = row[0]
            for conn in (self.src, self.dest):
                sa.Table(table_name, conn.t.meta,
                         sa.Column('place_id', sa.BigInteger),
                         sa.Column('centroid', Geometry))


    async def create_word_table(self) -> None:
        """ Create and fill the word table.

            This table needs the property information to determine the
            correct format. It must therefore be set up only after all
            other data has been copied.
        """
        for conn in (self.src, self.dest):
            await make_query_analyzer(conn)

        source_words = self.src.t.meta.tables['word']
        target_words = self.dest.t.meta.tables['word']

        await self.dest.connection.run_sync(target_words.create)

        LOG.warning("Copying word table")
        stream = await self.src.connection.stream(sa.select(source_words))

        # Transfer the rows in chunks of 10000.
        async for chunk in stream.partitions(10000):
            params = [{field: getattr(row, field) for field in row._fields}
                      for row in chunk]
            await self.dest.execute(target_words.insert(), params)

        # NOTE(review): the index name looks like a typo of 'idx_word_token';
        # it is kept unchanged here.
        token_index = sa.Index('idx_word_woken', target_words.c.word_token)
        await self.dest.connection.run_sync(token_index.create)


    async def copy_data(self) -> None:
        """ Copy the content of every registered table from the source
            to the destination and then write a minimal pg_tables table.
        """
        def _getfield(row: SaRow, key: str) -> Any:
            # Timezone-aware timestamps are converted to UTC, every other
            # value is passed on unchanged.
            value = getattr(row, key)
            if isinstance(value, dt.datetime) and value.tzinfo is not None:
                return value.astimezone(dt.timezone.utc)
            return value

        def _to_params(row: SaRow) -> Any:
            # The source field 'class' is inserted under the key 'class_'.
            params = {}
            for field in row._fields:
                target_key = 'class_' if field == 'class' else field
                params[target_key] = _getfield(row, field)
            return params

        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            query = self.select_from(table.name)
            stream = await self.src.connection.stream(query)

            # Transfer the rows in chunks of 10000.
            async for chunk in stream.partitions(10000):
                rows = [_to_params(row) for row in chunk]
                await self.dest.execute(table.insert(), rows)

        # Set up a minimal copy of pg_tables used to look up the class tables later.
        pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                             sa.Column('schemaname', sa.Text, default='public'),
                             sa.Column('tablename', sa.Text))
        await self.dest.connection.run_sync(pg_tables.create)
        table_names = [{'tablename': name} for name in self.dest.t.meta.tables]
        await self.dest.execute(pg_tables.insert().values(table_names))


    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # Reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        node_areas_sql = sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """)
        await self.dest.execute(node_areas_sql)

        recover = sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                                4326, 'GEOMETRY', 'XY')
        await self.dest.execute(sa.select(recover))
        node_areas_index = sa.func.CreateSpatialIndex('placex_place_node_areas', 'geometry')
        await self.dest.execute(sa.select(node_areas_index))

        # Remaining indexes.
        spatial_indexes = (('country_grid', 'geometry'),
                           ('placex', 'geometry'),
                           ('osmline', 'linegeo'),
                           ('tiger', 'linegeo'))
        for table, column in spatial_indexes:
            await self.create_spatial_index(table, column)

        plain_indexes = (('placex', 'place_id'),
                         ('placex', 'parent_place_id'),
                         ('placex', 'rank_address'),
                         ('addressline', 'place_id'),
                         ('postcode', 'place_id'),
                         ('osmline', 'place_id'),
                         ('tiger', 'place_id'))
        for table, column in plain_indexes:
            await self.create_index(table, column)

        if 'search' not in self.options:
            return

        # Indexes that are only required when search data was exported.
        await self.create_spatial_index('postcode', 'geometry')
        await self.create_spatial_index('search_name', 'centroid')
        await self.create_index('search_name', 'place_id')
        await self.create_index('osmline', 'parent_place_id')
        await self.create_index('tiger', 'parent_place_id')
        await self.create_search_index()

        # Every class/type table gets a spatial index on its centroid.
        class_tables = [name for name in self.dest.t.meta.tables
                        if name.startswith('place_classtype_')]
        for name in class_tables:
            await self.dest.execute(sa.select(
                sa.func.CreateSpatialIndex(name, 'centroid')))


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.

            `table` is the attribute name of the table in the destination's
            table collection; the table's actual name is taken from there.
        """
        real_name = getattr(self.dest.t, table).name
        statement = sa.select(sa.func.CreateSpatialIndex(real_name, column))
        await self.dest.execute(statement)


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.

            `table_name` is the attribute name of the table in the
            destination's table collection.
        """
        table = getattr(self.dest.t, table_name)
        # The index name is built from the string form of the table object.
        index = sa.Index(f"idx_{table}_{column}", getattr(table.c, column))
        await self.dest.connection.run_sync(index.create)


    async def create_search_index(self) -> None:
        """ Create the tables and indexes needed for word lookup.

            For each of the two vector columns of the source search_name
            table, one row per word is written to reverse_search_name
            together with the sorted list of place ids aggregated for it.
        """
        LOG.warning("Creating reverse search table")
        reverse_table = sa.Table('reverse_search_name', self.dest.t.meta,
                                 sa.Column('word', sa.Integer()),
                                 sa.Column('column', sa.Text()),
                                 sa.Column('places', IntArray))
        await self.dest.connection.run_sync(reverse_table.create)

        search_name = self.src.t.search_name
        for column in ('name_vector', 'nameaddress_vector'):
            word_expr = sa.func.unnest(getattr(search_name.c, column)).label('word')
            places_expr = sa.func.ArrayAgg(search_name.c.place_id).label('places')
            query = sa.select(word_expr, places_expr).group_by('word')

            stream = await self.src.connection.stream(query)
            # Transfer the aggregated rows in chunks of 100.
            async for chunk in stream.partitions(100):
                entries = []
                for row in chunk:
                    place_ids = row.places
                    place_ids.sort()
                    entries.append({'word': row.word,
                                    'column': column,
                                    'places': place_ids})
                await self.dest.execute(reverse_table.insert(), entries)

        word_index = sa.Index('idx_reverse_search_name_word', reverse_table.c.word)
        await self.dest.connection.run_sync(word_index.create)


    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.

            Geometry columns are always selected in their text form.
        """
        columns = self.src.t.meta.tables[table].c

        if table != 'placex':
            selected = [sa.func.ST_AsText(col).label(col.name)
                        if isinstance(col.type, Geometry) else col
                        for col in columns]
            return sa.select(*selected)

        # SQLite struggles with Geometries that are larger than 5MB,
        # so simplify those.
        is_small = sa.func.ST_MemSize(columns.geometry) < 5000000
        simplified = sa.func.ST_SimplifyPreserveTopology(columns.geometry, 0.0001)
        geometry = sa.func.ST_AsText(
            sa.case((is_small, columns.geometry), else_=simplified)).label('geometry')
        centroid = sa.func.ST_AsText(columns.centroid).label('centroid')
        plain_columns = [col for col in columns if not isinstance(col.type, Geometry)]

        return sa.select(*plain_columns, centroid, geometry)