From 0d840c8d4ee6ea233ed32350e1d633402c80e46a Mon Sep 17 00:00:00 2001
From: Sarah Hoffmann
Date: Wed, 6 Dec 2023 13:42:58 +0100
Subject: [PATCH] extend sqlite converter for search tables

---
 nominatim/db/sqlalchemy_types/int_array.py | 14 ++++
 nominatim/tools/convert_sqlite.py          | 97 +++++++++++++++++++++-
 2 files changed, 110 insertions(+), 1 deletion(-)

diff --git a/nominatim/db/sqlalchemy_types/int_array.py b/nominatim/db/sqlalchemy_types/int_array.py
index 335d5541..499376cb 100644
--- a/nominatim/db/sqlalchemy_types/int_array.py
+++ b/nominatim/db/sqlalchemy_types/int_array.py
@@ -10,6 +10,7 @@ Custom type for an array of integers.
 from typing import Any, List, cast, Optional
 
 import sqlalchemy as sa
+from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.dialects.postgresql import ARRAY
 
 from nominatim.typing import SaDialect, SaColumn
@@ -71,3 +72,16 @@ class IntArray(sa.types.TypeDecorator[Any]):
             in the array.
         """
         return self.op('&&', is_comparison=True)(other)
+
+
+class ArrayAgg(sa.sql.functions.GenericFunction[Any]):
+    """ Aggregate function to collect elements in an array.
+ """ + type = IntArray() + identifier = 'ArrayAgg' + name = 'array_agg' + inherit_cache = True + +@compiles(ArrayAgg, 'sqlite') # type: ignore[no-untyped-call, misc] +def sqlite_array_agg(element: ArrayAgg, compiler: 'sa.Compiled', **kw: Any) -> str: + return "group_concat(%s, ',')" % compiler.process(element.clauses, **kw) diff --git a/nominatim/tools/convert_sqlite.py b/nominatim/tools/convert_sqlite.py index 16f51b66..d9e39ba3 100644 --- a/nominatim/tools/convert_sqlite.py +++ b/nominatim/tools/convert_sqlite.py @@ -14,7 +14,8 @@ from pathlib import Path import sqlalchemy as sa from nominatim.typing import SaSelect -from nominatim.db.sqlalchemy_types import Geometry +from nominatim.db.sqlalchemy_types import Geometry, IntArray +from nominatim.api.search.query_analyzer_factory import make_query_analyzer import nominatim.api as napi LOG = logging.getLogger() @@ -54,18 +55,24 @@ class SqliteWriter: """ Create the database structure and copy the data from the source database to the destination. """ + LOG.warning('Setting up spatialite') await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84'))) await self.create_tables() await self.copy_data() + if 'search' in self.options: + await self.create_word_table() await self.create_indexes() async def create_tables(self) -> None: """ Set up the database tables. """ + LOG.warning('Setting up tables') if 'search' not in self.options: self.dest.t.meta.remove(self.dest.t.search_name) + else: + await self.create_class_tables() await self.dest.connection.run_sync(self.dest.t.meta.create_all) @@ -78,6 +85,41 @@ class SqliteWriter: col.type.subtype.upper(), 'XY'))) + async def create_class_tables(self) -> None: + """ Set up the table that serve class/type-specific geometries. 
+ """ + sql = sa.text("""SELECT tablename FROM pg_tables + WHERE tablename LIKE 'place_classtype_%'""") + for res in await self.src.execute(sql): + for db in (self.src, self.dest): + sa.Table(res[0], db.t.meta, + sa.Column('place_id', sa.BigInteger), + sa.Column('centroid', Geometry)) + + + async def create_word_table(self) -> None: + """ Create the word table. + This table needs the property information to determine the + correct format. Therefore needs to be done after all other + data has been copied. + """ + await make_query_analyzer(self.src) + await make_query_analyzer(self.dest) + src = self.src.t.meta.tables['word'] + dest = self.dest.t.meta.tables['word'] + + await self.dest.connection.run_sync(dest.create) + + LOG.warning("Copying word table") + async_result = await self.src.connection.stream(sa.select(src)) + + async for partition in async_result.partitions(10000): + data = [{k: getattr(r, k) for k in r._fields} for r in partition] + await self.dest.execute(dest.insert(), data) + + await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create) + + async def copy_data(self) -> None: """ Copy data for all registered tables. """ @@ -90,6 +132,14 @@ class SqliteWriter: for r in partition] await self.dest.execute(table.insert(), data) + # Set up a minimal copy of pg_tables used to look up the class tables later. + pg_tables = sa.Table('pg_tables', self.dest.t.meta, + sa.Column('schemaname', sa.Text, default='public'), + sa.Column('tablename', sa.Text)) + await self.dest.connection.run_sync(pg_tables.create) + data = [{'tablename': t} for t in self.dest.t.meta.tables] + await self.dest.execute(pg_tables.insert().values(data)) + async def create_indexes(self) -> None: """ Add indexes necessary for the frontend. 
@@ -119,6 +169,22 @@ class SqliteWriter: await self.create_index('placex', 'parent_place_id') await self.create_index('placex', 'rank_address') await self.create_index('addressline', 'place_id') + await self.create_index('postcode', 'place_id') + await self.create_index('osmline', 'place_id') + await self.create_index('tiger', 'place_id') + + if 'search' in self.options: + await self.create_spatial_index('postcode', 'geometry') + await self.create_spatial_index('search_name', 'centroid') + await self.create_index('search_name', 'place_id') + await self.create_index('osmline', 'parent_place_id') + await self.create_index('tiger', 'parent_place_id') + await self.create_search_index() + + for t in self.dest.t.meta.tables: + if t.startswith('place_classtype_'): + await self.dest.execute(sa.select( + sa.func.CreateSpatialIndex(t, 'centroid'))) async def create_spatial_index(self, table: str, column: str) -> None: @@ -136,6 +202,35 @@ class SqliteWriter: sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create) + async def create_search_index(self) -> None: + """ Create the tables and indexes needed for word lookup. 
+ """ + tsrc = self.src.t.search_name + for column in ('name_vector', 'nameaddress_vector'): + table_name = f'reverse_search_{column}' + LOG.warning("Creating reverse search %s", table_name) + rsn = sa.Table(table_name, self.dest.t.meta, + sa.Column('word', sa.Integer()), + sa.Column('places', IntArray)) + await self.dest.connection.run_sync(rsn.create) + + sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'), + sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\ + .group_by('word') + + async_result = await self.src.connection.stream(sql) + async for partition in async_result.partitions(100): + data = [] + for row in partition: + row.places.sort() + data.append({'word': row.word, + 'places': row.places}) + await self.dest.execute(rsn.insert(), data) + + await self.dest.connection.run_sync( + sa.Index(f'idx_reverse_search_{column}_word', rsn.c.word).create) + + def select_from(self, table: str) -> SaSelect: """ Create the SQL statement to select the source columns and rows. """ -- 2.39.5