+from nominatim.db.sqlalchemy_types import Geometry, IntArray
+
+#pylint: disable=singleton-comparison,not-callable
+#pylint: disable=too-many-branches,too-many-arguments,too-many-locals,too-many-statements
+
+def no_index(expr: SaColumn) -> SaColumn:
+ """ Wrap the given expression, so that the query planner will
+ refrain from using the expression for index lookup.
+ """
+ return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
+
+
+def _details_to_bind_params(details: SearchDetails) -> Dict[str, Any]:
+ """ Create a dictionary from search parameters that can be used
+ as bind parameter for SQL execute.
+ """
+ return {'limit': details.max_results,
+ 'min_rank': details.min_rank,
+ 'max_rank': details.max_rank,
+ 'viewbox': details.viewbox,
+ 'viewbox2': details.viewbox_x2,
+ 'near': details.near,
+ 'near_radius': details.near_radius,
+ 'excluded': details.excluded,
+ 'countries': details.countries}
+
+
+# Bind parameter placeholders for use in the search SQL. The parameter
+# names correspond to keys of the dictionary that is created by
+# _details_to_bind_params().
+LIMIT_PARAM: SaBind = sa.bindparam('limit')
+MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank')
+MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
+# The area parameters are explicitly declared with the Geometry type.
+VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
+VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry)
+NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
+NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
+COUNTRIES_PARAM: SaBind = sa.bindparam('countries')
+
+
+def filter_by_area(sql: SaSelect, t: SaFromClause,
+ details: SearchDetails, avoid_index: bool = False) -> SaSelect:
+ """ Apply SQL statements for filtering by viewbox and near point,
+ if applicable.
+ """
+ if details.near is not None and details.near_radius is not None:
+ if details.near_radius < 0.1 and not avoid_index:
+ sql = sql.where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM))
+ else:
+ sql = sql.where(t.c.geometry.ST_Distance(NEAR_PARAM) <= NEAR_RADIUS_PARAM)
+ if details.viewbox is not None and details.bounded_viewbox:
+ sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM,
+ use_index=not avoid_index and
+ details.viewbox.area < 0.2))
+
+ return sql
+
+
+def _exclude_places(t: SaFromClause) -> Callable[[], SaExpression]:
+ """ Return a callable that creates the condition which drops all rows
+ of the table whose place_id is in the 'excluded' bind parameter.
+ """
+ # The expression is only built once the returned callable is invoked.
+ return lambda: t.c.place_id.not_in(sa.bindparam('excluded'))
+
+
+def _select_placex(t: SaFromClause) -> SaSelect:
+ return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
+ t.c.class_, t.c.type,
+ t.c.address, t.c.extratags,
+ t.c.housenumber, t.c.postcode, t.c.country_code,
+ t.c.wikipedia,
+ t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
+ t.c.linked_place_id, t.c.admin_level,
+ t.c.centroid,
+ t.c.geometry.ST_Expand(0).label('bbox'))
+
+
+def _add_geometry_columns(sql: SaLambdaSelect, col: SaColumn, details: SearchDetails) -> SaSelect:
+ out = []
+
+ if details.geometry_simplification > 0.0:
+ col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
+
+ if details.geometry_output & GeometryFormat.GEOJSON:
+ out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
+ if details.geometry_output & GeometryFormat.TEXT:
+ out.append(sa.func.ST_AsText(col).label('geometry_text'))
+ if details.geometry_output & GeometryFormat.KML:
+ out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
+ if details.geometry_output & GeometryFormat.SVG:
+ out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
+
+ return sql.add_columns(*out)
+
+
+def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
+ numerals: List[int], details: SearchDetails) -> SaScalarSelect:
+ all_ids = sa.func.ArrayAgg(table.c.place_id)
+ sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)
+
+ if len(numerals) == 1:
+ sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
+ .where((numerals[0] - table.c.startnumber) % table.c.step == 0)
+ else:
+ sql = sql.where(sa.or_(
+ *(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
+ (n - table.c.startnumber) % table.c.step == 0)
+ for n in numerals)))
+
+ if details.excluded:
+ sql = sql.where(_exclude_places(table))
+
+ return sql.scalar_subquery()
+
+
+def _filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn:
+ orexpr: List[SaExpression] = []
+ if layers & DataLayer.ADDRESS and layers & DataLayer.POI:
+ orexpr.append(no_index(table.c.rank_address).between(1, 30))
+ elif layers & DataLayer.ADDRESS:
+ orexpr.append(no_index(table.c.rank_address).between(1, 29))
+ orexpr.append(sa.func.IsAddressPoint(table))
+ elif layers & DataLayer.POI:
+ orexpr.append(sa.and_(no_index(table.c.rank_address) == 30,
+ table.c.class_.not_in(('place', 'building'))))
+
+ if layers & DataLayer.MANMADE:
+ exclude = []
+ if not layers & DataLayer.RAILWAY:
+ exclude.append('railway')
+ if not layers & DataLayer.NATURAL:
+ exclude.extend(('natural', 'water', 'waterway'))
+ orexpr.append(sa.and_(table.c.class_.not_in(tuple(exclude)),
+ no_index(table.c.rank_address) == 0))
+ else:
+ include = []
+ if layers & DataLayer.RAILWAY:
+ include.append('railway')
+ if layers & DataLayer.NATURAL:
+ include.extend(('natural', 'water', 'waterway'))
+ orexpr.append(sa.and_(table.c.class_.in_(tuple(include)),
+ no_index(table.c.rank_address) == 0))
+
+ if len(orexpr) == 1:
+ return orexpr[0]
+
+ return sa.or_(*orexpr)
+
+
+def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
+ pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber)
+ return sa.case(
+ (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
+ else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid')
+
+
+async def _get_placex_housenumbers(conn: SearchConnection,
+ place_ids: List[int],
+ details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
+ t = conn.t.placex
+ sql = _select_placex(t).add_columns(t.c.importance)\
+ .where(t.c.place_id.in_(place_ids))
+
+ if details.geometry_output:
+ sql = _add_geometry_columns(sql, t.c.geometry, details)
+
+ for row in await conn.execute(sql):
+ result = nres.create_from_placex_row(row, nres.SearchResult)
+ assert result
+ result.bbox = Bbox.from_wkb(row.bbox)
+ yield result
+
+
+def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
+ """ Create a subselect that returns the given list of integers
+ as rows in the column 'nr'.
+ """
+ vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON))
+ return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()
+
+
+async def _get_osmline(conn: SearchConnection, place_ids: List[int],
+ numerals: List[int],
+ details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
+ t = conn.t.osmline
+
+ values = _int_list_to_subquery(numerals)
+ sql = sa.select(t.c.place_id, t.c.osm_id,
+ t.c.parent_place_id, t.c.address,
+ values.c.nr.label('housenumber'),
+ _interpolated_position(t, values.c.nr),
+ t.c.postcode, t.c.country_code)\
+ .where(t.c.place_id.in_(place_ids))\
+ .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
+
+ if details.geometry_output:
+ sub = sql.subquery()
+ sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details)
+
+ for row in await conn.execute(sql):
+ result = nres.create_from_osmline_row(row, nres.SearchResult)
+ assert result
+ yield result
+
+
+async def _get_tiger(conn: SearchConnection, place_ids: List[int],
+ numerals: List[int], osm_id: int,
+ details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
+ t = conn.t.tiger
+ values = _int_list_to_subquery(numerals)
+ sql = sa.select(t.c.place_id, t.c.parent_place_id,
+ sa.literal('W').label('osm_type'),
+ sa.literal(osm_id).label('osm_id'),
+ values.c.nr.label('housenumber'),
+ _interpolated_position(t, values.c.nr),
+ t.c.postcode)\
+ .where(t.c.place_id.in_(place_ids))\
+ .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
+
+ if details.geometry_output:
+ sub = sql.subquery()
+ sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details)
+
+ for row in await conn.execute(sql):
+ result = nres.create_from_tiger_row(row, nres.SearchResult)
+ assert result
+ yield result
+