"""
Implementation of place lookup by ID.
"""
-from typing import Optional
+from typing import Optional, Callable, Tuple, Type
+import datetime as dt
import sqlalchemy as sa
-from nominatim.typing import SaColumn, SaLabel, SaRow
+from nominatim.typing import SaColumn, SaRow, SaSelect
from nominatim.api.connection import SearchConnection
import nominatim.api.types as ntyp
import nominatim.api.results as nres
+from nominatim.api.logging import log
-def _select_column_geometry(column: SaColumn,
- geometry_output: ntyp.GeometryFormat) -> SaLabel:
- """ Create the appropriate column expression for selecting a
- geometry for the details response.
- """
- if geometry_output & ntyp.GeometryFormat.GEOJSON:
- return sa.literal_column(f"""
- ST_AsGeoJSON(CASE WHEN ST_NPoints({0}) > 5000
- THEN ST_SimplifyPreserveTopology({0}, 0.0001)
- ELSE {column.name} END)
- """).label('geometry_geojson')
+RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
+
+GeomFunc = Callable[[SaSelect, SaColumn], SaSelect]
- return sa.func.ST_GeometryType(column).label('geometry_type')
async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
- details: ntyp.LookupDetails) -> Optional[SaRow]:
+ add_geometries: GeomFunc) -> Optional[SaRow]:
""" Search for the given place in the placex table and return the
base information.
"""
+ log().section("Find in placex table")
t = conn.t.placex
sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type, t.c.admin_level,
t.c.importance, t.c.wikipedia, t.c.indexed_date,
t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
t.c.linked_place_id,
- sa.func.ST_X(t.c.centroid).label('x'),
- sa.func.ST_Y(t.c.centroid).label('y'),
- _select_column_geometry(t.c.geometry, details.geometry_output))
+ t.c.geometry.ST_Expand(0).label('bbox'),
+ t.c.centroid)
if isinstance(place, ntyp.PlaceID):
sql = sql.where(t.c.place_id == place.place_id)
else:
return None
- return (await conn.execute(sql)).one_or_none()
+ return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
+
+
+async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
+ add_geometries: GeomFunc) -> Optional[SaRow]:
+ """ Search for the given place in the osmline table and return the
+ base information.
+ """
+ log().section("Find in interpolation table")
+ t = conn.t.osmline
+ sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
+ t.c.indexed_date, t.c.startnumber, t.c.endnumber,
+ t.c.step, t.c.address, t.c.postcode, t.c.country_code,
+ t.c.linegeo.ST_Centroid().label('centroid'))
+
+ if isinstance(place, ntyp.PlaceID):
+ sql = sql.where(t.c.place_id == place.place_id)
+ elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
+ # There may be multiple interpolations for a single way.
+ # If 'class' contains a number, return the one that belongs to that number.
+ sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
+ if place.osm_class and place.osm_class.isdigit():
+ sql = sql.order_by(sa.func.greatest(0,
+ int(place.osm_class) - t.c.endnumber,
+ t.c.startnumber - int(place.osm_class)))
+ else:
+ return None
+
+ return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
+
+
+async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
+ add_geometries: GeomFunc) -> Optional[SaRow]:
+ """ Search for the given place in the table of Tiger addresses and return
+ the base information. Only lookup by place ID is supported.
+ """
+ if not isinstance(place, ntyp.PlaceID):
+ return None
+
+ log().section("Find in TIGER table")
+ t = conn.t.tiger
+ parent = conn.t.placex
+ sql = sa.select(t.c.place_id, t.c.parent_place_id,
+ parent.c.osm_type, parent.c.osm_id,
+ t.c.startnumber, t.c.endnumber, t.c.step,
+ t.c.postcode,
+ t.c.linegeo.ST_Centroid().label('centroid'))\
+ .where(t.c.place_id == place.place_id)\
+ .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
+
+ return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
+
+
+async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
+ add_geometries: GeomFunc) -> Optional[SaRow]:
+ """ Search for the given place in the postcode table and return the
+ base information. Only lookup by place ID is supported.
+ """
+ if not isinstance(place, ntyp.PlaceID):
+ return None
+
+ log().section("Find in postcode table")
+ t = conn.t.postcode
+ sql = sa.select(t.c.place_id, t.c.parent_place_id,
+ t.c.rank_search, t.c.rank_address,
+ t.c.indexed_date, t.c.postcode, t.c.country_code,
+ t.c.geometry.label('centroid')) \
+ .where(t.c.place_id == place.place_id)
+
+ return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
+
+
+async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
+ add_geometries: GeomFunc
+ ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
+ """ Search for the given place in all data tables
+ and return the base information.
+ """
+ row = await find_in_placex(conn, place, add_geometries)
+ log().var_dump('Result (placex)', row)
+ if row is not None:
+ return row, nres.create_from_placex_row
+
+ row = await find_in_osmline(conn, place, add_geometries)
+ log().var_dump('Result (osmline)', row)
+ if row is not None:
+ return row, nres.create_from_osmline_row
+
+ row = await find_in_postcode(conn, place, add_geometries)
+ log().var_dump('Result (postcode)', row)
+ if row is not None:
+ return row, nres.create_from_postcode_row
+
+ row = await find_in_tiger(conn, place, add_geometries)
+ log().var_dump('Result (tiger)', row)
+ return row, nres.create_from_tiger_row
-async def get_place_by_id(conn: SearchConnection, place: ntyp.PlaceRef,
- details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
+async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
+ details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
""" Retrieve a place with additional details from the database.
"""
+ log().function('get_detailed_place', place=place, details=details)
+
if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
raise ValueError("lookup only supports geojosn polygon output.")
- row = await find_in_placex(conn, place, details)
- if row is not None:
- result = nres.create_from_placex_row(row=row)
- await nres.add_result_details(conn, result, details)
- return result
+ if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
+ def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
+ return sql.add_columns(sa.func.ST_AsGeoJSON(
+ sa.case((sa.func.ST_NPoints(column) > 5000,
+ sa.func.ST_SimplifyPreserveTopology(column, 0.0001)),
+ else_=column)).label('geometry_geojson'))
+ else:
+ def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
+ return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type'))
+
+ row_func: RowFunc[nres.DetailedResult]
+ row, row_func = await find_in_all_tables(conn, place, _add_geometry)
+
+ if row is None:
+ return None
+
+ result = row_func(row, nres.DetailedResult)
+ assert result is not None
+
+ # add missing details
+ assert result is not None
+ if 'type' in result.geometry:
+ result.geometry['type'] = GEOMETRY_TYPE_MAP.get(result.geometry['type'],
+ result.geometry['type'])
+ indexed_date = getattr(row, 'indexed_date', None)
+ if indexed_date is not None:
+ result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
+
+ await nres.add_result_details(conn, [result], details)
+
+ return result
+
+
+async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
+ details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
+ """ Retrieve a place as a simple search result from the database.
+ """
+ log().function('get_simple_place', place=place, details=details)
+
+ def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect:
+ if not details.geometry_output:
+ return sql
+
+ out = []
+
+ if details.geometry_simplification > 0.0:
+ col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
+
+ if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
+ out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
+ if details.geometry_output & ntyp.GeometryFormat.TEXT:
+ out.append(sa.func.ST_AsText(col).label('geometry_text'))
+ if details.geometry_output & ntyp.GeometryFormat.KML:
+ out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
+ if details.geometry_output & ntyp.GeometryFormat.SVG:
+ out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
+
+ return sql.add_columns(*out)
+
+
+ row_func: RowFunc[nres.SearchResult]
+ row, row_func = await find_in_all_tables(conn, place, _add_geometry)
+
+ if row is None:
+ return None
+
+ result = row_func(row, nres.SearchResult)
+ assert result is not None
+
+ # add missing details
+ assert result is not None
+ if hasattr(row, 'bbox'):
+ result.bbox = ntyp.Bbox.from_wkb(row.bbox)
+
+ await nres.add_result_details(conn, [result], details)
+
+ return result
+
- # Nothing found under this ID.
- return None
+GEOMETRY_TYPE_MAP = {
+ 'POINT': 'ST_Point',
+ 'MULTIPOINT': 'ST_MultiPoint',
+ 'LINESTRING': 'ST_LineString',
+ 'MULTILINESTRING': 'ST_MultiLineString',
+ 'POLYGON': 'ST_Polygon',
+ 'MULTIPOLYGON': 'ST_MultiPolygon',
+ 'GEOMETRYCOLLECTION': 'ST_GeometryCollection'
+}