]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/convert_sqlite.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / tools / convert_sqlite.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
Exporting a Nominatim database to SQLite.
9 """
10 from typing import Set
11 import logging
12 from pathlib import Path
13
14 import sqlalchemy as sa
15
16 from nominatim.typing import SaSelect
17 from nominatim.db.sqlalchemy_types import Geometry
18 import nominatim.api as napi
19
# Module-level logger. getLogger() without a name returns the root logger.
LOG = logging.getLogger()
21
async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to sqlite. The resulting database
        will be usable against the Python frontend of Nominatim.

        `project_dir` is used to set up the API objects for both the
        source and the destination database. `outfile` is the path that
        is put into the sqlite DSN of the destination. `options` is
        handed on unchanged to the SqliteWriter.

        Both API objects are closed before the function returns, also
        when the export fails with an exception.
    """
    api = napi.NominatimAPIAsync(project_dir)

    try:
        outapi = napi.NominatimAPIAsync(project_dir,
                                        {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"})

        try:
            async with api.begin() as src, outapi.begin() as dest:
                writer = SqliteWriter(src, dest, options)
                await writer.write()
        finally:
            # The destination API object is independent of the source
            # one and needs to be closed separately.
            await outapi.close()
    finally:
        await api.close()
37
38
class SqliteWriter:
    """ Worker class which creates a new SQLite database.

        Rows are read through the source connection. Tables, their
        content and the indexes are written through the destination
        connection.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        """ Set up a writer that copies from `src` to `dest`.
            When `options` contains 'search', the search_name table
            is kept in the output.
        """
        self.src = src
        self.dest = dest
        self.options = options


    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.
        """
        # Spatialite needs its metadata tables in place before geometry
        # columns can be registered.
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Set up the database tables.

            The search_name table is dropped from the destination
            metadata, and thus not created, unless the 'search' option
            was given.
        """
        if 'search' not in self.options:
            self.dest.t.meta.remove(self.dest.t.search_name)

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Convert all Geometry columns to Spatialite geometries
        for table in self.dest.t.meta.sorted_tables:
            for col in table.c:
                if isinstance(col.type, Geometry):
                    await self.dest.execute(sa.select(
                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                      col.type.subtype.upper(), 'XY')))


    async def copy_data(self) -> None:
        """ Copy data for all registered tables.

            The rows of each table of the destination metadata are
            streamed from the source and inserted in batches.
        """
        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            async_result = await self.src.connection.stream(self.select_from(table.name))

            # Insert in batches of 10000 rows instead of loading
            # the complete table into memory.
            async for partition in async_result.partitions(10000):
                # The result field 'class' is inserted under the
                # key 'class_'. All other fields keep their name.
                data = [{('class_' if k == 'class' else k): getattr(r, k) for k in r._fields}
                        for r in partition]
                await self.dest.execute(table.insert(), data)


    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        # The table was created with raw SQL, so its geometry column is
        # registered and indexed here using the literal table name.
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
                                             'placex_place_node_areas', 'geometry')))

        # Remaining indexes.
        await self.create_spatial_index('country_grid', 'geometry')
        await self.create_spatial_index('placex', 'geometry')
        await self.create_spatial_index('osmline', 'linegeo')
        await self.create_spatial_index('tiger', 'linegeo')
        await self.create_index('placex', 'place_id')
        await self.create_index('placex', 'parent_place_id')
        await self.create_index('placex', 'rank_address')
        await self.create_index('addressline', 'place_id')


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.

            `table` is the attribute name of the table in the table
            collection of the destination connection.
        """
        await self.dest.execute(sa.select(
                  sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.

            `table_name` is the attribute name of the table in the table
            collection of the destination connection. The index is
            named 'idx_<table>_<column>' after the name of that table.
        """
        table = getattr(self.dest.t, table_name)
        # Use the plain table name explicitly. Formatting the table object
        # itself would give a schema-qualified name when a schema is set.
        await self.dest.connection.run_sync(
            sa.Index(f"idx_{table.name}_{column}", getattr(table.c, column)).create)


    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.

            Geometry columns are selected in their text representation
            under the name of the column. `table` is the name of the
            table in the metadata of the source connection.
        """
        columns = self.src.t.meta.tables[table].c

        if table == 'placex':
            # SQLite struggles with Geometries that are larger than 5MB,
            # so simplify those.
            return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
                             sa.func.ST_AsText(columns.centroid).label('centroid'),
                             sa.func.ST_AsText(
                               sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                                        columns.geometry),
                                       else_=sa.func.ST_SimplifyPreserveTopology(
                                                columns.geometry, 0.0001)
                                )).label('geometry'))

        sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
                             if isinstance(c.type, Geometry) else c for c in columns))

        return sql