]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/convert_sqlite.py
extend sqlite converter for search tables
[nominatim.git] / nominatim / tools / convert_sqlite.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Exporting a Nominatim database to SQlite.
9 """
10 from typing import Set
11 import logging
12 from pathlib import Path
13
14 import sqlalchemy as sa
15
16 from nominatim.typing import SaSelect
17 from nominatim.db.sqlalchemy_types import Geometry, IntArray
18 from nominatim.api.search.query_analyzer_factory import make_query_analyzer
19 import nominatim.api as napi
20
21 LOG = logging.getLogger()
22
async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to sqlite. The resulting database
        will be usable against the Python frontend of Nominatim.

        `project_dir` points to the Nominatim project with the source
        (PostgreSQL) database, `outfile` is the SQLite file to create
        and `options` selects optional parts of the export
        (e.g. 'search' for the search tables).
    """
    src_api = napi.NominatimAPIAsync(project_dir)
    try:
        # Second API instance pointing at the SQLite output file.
        dest_dsn = {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"}
        dest_api = napi.NominatimAPIAsync(project_dir, dest_dsn)
        try:
            async with src_api.begin() as src_conn, dest_api.begin() as dest_conn:
                await SqliteWriter(src_conn, dest_conn, options).write()
        finally:
            await dest_api.close()
    finally:
        await src_api.close()
41
42
class SqliteWriter:
    """ Worker class which creates a new SQLite database.

        Copies the table structure and data from the source (PostgreSQL)
        connection into the Spatialite-enabled destination connection.
        Which optional parts are exported (e.g. the search tables) is
        controlled by the `options` set.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        # Connection to the source PostgreSQL database.
        self.src = src
        # Connection to the SQLite database being written.
        self.dest = dest
        # Feature flags; only 'search' is inspected in this class.
        self.options = options


    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.
        """
        LOG.warning('Setting up spatialite')
        # Initialise Spatialite's metadata tables first; geometry columns
        # cannot be registered before this has run.
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        # The word table format depends on property information that is
        # only available after copy_data(), so it is created last.
        if 'search' in self.options:
            await self.create_word_table()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Set up the database tables.

            Without the 'search' option, the search_name table is dropped
            from the metadata so it is neither created nor copied.
        """
        LOG.warning('Setting up tables')
        if 'search' not in self.options:
            self.dest.t.meta.remove(self.dest.t.search_name)
        else:
            # Register the dynamic place_classtype_* tables so that
            # create_all below also creates them in the destination.
            await self.create_class_tables()

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Convert all Geometry columns to Spatialite geometries
        # (RecoverGeometryColumn registers an existing column in the
        # Spatialite geometry metadata).
        for table in self.dest.t.meta.sorted_tables:
            for col in table.c:
                if isinstance(col.type, Geometry):
                    await self.dest.execute(sa.select(
                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                      col.type.subtype.upper(), 'XY')))


    async def create_class_tables(self) -> None:
        """ Set up the tables that serve class/type-specific geometries.

            The table names are discovered dynamically from the source
            database's pg_tables catalogue and registered with the
            metadata of both connections, so that copy_data() picks
            them up like any other table.
        """
        sql = sa.text("""SELECT tablename FROM pg_tables
                         WHERE tablename LIKE 'place_classtype_%'""")
        for res in await self.src.execute(sql):
            for db in (self.src, self.dest):
                sa.Table(res[0], db.t.meta,
                         sa.Column('place_id', sa.BigInteger),
                         sa.Column('centroid', Geometry))


    async def create_word_table(self) -> None:
        """ Create the word table.
            This table needs the property information to determine the
            correct format. Therefore needs to be done after all other
            data has been copied.
        """
        # Running the query analyzer registers the 'word' table (in the
        # tokenizer-specific layout) with each connection's metadata.
        await make_query_analyzer(self.src)
        await make_query_analyzer(self.dest)
        src = self.src.t.meta.tables['word']
        dest = self.dest.t.meta.tables['word']

        await self.dest.connection.run_sync(dest.create)

        LOG.warning("Copying word table")
        # Stream rows in partitions to keep memory usage bounded.
        async_result = await self.src.connection.stream(sa.select(src))

        async for partition in async_result.partitions(10000):
            data = [{k: getattr(r, k) for k in r._fields} for r in partition]
            await self.dest.execute(dest.insert(), data)

        # NOTE(review): index name looks like a typo for 'idx_word_token';
        # harmless (only used as an identifier) but worth confirming.
        await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)


    async def copy_data(self) -> None:
        """ Copy data for all registered tables.
        """
        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            async_result = await self.src.connection.stream(self.select_from(table.name))

            async for partition in async_result.partitions(10000):
                # 'class' is a reserved word in the SQLAlchemy row
                # interface; the destination column is named 'class_'.
                data = [{('class_' if k == 'class' else k): getattr(r, k) for k in r._fields}
                        for r in partition]
                await self.dest.execute(table.insert(), data)

        # Set up a minimal copy of pg_tables used to look up the class tables later.
        pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                             sa.Column('schemaname', sa.Text, default='public'),
                             sa.Column('tablename', sa.Text))
        await self.dest.connection.run_sync(pg_tables.create)
        data = [{'tablename': t} for t in self.dest.t.meta.tables]
        await self.dest.execute(pg_tables.insert().values(data))


    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        # Register the freshly created geometry column and index it.
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
                                             'placex_place_node_areas', 'geometry')))

        # Remaining indexes.
        await self.create_spatial_index('country_grid', 'geometry')
        await self.create_spatial_index('placex', 'geometry')
        await self.create_spatial_index('osmline', 'linegeo')
        await self.create_spatial_index('tiger', 'linegeo')
        await self.create_index('placex', 'place_id')
        await self.create_index('placex', 'parent_place_id')
        await self.create_index('placex', 'rank_address')
        await self.create_index('addressline', 'place_id')
        await self.create_index('postcode', 'place_id')
        await self.create_index('osmline', 'place_id')
        await self.create_index('tiger', 'place_id')

        if 'search' in self.options:
            await self.create_spatial_index('postcode', 'geometry')
            await self.create_spatial_index('search_name', 'centroid')
            await self.create_index('search_name', 'place_id')
            await self.create_index('osmline', 'parent_place_id')
            await self.create_index('tiger', 'parent_place_id')
            await self.create_search_index()

            # Spatial index for every dynamically registered class table.
            for t in self.dest.t.meta.tables:
                if t.startswith('place_classtype_'):
                    await self.dest.execute(sa.select(
                      sa.func.CreateSpatialIndex(t, 'centroid')))


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.
        """
        await self.dest.execute(sa.select(
                  sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.
        """
        table = getattr(self.dest.t, table_name)
        # str(Table) yields the table name, so the index is named
        # idx_<table>_<column>.
        await self.dest.connection.run_sync(
            sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)


    async def create_search_index(self) -> None:
        """ Create the tables and indexes needed for word lookup.

            Builds one inverted table per search vector column mapping
            each word id to the sorted array of place_ids containing it.
        """
        tsrc = self.src.t.search_name
        for column in ('name_vector', 'nameaddress_vector'):
            table_name = f'reverse_search_{column}'
            LOG.warning("Creating reverse search %s", table_name)
            rsn = sa.Table(table_name, self.dest.t.meta,
                           sa.Column('word', sa.Integer()),
                           sa.Column('places', IntArray))
            await self.dest.connection.run_sync(rsn.create)

            # Invert the word array: one output row per (word, places[]).
            sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
                            sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
                    .group_by('word')

            async_result = await self.src.connection.stream(sql)
            async for partition in async_result.partitions(100):
                data = []
                for row in partition:
                    # Sort so lookups can rely on ordered place_id arrays.
                    row.places.sort()
                    data.append({'word': row.word,
                                 'places': row.places})
                await self.dest.execute(rsn.insert(), data)

            await self.dest.connection.run_sync(
                sa.Index(f'idx_reverse_search_{column}_word', rsn.c.word).create)


    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.

            Geometry columns are converted to WKT text on the PostgreSQL
            side so they can be inserted into SQLite.
        """
        columns = self.src.t.meta.tables[table].c

        if table == 'placex':
            # SQLite struggles with Geometries that are larger than 5MB,
            # so simplify those.
            return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
                             sa.func.ST_AsText(columns.centroid).label('centroid'),
                             sa.func.ST_AsText(
                               sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                                        columns.geometry),
                                       else_=sa.func.ST_SimplifyPreserveTopology(
                                                columns.geometry, 0.0001)
                                )).label('geometry'))

        sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
                             if isinstance(c.type, Geometry) else c for c in columns))

        return sql