import sqlalchemy as sa
from nominatim.typing import SaSelect
-from nominatim.db.sqlalchemy_types import Geometry
+from nominatim.db.sqlalchemy_types import Geometry, IntArray
+from nominatim.api.search.query_analyzer_factory import make_query_analyzer
import nominatim.api as napi
LOG = logging.getLogger()
outapi = napi.NominatimAPIAsync(project_dir,
{'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}"})
- async with api.begin() as src, outapi.begin() as dest:
- writer = SqliteWriter(src, dest, options)
- await writer.write()
+ try:
+ async with api.begin() as src, outapi.begin() as dest:
+ writer = SqliteWriter(src, dest, options)
+ await writer.write()
+ finally:
+ await outapi.close()
finally:
await api.close()
""" Create the database structure and copy the data from
the source database to the destination.
"""
+ LOG.warning('Setting up spatialite')
await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
await self.create_tables()
await self.copy_data()
+ if 'search' in self.options:
+ await self.create_word_table()
await self.create_indexes()
async def create_tables(self) -> None:
""" Set up the database tables.
"""
+ LOG.warning('Setting up tables')
if 'search' not in self.options:
self.dest.t.meta.remove(self.dest.t.search_name)
+ else:
+ await self.create_class_tables()
await self.dest.connection.run_sync(self.dest.t.meta.create_all)
col.type.subtype.upper(), 'XY')))
+ async def create_class_tables(self) -> None:
+ """ Set up the table that serve class/type-specific geometries.
+ """
+ sql = sa.text("""SELECT tablename FROM pg_tables
+ WHERE tablename LIKE 'place_classtype_%'""")
+ for res in await self.src.execute(sql):
+ for db in (self.src, self.dest):
+ sa.Table(res[0], db.t.meta,
+ sa.Column('place_id', sa.BigInteger),
+ sa.Column('centroid', Geometry))
+
+
+ async def create_word_table(self) -> None:
+ """ Create the word table.
+ This table needs the property information to determine the
+ correct format. Therefore needs to be done after all other
+ data has been copied.
+ """
+ await make_query_analyzer(self.src)
+ await make_query_analyzer(self.dest)
+ src = self.src.t.meta.tables['word']
+ dest = self.dest.t.meta.tables['word']
+
+ await self.dest.connection.run_sync(dest.create)
+
+ LOG.warning("Copying word table")
+ async_result = await self.src.connection.stream(sa.select(src))
+
+ async for partition in async_result.partitions(10000):
+ data = [{k: getattr(r, k) for k in r._fields} for r in partition]
+ await self.dest.execute(dest.insert(), data)
+
+ await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
+
+
async def copy_data(self) -> None:
""" Copy data for all registered tables.
"""
for r in partition]
await self.dest.execute(table.insert(), data)
+ # Set up a minimal copy of pg_tables used to look up the class tables later.
+ pg_tables = sa.Table('pg_tables', self.dest.t.meta,
+ sa.Column('schemaname', sa.Text, default='public'),
+ sa.Column('tablename', sa.Text))
+ await self.dest.connection.run_sync(pg_tables.create)
+ data = [{'tablename': t} for t in self.dest.t.meta.tables]
+ await self.dest.execute(pg_tables.insert().values(data))
+
async def create_indexes(self) -> None:
""" Add indexes necessary for the frontend.
await self.create_index('placex', 'parent_place_id')
await self.create_index('placex', 'rank_address')
await self.create_index('addressline', 'place_id')
+ await self.create_index('postcode', 'place_id')
+ await self.create_index('osmline', 'place_id')
+ await self.create_index('tiger', 'place_id')
+
+ if 'search' in self.options:
+ await self.create_spatial_index('postcode', 'geometry')
+ await self.create_spatial_index('search_name', 'centroid')
+ await self.create_index('search_name', 'place_id')
+ await self.create_index('osmline', 'parent_place_id')
+ await self.create_index('tiger', 'parent_place_id')
+ await self.create_search_index()
+
+ for t in self.dest.t.meta.tables:
+ if t.startswith('place_classtype_'):
+ await self.dest.execute(sa.select(
+ sa.func.CreateSpatialIndex(t, 'centroid')))
async def create_spatial_index(self, table: str, column: str) -> None:
sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
+ async def create_search_index(self) -> None:
+ """ Create the tables and indexes needed for word lookup.
+ """
+ tsrc = self.src.t.search_name
+ for column in ('name_vector', 'nameaddress_vector'):
+ table_name = f'reverse_search_{column}'
+ LOG.warning("Creating reverse search %s", table_name)
+ rsn = sa.Table(table_name, self.dest.t.meta,
+ sa.Column('word', sa.Integer()),
+ sa.Column('places', IntArray))
+ await self.dest.connection.run_sync(rsn.create)
+
+ sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
+ sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
+ .group_by('word')
+
+ async_result = await self.src.connection.stream(sql)
+ async for partition in async_result.partitions(100):
+ data = []
+ for row in partition:
+ row.places.sort()
+ data.append({'word': row.word,
+ 'places': row.places})
+ await self.dest.execute(rsn.insert(), data)
+
+ await self.dest.connection.run_sync(
+ sa.Index(f'idx_reverse_search_{column}_word', rsn.c.word).create)
+
+
def select_from(self, table: str) -> SaSelect:
""" Create the SQL statement to select the source columns and rows.
"""