From 114cdafe7ef6b9a781ad143c208ebc57355b64d2 Mon Sep 17 00:00:00 2001
From: Sarah Hoffmann
Date: Thu, 12 Oct 2023 10:45:12 +0200
Subject: [PATCH] add exporting of SQLite table

---
Review notes (this section is ignored when the patch is applied):
 * convert() now closes the SQLite output API object as well; before,
   only the source API object was closed in the 'finally' clause.
 * Geometry.get_col_spec() returns a plain string literal instead of an
   f-string without placeholders.
 * SqliteWriter.create_index() builds the index name from table.name,
   like create_spatial_index() already does.
 * SqliteWriter.select_from() returns the statement directly.
 * Line breaks and indentation were restored from a whitespace-collapsed
   copy of this patch. Verify context and removed lines against the tree
   before applying. The commit id above and the blob ids in the 'index'
   lines predate the changes listed here.

 nominatim/api/core.py             |  54 +++++++++----
 nominatim/db/sqlalchemy_types.py  |   2 +-
 nominatim/tools/convert_sqlite.py | 127 +++++++++++++++++++++++++++++-
 3 files changed, 164 insertions(+), 19 deletions(-)

diff --git a/nominatim/api/core.py b/nominatim/api/core.py
index a6b49404..44ac9160 100644
--- a/nominatim/api/core.py
+++ b/nominatim/api/core.py
@@ -81,21 +81,34 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
         if self._engine:
             return
 
-        dsn = self.config.get_database_params()
-        pool_size = self.config.get_int('API_POOL_SIZE')
-
-        query = {k: v for k, v in dsn.items()
-                 if k not in ('user', 'password', 'dbname', 'host', 'port')}
-
-        dburl = sa.engine.URL.create(
-                   f'postgresql+{PGCORE_LIB}',
-                   database=dsn.get('dbname'),
-                   username=dsn.get('user'), password=dsn.get('password'),
-                   host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
-                   query=query)
-        engine = sa_asyncio.create_async_engine(dburl, future=True,
-                                                max_overflow=0, pool_size=pool_size,
-                                                echo=self.config.get_bool('DEBUG_SQL'))
+        extra_args: Dict[str, Any] = {'future': True,
+                                      'echo': self.config.get_bool('DEBUG_SQL')}
+
+        is_sqlite = self.config.DATABASE_DSN.startswith('sqlite:')
+
+        if is_sqlite:
+            params = dict((p.split('=', 1)
+                          for p in self.config.DATABASE_DSN[7:].split(';')))
+            dburl = sa.engine.URL.create('sqlite+aiosqlite',
+                                         database=params.get('dbname'))
+
+        else:
+            dsn = self.config.get_database_params()
+            query = {k: v for k, v in dsn.items()
+                     if k not in ('user', 'password', 'dbname', 'host', 'port')}
+
+            dburl = sa.engine.URL.create(
+                       f'postgresql+{PGCORE_LIB}',
+                       database=dsn.get('dbname'),
+                       username=dsn.get('user'),
+                       password=dsn.get('password'),
+                       host=dsn.get('host'),
+                       port=int(dsn['port']) if 'port' in dsn else None,
+                       query=query)
+            extra_args['max_overflow'] = 0
+            extra_args['pool_size'] = self.config.get_int('API_POOL_SIZE')
+
+        engine = sa_asyncio.create_async_engine(dburl, **extra_args)
 
         try:
             async with engine.begin() as conn:
@@ -104,7 +117,7 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
         except (PGCORE_ERROR, sa.exc.OperationalError):
             server_version = 0
 
-        if server_version >= 110000:
+        if server_version >= 110000 and not is_sqlite:
             @sa.event.listens_for(engine.sync_engine, "connect")
             def _on_connect(dbapi_con: Any, _: Any) -> None:
                 cursor = dbapi_con.cursor()
@@ -113,6 +126,15 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
             # Make sure that all connections get the new settings
             await self.close()
 
+        if is_sqlite:
+            @sa.event.listens_for(engine.sync_engine, "connect")
+            def _on_sqlite_connect(dbapi_con: Any, _: Any) -> None:
+                dbapi_con.run_async(lambda conn: conn.enable_load_extension(True))
+                cursor = dbapi_con.cursor()
+                cursor.execute("SELECT load_extension('mod_spatialite')")
+                cursor.execute('SELECT SetDecimalPrecision(7)')
+                dbapi_con.run_async(lambda conn: conn.enable_load_extension(False))
+
         self._property_cache['DB:server_version'] = server_version
 
         self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
diff --git a/nominatim/db/sqlalchemy_types.py b/nominatim/db/sqlalchemy_types.py
index 7b959036..e9687dff 100644
--- a/nominatim/db/sqlalchemy_types.py
+++ b/nominatim/db/sqlalchemy_types.py
@@ -28,7 +28,7 @@ class Geometry(types.UserDefinedType): # type: ignore[type-arg]
 
 
     def get_col_spec(self) -> str:
-        return f'GEOMETRY({self.subtype}, 4326)'
+        return 'GEOMETRY'
 
 
     def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
diff --git a/nominatim/tools/convert_sqlite.py b/nominatim/tools/convert_sqlite.py
index 42977ce8..afae0d6b 100644
--- a/nominatim/tools/convert_sqlite.py
+++ b/nominatim/tools/convert_sqlite.py
@@ -8,12 +8,17 @@
 Exporting a Nominatim database to SQlite.
 """
 from typing import Set
+import logging
 from pathlib import Path
 
 import sqlalchemy as sa
 
+from nominatim.typing import SaSelect
+from nominatim.db.sqlalchemy_types import Geometry
 import nominatim.api as napi
 
+LOG = logging.getLogger()
+
 async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
     """ Export an existing database to sqlite. The resulting database
         will be usable against the Python frontend of Nominatim.
@@ -24,7 +29,125 @@ async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
         outapi = napi.NominatimAPIAsync(project_dir,
                                         {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"})
 
-        async with api.begin() as inconn, outapi.begin() as outconn:
-            pass
+        try:
+            async with api.begin() as src, outapi.begin() as dest:
+                writer = SqliteWriter(src, dest, options)
+                await writer.write()
+        finally:
+            await outapi.close()
     finally:
         await api.close()
+
+
+class SqliteWriter:
+    """ Worker class which creates a new SQLite database.
+
+        Rows are read through the `src` connection and written through
+        the `dest` connection. The search_name table is only exported
+        when `options` contains 'search'.
+    """
+
+    def __init__(self, src: napi.SearchConnection,
+                 dest: napi.SearchConnection, options: Set[str]) -> None:
+        self.src = src
+        self.dest = dest
+        self.options = options
+
+
+    async def write(self) -> None:
+        """ Create the database structure and copy the data from
+            the source database to the destination.
+        """
+        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
+
+        await self.create_tables()
+        await self.copy_data()
+        await self.create_indexes()
+
+
+    async def create_tables(self) -> None:
+        """ Set up the database tables.
+        """
+        if 'search' not in self.options:
+            self.dest.t.meta.remove(self.dest.t.search_name)
+
+        await self.dest.connection.run_sync(self.dest.t.meta.create_all)
+
+        # Convert all Geometry columns to Spatialite geometries
+        for table in self.dest.t.meta.sorted_tables:
+            for col in table.c:
+                if isinstance(col.type, Geometry):
+                    await self.dest.execute(sa.select(
+                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
+                                                      col.type.subtype.upper(), 'XY')))
+
+
+    async def copy_data(self) -> None:
+        """ Copy data for all registered tables.
+        """
+        for table in self.dest.t.meta.sorted_tables:
+            LOG.warning("Copying '%s'", table.name)
+            async_result = await self.src.connection.stream(self.select_from(table.name))
+
+            # Insert in batches of 10000 rows. The result column 'class'
+            # has to be handed to the insert under the key 'class_'.
+            async for partition in async_result.partitions(10000):
+                data = [{('class_' if k == 'class' else k): getattr(r, k) for k in r._fields}
+                        for r in partition]
+                await self.dest.execute(table.insert(), data)
+
+
+    async def create_indexes(self) -> None:
+        """ Add indexes necessary for the frontend.
+        """
+        # reverse place node lookup needs an extra table to simulate a
+        # partial index with adaptive buffering.
+        await self.dest.execute(sa.text(
+            """ CREATE TABLE placex_place_node_areas AS
+                  SELECT place_id, ST_Expand(geometry,
+                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
+                  FROM placex
+                  WHERE rank_address between 5 and 25
+                        and osm_type = 'N'
+                        and linked_place_id is NULL """))
+        await self.dest.execute(sa.select(
+            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
+                                          4326, 'GEOMETRY', 'XY')))
+        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
+                                             'placex_place_node_areas', 'geometry')))
+
+        # Remaining indexes.
+        await self.create_spatial_index('country_grid', 'geometry')
+        await self.create_spatial_index('placex', 'geometry')
+        await self.create_spatial_index('osmline', 'linegeo')
+        await self.create_spatial_index('tiger', 'linegeo')
+        await self.create_index('placex', 'place_id')
+        await self.create_index('placex', 'rank_address')
+        await self.create_index('addressline', 'place_id')
+
+
+    async def create_spatial_index(self, table: str, column: str) -> None:
+        """ Create a spatial index on the given table and column.
+        """
+        await self.dest.execute(sa.select(
+                  sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))
+
+
+    async def create_index(self, table_name: str, column: str) -> None:
+        """ Create a simple index on the given table and column.
+        """
+        table = getattr(self.dest.t, table_name)
+        await self.dest.connection.run_sync(
+            sa.Index(f"idx_{table.name}_{column}", getattr(table.c, column)).create)
+
+
+    def select_from(self, table: str) -> SaSelect:
+        """ Create the SQL statement to select the source columns and rows.
+
+            Geometry columns are selected as WKT via ST_AsText() and
+            keep their column name; all other columns are selected as is.
+        """
+        columns = self.src.t.meta.tables[table].c
+
+        return sa.select(*(sa.func.ST_AsText(c).label(c.name)
+                           if isinstance(c.type, Geometry) else c for c in columns))
-- 
2.39.5