]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/convert_sqlite.py
correctly close API objects during testing
[nominatim.git] / nominatim / tools / convert_sqlite.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
Exporting a Nominatim database to SQLite.
9 """
10 from typing import Set
11 import logging
12 from pathlib import Path
13
14 import sqlalchemy as sa
15
16 from nominatim.typing import SaSelect
17 from nominatim.db.sqlalchemy_types import Geometry
18 import nominatim.api as napi
19
20 LOG = logging.getLogger()
21
async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
    """ Write the database of an existing project into an SQLite file.

        The source is the database configured for `project_dir`, the
        result goes to `outfile`. The resulting database will be usable
        against the Python frontend of Nominatim. `options` is passed
        on unchanged to the SqliteWriter doing the actual work.
    """
    source_api = napi.NominatimAPIAsync(project_dir)

    try:
        # Same project directory, but with the DSN pointed at the output file.
        dsn_override = {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"}
        target_api = napi.NominatimAPIAsync(project_dir, dsn_override)

        try:
            async with source_api.begin() as src, target_api.begin() as dest:
                await SqliteWriter(src, dest, options).write()
        finally:
            # Close the output API even when the export failed.
            await target_api.close()
    finally:
        # The source API is closed last, whatever happened above.
        await source_api.close()
40
41
class SqliteWriter:
    """ Worker class which creates a new SQLite database.

        Data is read through the `src` connection and written through
        the `dest` connection. Call write() to run the complete export.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        """ Remember the two connections and the set of export options.

            The only option looked at by this class is 'search',
            see create_tables().
        """
        self.src = src
        self.dest = dest
        self.options = options


    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.

            The steps run strictly in this order: initialise the spatial
            metadata, create the tables, copy the rows, create the indexes.
        """
        # Must come first: the geometry functions used below operate on
        # the metadata set up here.
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Set up the database tables.

            Without the 'search' option the search_name table is removed
            from the destination metadata. As copy_data() iterates over
            the same metadata, the table is then neither created nor filled.
        """
        if 'search' not in self.options:
            self.dest.t.meta.remove(self.dest.t.search_name)

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Convert all Geometry columns to Spatialite geometries
        for table in self.dest.t.meta.sorted_tables:
            for col in table.c:
                if isinstance(col.type, Geometry):
                    # Arguments: table, column, 4326, upper-cased subtype
                    # of the column type, 'XY'.
                    await self.dest.execute(sa.select(
                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                      col.type.subtype.upper(), 'XY')))


    async def copy_data(self) -> None:
        """ Copy data for all registered tables.

            The tables are taken from the destination metadata. Rows are
            streamed from the source and inserted in batches of 10000.
        """
        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            async_result = await self.src.connection.stream(self.select_from(table.name))

            async for partition in async_result.partitions(10000):
                # The result field 'class' is handed to the insert as 'class_'.
                # NOTE(review): presumably 'class_' is the key of that column
                # in the table definition - confirm against the schema.
                data = [{('class_' if k == 'class' else k): getattr(r, k) for k in r._fields}
                        for r in partition]
                await self.dest.execute(table.insert(), data)


    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.

            This also creates the helper table placex_place_node_areas,
            which is filled from the already copied placex table.
        """
        # reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        # The helper table is not part of the table definitions in
        # self.dest.t, so it is addressed by its literal name here
        # instead of going through create_spatial_index().
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
                                             'placex_place_node_areas', 'geometry')))

        # Remaining indexes.
        await self.create_spatial_index('country_grid', 'geometry')
        await self.create_spatial_index('placex', 'geometry')
        await self.create_spatial_index('osmline', 'linegeo')
        await self.create_spatial_index('tiger', 'linegeo')
        await self.create_index('placex', 'place_id')
        await self.create_index('placex', 'parent_place_id')
        await self.create_index('placex', 'rank_address')
        await self.create_index('addressline', 'place_id')


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.

            `table` is the attribute name of the table on `self.dest.t`,
            the name used in the database is taken from the table object.
        """
        await self.dest.execute(sa.select(
                  sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.

            `table_name` is the attribute name of the table on `self.dest.t`.
            The index is called 'idx_<table>_<column>', with <table> being
            the name of the table in the database.
        """
        table = getattr(self.dest.t, table_name)
        # Use the plain table name explicitly, as create_spatial_index() does.
        # Formatting the Table object itself goes through its string
        # conversion, which includes the schema when one is set.
        index = sa.Index(f"idx_{table.name}_{column}", getattr(table.c, column))
        await self.dest.connection.run_sync(index.create)


    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.

            The columns come from the source table definition of the given
            name. Geometry columns are selected in their text form via
            ST_AsText() and keep their column name as label.
        """
        columns = self.src.t.meta.tables[table].c

        if table == 'placex':
            # SQLite struggles with Geometries that are larger than 5MB,
            # so simplify those.
            # NOTE(review): only 'centroid' and 'geometry' are added back
            # after filtering out the Geometry columns - this assumes that
            # placex has no other Geometry column.
            return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
                             sa.func.ST_AsText(columns.centroid).label('centroid'),
                             sa.func.ST_AsText(
                               sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                                        columns.geometry),
                                       else_=sa.func.ST_SimplifyPreserveTopology(
                                                columns.geometry, 0.0001)
                                )).label('geometry'))

        sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
                             if isinstance(c.type, Geometry) else c for c in columns))

        return sql
158
159         return sql