1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of place lookup by ID.
10 from typing import Optional, Callable, Tuple, Type
13 import sqlalchemy as sa
15 from nominatim_core.typing import SaColumn, SaRow, SaSelect
16 from .connection import SearchConnection
17 from .logging import log
18 from . import types as ntyp
19 from . import results as nres
21 RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
23 GeomFunc = Callable[[SaSelect, SaColumn], SaSelect]
26 async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
27 add_geometries: GeomFunc) -> Optional[SaRow]:
28 """ Search for the given place in the placex table and return the
31 log().section("Find in placex table")
33 sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
34 t.c.class_, t.c.type, t.c.admin_level,
35 t.c.address, t.c.extratags,
36 t.c.housenumber, t.c.postcode, t.c.country_code,
37 t.c.importance, t.c.wikipedia, t.c.indexed_date,
38 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
40 t.c.geometry.ST_Expand(0).label('bbox'),
43 if isinstance(place, ntyp.PlaceID):
44 sql = sql.where(t.c.place_id == place.place_id)
45 elif isinstance(place, ntyp.OsmID):
46 sql = sql.where(t.c.osm_type == place.osm_type)\
47 .where(t.c.osm_id == place.osm_id)
49 sql = sql.where(t.c.class_ == place.osm_class)
51 sql = sql.order_by(t.c.class_)
56 return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
59 async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
60 add_geometries: GeomFunc) -> Optional[SaRow]:
61 """ Search for the given place in the osmline table and return the
64 log().section("Find in interpolation table")
66 sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
67 t.c.indexed_date, t.c.startnumber, t.c.endnumber,
68 t.c.step, t.c.address, t.c.postcode, t.c.country_code,
69 t.c.linegeo.ST_Centroid().label('centroid'))
71 if isinstance(place, ntyp.PlaceID):
72 sql = sql.where(t.c.place_id == place.place_id)
73 elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
74 # There may be multiple interpolations for a single way.
75 # If 'class' contains a number, return the one that belongs to that number.
76 sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
77 if place.osm_class and place.osm_class.isdigit():
78 sql = sql.order_by(sa.func.greatest(0,
79 int(place.osm_class) - t.c.endnumber,
80 t.c.startnumber - int(place.osm_class)))
84 return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
87 async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
88 add_geometries: GeomFunc) -> Optional[SaRow]:
89 """ Search for the given place in the table of Tiger addresses and return
90 the base information. Only lookup by place ID is supported.
92 if not isinstance(place, ntyp.PlaceID):
95 log().section("Find in TIGER table")
97 parent = conn.t.placex
98 sql = sa.select(t.c.place_id, t.c.parent_place_id,
99 parent.c.osm_type, parent.c.osm_id,
100 t.c.startnumber, t.c.endnumber, t.c.step,
102 t.c.linegeo.ST_Centroid().label('centroid'))\
103 .where(t.c.place_id == place.place_id)\
104 .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
106 return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
109 async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
110 add_geometries: GeomFunc) -> Optional[SaRow]:
111 """ Search for the given place in the postcode table and return the
112 base information. Only lookup by place ID is supported.
114 if not isinstance(place, ntyp.PlaceID):
117 log().section("Find in postcode table")
119 sql = sa.select(t.c.place_id, t.c.parent_place_id,
120 t.c.rank_search, t.c.rank_address,
121 t.c.indexed_date, t.c.postcode, t.c.country_code,
122 t.c.geometry.label('centroid')) \
123 .where(t.c.place_id == place.place_id)
125 return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
128 async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
129 add_geometries: GeomFunc
130 ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
131 """ Search for the given place in all data tables
132 and return the base information.
134 row = await find_in_placex(conn, place, add_geometries)
135 log().var_dump('Result (placex)', row)
137 return row, nres.create_from_placex_row
139 row = await find_in_osmline(conn, place, add_geometries)
140 log().var_dump('Result (osmline)', row)
142 return row, nres.create_from_osmline_row
144 row = await find_in_postcode(conn, place, add_geometries)
145 log().var_dump('Result (postcode)', row)
147 return row, nres.create_from_postcode_row
149 row = await find_in_tiger(conn, place, add_geometries)
150 log().var_dump('Result (tiger)', row)
151 return row, nres.create_from_tiger_row
154 async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
155 details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
156 """ Retrieve a place with additional details from the database.
158 log().function('get_detailed_place', place=place, details=details)
160 if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
161 raise ValueError("lookup only supports geojosn polygon output.")
163 if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
164 def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
165 return sql.add_columns(sa.func.ST_AsGeoJSON(
166 sa.case((sa.func.ST_NPoints(column) > 5000,
167 sa.func.ST_SimplifyPreserveTopology(column, 0.0001)),
168 else_=column), 7).label('geometry_geojson'))
170 def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
171 return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type'))
173 row_func: RowFunc[nres.DetailedResult]
174 row, row_func = await find_in_all_tables(conn, place, _add_geometry)
179 result = row_func(row, nres.DetailedResult)
180 assert result is not None
182 # add missing details
183 assert result is not None
184 if 'type' in result.geometry:
185 result.geometry['type'] = GEOMETRY_TYPE_MAP.get(result.geometry['type'],
186 result.geometry['type'])
187 indexed_date = getattr(row, 'indexed_date', None)
188 if indexed_date is not None:
189 result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
191 await nres.add_result_details(conn, [result], details)
196 async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
197 details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
198 """ Retrieve a place as a simple search result from the database.
200 log().function('get_simple_place', place=place, details=details)
202 def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect:
203 if not details.geometry_output:
208 if details.geometry_simplification > 0.0:
209 col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
211 if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
212 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
213 if details.geometry_output & ntyp.GeometryFormat.TEXT:
214 out.append(sa.func.ST_AsText(col).label('geometry_text'))
215 if details.geometry_output & ntyp.GeometryFormat.KML:
216 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
217 if details.geometry_output & ntyp.GeometryFormat.SVG:
218 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
220 return sql.add_columns(*out)
223 row_func: RowFunc[nres.SearchResult]
224 row, row_func = await find_in_all_tables(conn, place, _add_geometry)
229 result = row_func(row, nres.SearchResult)
230 assert result is not None
232 # add missing details
233 assert result is not None
234 if hasattr(row, 'bbox'):
235 result.bbox = ntyp.Bbox.from_wkb(row.bbox)
237 await nres.add_result_details(conn, [result], details)
242 GEOMETRY_TYPE_MAP = {
244 'MULTIPOINT': 'ST_MultiPoint',
245 'LINESTRING': 'ST_LineString',
246 'MULTILINESTRING': 'ST_MultiLineString',
247 'POLYGON': 'ST_Polygon',
248 'MULTIPOLYGON': 'ST_MultiPolygon',
249 'GEOMETRYCOLLECTION': 'ST_GeometryCollection'