]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/convert_sqlite.py
use correct SQLAlchemy pool for asynchronous connections
[nominatim.git] / nominatim / tools / convert_sqlite.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Exporting a Nominatim database to SQLite.
9 """
10 from typing import Set
11 import logging
12 from pathlib import Path
13
14 import sqlalchemy as sa
15
16 from nominatim.typing import SaSelect
17 from nominatim.db.sqlalchemy_types import Geometry, IntArray
18 from nominatim.api.search.query_analyzer_factory import make_query_analyzer
19 import nominatim.api as napi
20
# Root logger (getLogger() is called without a name). Used below for the
# progress messages emitted during the export.
LOG = logging.getLogger()
22
async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to sqlite. The resulting database
        will be usable against the Python frontend of Nominatim.

        Two API objects are created for `project_dir`: one with the
        project's own settings as the source and one whose database DSN
        is overridden to point at `outfile` as the destination. `options`
        is handed on unchanged to the SqliteWriter which does the actual
        work. Both API objects are closed again, also when the export fails.
    """
    source_api = napi.NominatimAPIAsync(project_dir)

    try:
        # Settings override for the destination: SQLite DSN and the
        # read-write flag.
        sqlite_settings = {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
                           'NOMINATIM_DATABASE_RW': '1'}
        target_api = napi.NominatimAPIAsync(project_dir, sqlite_settings)

        try:
            async with source_api.begin() as src:
                async with target_api.begin() as dest:
                    await SqliteWriter(src, dest, options).write()
        finally:
            await target_api.close()
    finally:
        await source_api.close()
42
43
class SqliteWriter:
    """ Worker class which creates a new SQLite database.

        All data is read through the connection `src` and written through
        the connection `dest`. `options` is a set of strings; the only
        value evaluated by this class is 'search', which additionally
        exports the search_name table, the class/type tables, the word
        table and the reverse search lookup table.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        self.src = src
        self.dest = dest
        self.options = options


    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.
        """
        LOG.warning('Setting up spatialite')
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        # The word table must be created after the other data has been
        # copied, see create_word_table().
        if 'search' in self.options:
            await self.create_word_table()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Set up the database tables.
        """
        LOG.warning('Setting up tables')
        if 'search' not in self.options:
            # Without search support the search_name table is dropped from
            # the metadata, so it is neither created nor copied.
            self.dest.t.meta.remove(self.dest.t.search_name)
        else:
            await self.create_class_tables()

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Convert all Geometry columns to Spatialite geometries
        for table in self.dest.t.meta.sorted_tables:
            for col in table.c:
                if isinstance(col.type, Geometry):
                    await self.dest.execute(sa.select(
                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                      col.type.subtype.upper(), 'XY')))


    async def create_class_tables(self) -> None:
        """ Set up the tables that serve class/type-specific geometries.

            The tables are only registered with the metadata of the source
            and the destination connection here. They are created together
            with all other tables in create_tables() and filled in
            copy_data().
        """
        sql = sa.text("""SELECT tablename FROM pg_tables
                         WHERE tablename LIKE 'place_classtype_%'""")
        for res in await self.src.execute(sql):
            # A separate Table object is needed per metadata object.
            for db in (self.src, self.dest):
                sa.Table(res[0], db.t.meta,
                         sa.Column('place_id', sa.BigInteger),
                         sa.Column('centroid', Geometry))


    async def create_word_table(self) -> None:
        """ Create the word table.
            This table needs the property information to determine the
            correct format. Therefore needs to be done after all other
            data has been copied.
        """
        # NOTE(review): the lookups of 'word' below rely on
        # make_query_analyzer() registering the table in the metadata
        # of the connection - confirm against the analyzer implementation.
        await make_query_analyzer(self.src)
        await make_query_analyzer(self.dest)
        src = self.src.t.meta.tables['word']
        dest = self.dest.t.meta.tables['word']

        await self.dest.connection.run_sync(dest.create)

        LOG.warning("Copying word table")
        async_result = await self.src.connection.stream(sa.select(src))

        # Stream the rows in batches to avoid loading the whole table.
        async for partition in async_result.partitions(10000):
            data = [{k: getattr(r, k) for k in r._fields} for r in partition]
            await self.dest.execute(dest.insert(), data)

        # NOTE(review): 'woken' looks like a typo for 'token'. The index
        # name is left unchanged so that nothing referring to it breaks.
        await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)


    async def copy_data(self) -> None:
        """ Copy data for all registered tables.
        """
        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            async_result = await self.src.connection.stream(self.select_from(table.name))

            async for partition in async_result.partitions(10000):
                # The result field 'class' is inserted under the key 'class_'.
                data = [{('class_' if k == 'class' else k): getattr(r, k) for k in r._fields}
                        for r in partition]
                await self.dest.execute(table.insert(), data)

        # Set up a minimal copy of pg_tables used to look up the class tables later.
        pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                             sa.Column('schemaname', sa.Text, default='public'),
                             sa.Column('tablename', sa.Text))
        await self.dest.connection.run_sync(pg_tables.create)
        data = [{'tablename': t} for t in self.dest.t.meta.tables]
        # Insert row by row (executemany) like the other copies in this
        # class. A single multi-row VALUES statement binds the values of
        # all rows at once and may run into SQLite's limit on bound
        # parameters when there are many class tables.
        await self.dest.execute(pg_tables.insert(), data)


    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
                                             'placex_place_node_areas', 'geometry')))

        # Remaining indexes.
        await self.create_spatial_index('country_grid', 'geometry')
        await self.create_spatial_index('placex', 'geometry')
        await self.create_spatial_index('osmline', 'linegeo')
        await self.create_spatial_index('tiger', 'linegeo')
        await self.create_index('placex', 'place_id')
        await self.create_index('placex', 'parent_place_id')
        await self.create_index('placex', 'rank_address')
        await self.create_index('addressline', 'place_id')
        await self.create_index('postcode', 'place_id')
        await self.create_index('osmline', 'place_id')
        await self.create_index('tiger', 'place_id')

        if 'search' in self.options:
            await self.create_spatial_index('postcode', 'geometry')
            await self.create_spatial_index('search_name', 'centroid')
            await self.create_index('search_name', 'place_id')
            await self.create_index('osmline', 'parent_place_id')
            await self.create_index('tiger', 'parent_place_id')
            await self.create_search_index()

            # The class tables are addressed by their real table name
            # taken from the metadata instead of going through
            # create_spatial_index().
            for t in self.dest.t.meta.tables:
                if t.startswith('place_classtype_'):
                    await self.dest.execute(sa.select(
                      sa.func.CreateSpatialIndex(t, 'centroid')))


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.

            `table` is the attribute name of the table in `self.dest.t`,
            which is resolved to the real table name in the database.
        """
        await self.dest.execute(sa.select(
                  sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.

            `table_name` is the attribute name of the table in
            `self.dest.t`. The index is named 'idx_<real table name>_<column>'.
        """
        table = getattr(self.dest.t, table_name)
        # Use the table name explicitly instead of relying on the string
        # conversion of the Table object.
        await self.dest.connection.run_sync(
            sa.Index(f"idx_{table.name}_{column}", getattr(table.c, column)).create)


    async def create_search_index(self) -> None:
        """ Create the tables and indexes needed for word lookup.

            The table 'reverse_search_name' receives one row per column
            ('name_vector' and 'nameaddress_vector') and per distinct
            entry in that column of the source search_name table, together
            with the sorted list of place_ids of the rows that contain
            the entry.
        """
        LOG.warning("Creating reverse search table")
        rsn = sa.Table('reverse_search_name', self.dest.t.meta,
                       sa.Column('word', sa.Integer()),
                       sa.Column('column', sa.Text()),
                       sa.Column('places', IntArray))
        await self.dest.connection.run_sync(rsn.create)

        tsrc = self.src.t.search_name
        for column in ('name_vector', 'nameaddress_vector'):
            sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
                            sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
                    .group_by('word')

            async_result = await self.src.connection.stream(sql)
            async for partition in async_result.partitions(100):
                # sorted() creates a new list, so the fetched row itself
                # is left untouched.
                data = [{'word': row.word,
                         'column': column,
                         'places': sorted(row.places)}
                        for row in partition]
                await self.dest.execute(rsn.insert(), data)

        await self.dest.connection.run_sync(
            sa.Index('idx_reverse_search_name_word', rsn.c.word).create)


    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.

            `table` is the real name of the table as registered in the
            metadata of the source connection. Geometry columns are
            selected through ST_AsText() under their original column name.
        """
        columns = self.src.t.meta.tables[table].c

        if table == 'placex':
            # SQLite struggles with Geometries that are larger than 5MB,
            # so simplify those.
            return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
                             sa.func.ST_AsText(columns.centroid).label('centroid'),
                             sa.func.ST_AsText(
                               sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                                        columns.geometry),
                                       else_=sa.func.ST_SimplifyPreserveTopology(
                                                columns.geometry, 0.0001)
                                )).label('geometry'))

        sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
                             if isinstance(c.type, Geometry) else c for c in columns))

        return sql