+#pylint: disable=singleton-comparison,not-callable
+#pylint: disable=too-many-branches,too-many-arguments,too-many-locals,too-many-statements
+
def _select_placex(t: SaFromClause) -> SaSelect:
    """Create the basic SELECT statement over the placex-like table `t`.

    The statement selects the identifying, naming, address, ranking and
    centroid columns of the table. In addition it contains a column
    labelled 'bbox', which is the geometry column run through
    ST_Expand(0).
    """
    columns = t.c
    bbox = columns.geometry.ST_Expand(0).label('bbox')

    return sa.select(
        columns.place_id, columns.osm_type, columns.osm_id, columns.name,
        columns.class_, columns.type,
        columns.address, columns.extratags,
        columns.housenumber, columns.postcode, columns.country_code,
        columns.importance, columns.wikipedia,
        columns.parent_place_id, columns.rank_address, columns.rank_search,
        columns.centroid,
        bbox)
+
+
def _add_geometry_columns(sql: SaSelect, col: SaColumn, details: SearchDetails) -> SaSelect:
    """Extend `sql` with the geometry output columns requested in `details`.

    `col` is the geometry expression to convert. When
    `details.geometry_simplification` is greater than zero, the geometry
    is first passed through ST_SimplifyPreserveTopology. One labelled
    column is then added for every format flag set in
    `details.geometry_output`. If no output format is requested, `sql` is
    returned unchanged.
    """
    requested = details.geometry_output
    if not requested:
        return sql

    geom = col
    if details.geometry_simplification > 0.0:
        geom = geom.ST_SimplifyPreserveTopology(details.geometry_simplification)

    # (format flag, SQL conversion function, label of the result column)
    conversions = (
        (GeometryFormat.GEOJSON, 'ST_AsGeoJSON', 'geometry_geojson'),
        (GeometryFormat.TEXT, 'ST_AsText', 'geometry_text'),
        (GeometryFormat.KML, 'ST_AsKML', 'geometry_kml'),
        (GeometryFormat.SVG, 'ST_AsSVG', 'geometry_svg'),
    )
    extra_columns = [getattr(geom, func_name)().label(label)
                     for flag, func_name, label in conversions
                     if requested & flag]

    return sql.add_columns(*extra_columns)
+
+
def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
                                 numerals: List[int], details: SearchDetails) -> SaScalarSelect:
    """Create a scalar subquery collecting matching interpolation place ids.

    The subquery aggregates into an array the place ids of all rows in
    `table` whose parent is the place of the `inner` query and which
    cover at least one of the house numbers in `numerals`. A number is
    covered when it lies between startnumber and endnumber of the row
    and is reachable from startnumber in multiples of step. Place ids
    listed in `details.excluded` are left out.

    When `numerals` is empty, no row can match and the subquery is
    restricted to an always-false condition.
    """
    all_ids = array_agg(table.c.place_id) # type: ignore[no-untyped-call]
    sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)

    if not numerals:
        # sa.or_() without arguments is deprecated and renders a blank
        # condition, which would let every interpolation of the parent
        # through. Without a requested number nothing may match.
        sql = sql.where(sa.false())
    elif len(numerals) == 1:
        sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
                 .where((numerals[0] - table.c.startnumber) % table.c.step == 0)
    else:
        sql = sql.where(sa.or_(
                *(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
                          (n - table.c.startnumber) % table.c.step == 0)
                  for n in numerals)))

    if details.excluded:
        sql = sql.where(table.c.place_id.not_in(details.excluded))

    return sql.scalar_subquery()
+
+
def _filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn:
    """Build the SQL condition restricting rows of `table` to `layers`.

    Rows with an address rank between 1 and 30 are selected through the
    ADDRESS and POI flags. Rows with address rank 0 are selected by
    their class through the MANMADE, RAILWAY and NATURAL flags. The
    partial conditions are combined with OR.
    """
    cols = table.c
    conditions: List[SaExpression] = []

    wants_address = layers & DataLayer.ADDRESS
    wants_poi = layers & DataLayer.POI

    if wants_address and wants_poi:
        conditions.append(cols.rank_address.between(1, 30))
    elif wants_address:
        conditions.append(cols.rank_address.between(1, 29))
        # Rank 30 objects only count as addresses when they carry a
        # house number or a house name.
        has_house = sa.or_(cols.housenumber != None,
                           cols.address.has_key('housename'))
        conditions.append(sa.and_(cols.rank_address == 30, has_house))
    elif wants_poi:
        conditions.append(sa.and_(cols.rank_address == 30,
                                  cols.class_.not_in(('place', 'building'))))

    natural_classes = ('natural', 'water', 'waterway')
    with_railway = bool(layers & DataLayer.RAILWAY)
    with_natural = bool(layers & DataLayer.NATURAL)

    if layers & DataLayer.MANMADE:
        # Everything of rank 0 except the classes of deselected layers.
        classes = []
        if not with_railway:
            classes.append('railway')
        if not with_natural:
            classes.extend(natural_classes)
        class_filter = cols.class_.not_in(tuple(classes))
    else:
        # Only the classes of the explicitly selected layers.
        classes = []
        if with_railway:
            classes.append('railway')
        if with_natural:
            classes.extend(natural_classes)
        class_filter = cols.class_.in_(tuple(classes))

    conditions.append(sa.and_(class_filter, cols.rank_address == 0))

    if len(conditions) == 1:
        return conditions[0]

    return sa.or_(*conditions)
+
+
def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
    """Create the SQL expression for the position of house number `nr`.

    The position is the point on the linegeo column of `table` at the
    fraction (nr - startnumber) / (endnumber - startnumber). When start
    and end number are equal, the centroid of the line is used instead.
    The resulting expression is labelled 'centroid'.
    """
    first = table.c.startnumber
    last = table.c.endnumber
    line = table.c.linegeo

    span = last - first
    fraction = sa.cast(nr - first, sa.Float) / span

    # The CASE keeps the division out of the branch where the span is zero.
    position = sa.case(
        (last == first, line.ST_Centroid()),
        else_=line.ST_LineInterpolatePoint(fraction))

    return position.label('centroid')
+
+
async def _get_placex_housenumbers(conn: SearchConnection,
                                   place_ids: List[int],
                                   details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    """Yield search results for the placex rows with the given `place_ids`.

    The rows are fetched with the standard placex column set plus the
    geometry output columns requested in `details`. Each result gets
    its bounding box set from the 'bbox' column of the row.
    """
    placex = conn.t.placex

    base_query = _select_placex(placex).where(placex.c.place_id.in_(place_ids))
    query = _add_geometry_columns(base_query, placex.c.geometry, details)

    rows = await conn.execute(query)
    for row in rows:
        result = nres.create_from_placex_row(row, nres.SearchResult)
        assert result
        result.bbox = Bbox.from_wkb(row.bbox.data)
        yield result
+
+
async def _get_osmline(conn: SearchConnection, place_ids: List[int],
                       numerals: List[int],
                       details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    """Yield search results for house numbers on osmline interpolations.

    The interpolation rows with the given `place_ids` are joined with
    the list of `numerals`, producing one result for each number that
    lies between startnumber and endnumber of a row. The position of
    the result is interpolated along the line.

    NOTE(review): the join checks the number range only, not the step
    of the interpolation. Confirm that callers rely on a prior filter
    for that.
    """
    osmline = conn.t.osmline

    # Inline VALUES table with one row per requested house number.
    nr_column = sa.Column('nr', sa.Integer())
    housenumbers = sa.values(nr_column, name='housenumber')\
                     .data([(number,) for number in numerals])
    nr = housenumbers.c.nr

    in_range = nr.between(osmline.c.startnumber, osmline.c.endnumber)
    query = sa.select(osmline.c.place_id, osmline.c.osm_id,
                      osmline.c.parent_place_id, osmline.c.address,
                      nr.label('housenumber'),
                      _interpolated_position(osmline, nr),
                      osmline.c.postcode, osmline.c.country_code)
    query = query.where(osmline.c.place_id.in_(place_ids))
    query = query.join(housenumbers, in_range)

    if details.geometry_output:
        # The output geometry is derived from the interpolated point,
        # so the query has to be wrapped to make that column available.
        inner = query.subquery()
        query = _add_geometry_columns(sa.select(inner), inner.c.centroid, details)

    rows = await conn.execute(query)
    for row in rows:
        result = nres.create_from_osmline_row(row, nres.SearchResult)
        assert result
        yield result
+
+
async def _get_tiger(conn: SearchConnection, place_ids: List[int],
                     numerals: List[int], osm_id: int,
                     details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    """Yield search results for house numbers on tiger interpolations.

    The tiger rows with the given `place_ids` are joined with the list
    of `numerals`, producing one result for each number that lies
    between startnumber and endnumber of a row. Every result reports
    the OSM type 'W' together with the given `osm_id`. The position of
    the result is interpolated along the line.

    NOTE(review): the join checks the number range only, not the step
    of the interpolation. Confirm that callers rely on a prior filter
    for that.
    """
    tiger = conn.t.tiger

    # Inline VALUES table with one row per requested house number.
    nr_column = sa.Column('nr', sa.Integer())
    housenumbers = sa.values(nr_column, name='housenumber')\
                     .data([(number,) for number in numerals])
    nr = housenumbers.c.nr

    in_range = nr.between(tiger.c.startnumber, tiger.c.endnumber)
    query = sa.select(tiger.c.place_id, tiger.c.parent_place_id,
                      sa.literal('W').label('osm_type'),
                      sa.literal(osm_id).label('osm_id'),
                      nr.label('housenumber'),
                      _interpolated_position(tiger, nr),
                      tiger.c.postcode)
    query = query.where(tiger.c.place_id.in_(place_ids))
    query = query.join(housenumbers, in_range)

    if details.geometry_output:
        # The output geometry is derived from the interpolated point,
        # so the query has to be wrapped to make that column available.
        inner = query.subquery()
        query = _add_geometry_columns(sa.select(inner), inner.c.centroid, details)

    rows = await conn.execute(query)
    for row in rows:
        result = nres.create_from_tiger_row(row, nres.SearchResult)
        assert result
        yield result
+
+