]> git.openstreetmap.org Git - nominatim.git/commitdiff
add exporting of SQLite table
authorSarah Hoffmann <lonvia@denofr.de>
Thu, 12 Oct 2023 08:45:12 +0000 (10:45 +0200)
committerSarah Hoffmann <lonvia@denofr.de>
Mon, 23 Oct 2023 15:19:12 +0000 (17:19 +0200)
nominatim/api/core.py
nominatim/db/sqlalchemy_types.py
nominatim/tools/convert_sqlite.py

index a6b49404a02536b99a0eaf346b6ba824f98cc1bc..44ac91606fef90a746bb26d06b2a9fc6da0e61e4 100644 (file)
@@ -81,21 +81,34 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
             if self._engine:
                 return
 
-            dsn = self.config.get_database_params()
-            pool_size = self.config.get_int('API_POOL_SIZE')
-
-            query = {k: v for k, v in dsn.items()
-                      if k not in ('user', 'password', 'dbname', 'host', 'port')}
-
-            dburl = sa.engine.URL.create(
-                       f'postgresql+{PGCORE_LIB}',
-                       database=dsn.get('dbname'),
-                       username=dsn.get('user'), password=dsn.get('password'),
-                       host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
-                       query=query)
-            engine = sa_asyncio.create_async_engine(dburl, future=True,
-                                                    max_overflow=0, pool_size=pool_size,
-                                                    echo=self.config.get_bool('DEBUG_SQL'))
+            extra_args: Dict[str, Any] = {'future': True,
+                                          'echo': self.config.get_bool('DEBUG_SQL')}
+
+            is_sqlite = self.config.DATABASE_DSN.startswith('sqlite:')
+
+            if is_sqlite:
+                params = dict((p.split('=', 1)
+                              for p in self.config.DATABASE_DSN[7:].split(';')))
+                dburl = sa.engine.URL.create('sqlite+aiosqlite',
+                                             database=params.get('dbname'))
+
+            else:
+                dsn = self.config.get_database_params()
+                query = {k: v for k, v in dsn.items()
+                         if k not in ('user', 'password', 'dbname', 'host', 'port')}
+
+                dburl = sa.engine.URL.create(
+                           f'postgresql+{PGCORE_LIB}',
+                           database=dsn.get('dbname'),
+                           username=dsn.get('user'),
+                           password=dsn.get('password'),
+                           host=dsn.get('host'),
+                           port=int(dsn['port']) if 'port' in dsn else None,
+                           query=query)
+                extra_args['max_overflow'] = 0
+                extra_args['pool_size'] = self.config.get_int('API_POOL_SIZE')
+
+            engine = sa_asyncio.create_async_engine(dburl, **extra_args)
 
             try:
                 async with engine.begin() as conn:
@@ -104,7 +117,7 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
             except (PGCORE_ERROR, sa.exc.OperationalError):
                 server_version = 0
 
-            if server_version >= 110000:
+            if server_version >= 110000 and not is_sqlite:
                 @sa.event.listens_for(engine.sync_engine, "connect")
                 def _on_connect(dbapi_con: Any, _: Any) -> None:
                     cursor = dbapi_con.cursor()
@@ -113,6 +126,15 @@ class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
                 # Make sure that all connections get the new settings
                 await self.close()
 
+            if is_sqlite:
+                @sa.event.listens_for(engine.sync_engine, "connect")
+                def _on_sqlite_connect(dbapi_con: Any, _: Any) -> None:
+                    dbapi_con.run_async(lambda conn: conn.enable_load_extension(True))
+                    cursor = dbapi_con.cursor()
+                    cursor.execute("SELECT load_extension('mod_spatialite')")
+                    cursor.execute('SELECT SetDecimalPrecision(7)')
+                    dbapi_con.run_async(lambda conn: conn.enable_load_extension(False))
+
             self._property_cache['DB:server_version'] = server_version
 
             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
index 7b9590363db2a055962e61ac2d850397cfc0e5d2..e9687dffa5af19aca840016af9a4e6dc7811a845 100644 (file)
@@ -28,7 +28,7 @@ class Geometry(types.UserDefinedType): # type: ignore[type-arg]
 
 
     def get_col_spec(self) -> str:
-        return f'GEOMETRY({self.subtype}, 4326)'
+        return f'GEOMETRY'
 
 
     def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
index 42977ce8659e78cd34dff31789f2ded3f3225c09..afae0d6b87e7b545c94bdbe06a9c8e895f1d1010 100644 (file)
@@ -8,12 +8,17 @@
 Exporting a Nominatim database to SQLite.
 """
 from typing import Set
+import logging
 from pathlib import Path
 
 import sqlalchemy as sa
 
+from nominatim.typing import SaSelect
+from nominatim.db.sqlalchemy_types import Geometry
 import nominatim.api as napi
 
+LOG = logging.getLogger()
+
 async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
     """ Export an existing database to sqlite. The resulting database
         will be usable against the Python frontend of Nominatim.
@@ -24,7 +29,115 @@ async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
         outapi = napi.NominatimAPIAsync(project_dir,
                                         {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"})
 
-        async with api.begin() as inconn, outapi.begin() as outconn:
-            pass
+        async with api.begin() as src, outapi.begin() as dest:
+            writer = SqliteWriter(src, dest, options)
+            await writer.write()
     finally:
         await api.close()
+
+
class SqliteWriter:
    """ Worker class which creates a new SQLite database.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        # Source (PostgreSQL) and destination (SQLite) connections plus
        # the set of optional features to export.
        self.src = src
        self.dest = dest
        self.options = options


    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.
        """
        # Spatialite metadata tables must exist before any geometry
        # columns can be registered.
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        await self.create_indexes()


    async def create_tables(self) -> None:
        """ Set up the database tables.
        """
        # The search table is only exported when explicitly requested.
        if 'search' not in self.options:
            self.dest.t.meta.remove(self.dest.t.search_name)

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Geometry columns need to be registered with Spatialite after
        # table creation so that spatial functions work on them.
        for tbl in self.dest.t.meta.sorted_tables:
            geom_cols = [c for c in tbl.c if isinstance(c.type, Geometry)]
            for col in geom_cols:
                await self.dest.execute(sa.select(
                    sa.func.RecoverGeometryColumn(tbl.name, col.name, 4326,
                                                  col.type.subtype.upper(), 'XY')))


    async def copy_data(self) -> None:
        """ Copy data for all registered tables.
        """
        for tbl in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", tbl.name)
            cursor = await self.src.connection.stream(self.select_from(tbl.name))

            # Stream in chunks to keep memory usage bounded.
            async for chunk in cursor.partitions(10000):
                rows = []
                for row in chunk:
                    # 'class' is a reserved word; the destination column
                    # is keyed 'class_' in the SQLAlchemy metadata.
                    rows.append({('class_' if fld == 'class' else fld): getattr(row, fld)
                                 for fld in row._fields})
                await self.dest.execute(tbl.insert(), rows)


    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
                                             'placex_place_node_areas', 'geometry')))

        # Remaining indexes.
        for tbl, col in (('country_grid', 'geometry'),
                         ('placex', 'geometry'),
                         ('osmline', 'linegeo'),
                         ('tiger', 'linegeo')):
            await self.create_spatial_index(tbl, col)
        for tbl, col in (('placex', 'place_id'),
                         ('placex', 'rank_address'),
                         ('addressline', 'place_id')):
            await self.create_index(tbl, col)


    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.
        """
        await self.dest.execute(sa.select(
                  sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))


    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.
        """
        tbl = getattr(self.dest.t, table_name)
        idx = sa.Index(f"idx_{tbl}_{column}", getattr(tbl.c, column))
        await self.dest.connection.run_sync(idx.create)


    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.
        """
        columns = self.src.t.meta.tables[table].c

        # Geometries are transported as WKT text; everything else is
        # selected verbatim.
        return sa.select(*(sa.func.ST_AsText(c).label(c.name)
                           if isinstance(c.type, Geometry) else c
                           for c in columns))