async def convert(project_dir: Optional[Union[str, Path]],
outfile: Path, options: Set[str]) -> None:
""" Export an existing database to sqlite. The resulting database
async def convert(project_dir: Optional[Union[str, Path]],
outfile: Path, options: Set[str]) -> None:
""" Export an existing database to sqlite. The resulting database
async def write(self) -> None:
""" Create the database structure and copy the data from
the source database to the destination.
async def write(self) -> None:
""" Create the database structure and copy the data from
the source database to the destination.
sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
col.type.subtype.upper(), 'XY')))
sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
col.type.subtype.upper(), 'XY')))
async def create_class_tables(self) -> None:
""" Set up the table that serve class/type-specific geometries.
"""
async def create_class_tables(self) -> None:
""" Set up the table that serve class/type-specific geometries.
"""
sa.Column('place_id', sa.BigInteger),
sa.Column('centroid', Geometry))
sa.Column('place_id', sa.BigInteger),
sa.Column('centroid', Geometry))
async def create_word_table(self) -> None:
""" Create the word table.
This table needs the property information to determine the
async def create_word_table(self) -> None:
""" Create the word table.
This table needs the property information to determine the
data = [{'tablename': t} for t in self.dest.t.meta.tables]
await self.dest.execute(pg_tables.insert().values(data))
data = [{'tablename': t} for t in self.dest.t.meta.tables]
await self.dest.execute(pg_tables.insert().values(data))
await self.dest.execute(sa.select(
sa.func.CreateSpatialIndex(t, 'centroid')))
await self.dest.execute(sa.select(
sa.func.CreateSpatialIndex(t, 'centroid')))
async def create_spatial_index(self, table: str, column: str) -> None:
    """ Add a spatial index for one column of a destination table.

        `table` is the attribute name under which the table is
        available on `self.dest.t`; the name handed to the SQL function
        is taken from that table object. `column` is passed through
        unchanged. The index is created by running a SELECT on the
        SQL function `CreateSpatialIndex` against the destination.
    """
    sql_table_name = getattr(self.dest.t, table).name
    index_call = sa.func.CreateSpatialIndex(sql_table_name, column)
    await self.dest.execute(sa.select(index_call))
async def create_spatial_index(self, table: str, column: str) -> None:
    """ Add a spatial index for one column of a destination table.

        `table` is the attribute name under which the table is
        available on `self.dest.t`; the name handed to the SQL function
        is taken from that table object. `column` is passed through
        unchanged. The index is created by running a SELECT on the
        SQL function `CreateSpatialIndex` against the destination.
    """
    sql_table_name = getattr(self.dest.t, table).name
    index_call = sa.func.CreateSpatialIndex(sql_table_name, column)
    await self.dest.execute(sa.select(index_call))
async def create_index(self, table_name: str, column: str) -> None:
""" Create a simple index on the given table and column.
"""
async def create_index(self, table_name: str, column: str) -> None:
""" Create a simple index on the given table and column.
"""
await self.dest.connection.run_sync(
sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
await self.dest.connection.run_sync(
sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
async def create_search_index(self) -> None:
""" Create the tables and indexes needed for word lookup.
"""
async def create_search_index(self) -> None:
""" Create the tables and indexes needed for word lookup.
"""
await self.dest.connection.run_sync(
sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
await self.dest.connection.run_sync(
sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
def select_from(self, table: str) -> SaSelect:
""" Create the SQL statement to select the source columns and rows.
"""
def select_from(self, table: str) -> SaSelect:
""" Create the SQL statement to select the source columns and rows.
"""
columns.geometry),
else_=sa.func.ST_SimplifyPreserveTopology(
columns.geometry, 0.0001)
columns.geometry),
else_=sa.func.ST_SimplifyPreserveTopology(
columns.geometry, 0.0001)