1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from .typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow, \
16 SaBind, SaLambdaSelect
17 from .sql.sqlalchemy_types import Geometry
18 from .connection import SearchConnection
19 from . import results as nres
20 from .logging import log
21 from .types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
24 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
26 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
27 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
30 def no_index(expr: SaColumn) -> SaColumn:
31 """ Wrap the given expression, so that the query planner will
32 refrain from using the expression for index lookup.
34 return sa.func.coalesce(sa.null(), expr)
37 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
38 """ Create a select statement with the columns relevant for reverse
42 distance = t.c.distance
43 centroid = t.c.centroid
45 distance = t.c.geometry.ST_Distance(WKT_PARAM)
46 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
47 else_=t.c.centroid).label('centroid')
49 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
51 t.c.address, t.c.extratags,
52 t.c.housenumber, t.c.postcode, t.c.country_code,
53 t.c.importance, t.c.wikipedia,
54 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
56 t.c.linked_place_id, t.c.admin_level,
57 distance.label('distance'),
58 t.c.geometry.ST_Expand(0).label('bbox'))
61 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
62 return sa.cast(table.c.startnumber
63 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
64 / table.c.step) * table.c.step,
65 sa.Integer).label('housenumber')
68 def _interpolated_position(table: SaFromClause) -> SaLabel:
69 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
70 rounded_pos = sa.func.round(table.c.position / fac) * fac
72 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
73 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
76 def _locate_interpolation(table: SaFromClause) -> SaLabel:
77 """ Given a position, locate the closest point on the line.
79 return sa.case((table.c.linegeo.is_line_like(),
80 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
81 else_=0).label('position')
84 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
85 return min(rows, key=lambda row: 1000 if row is None else row.distance)
88 class ReverseGeocoder:
89 """ Class implementing the logic for looking up a place from a
93 def __init__(self, conn: SearchConnection, params: ReverseDetails,
94 restrict_to_country_areas: bool = False) -> None:
97 self.restrict_to_country_areas = restrict_to_country_areas
99 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
102 def max_rank(self) -> int:
103 """ Return the maximum configured rank.
105 return self.params.max_rank
107 def has_geometries(self) -> bool:
108 """ Check if any geometries are requested.
110 return bool(self.params.geometry_output)
112 def layer_enabled(self, *layer: DataLayer) -> bool:
113 """ Return true when any of the given layer types are requested.
115 return any(self.params.layers & ly for ly in layer)
117 def layer_disabled(self, *layer: DataLayer) -> bool:
118 """ Return true when none of the given layer types is requested.
120 return not any(self.params.layers & ly for ly in layer)
122 def has_feature_layers(self) -> bool:
123 """ Return true if any layer other than ADDRESS or POI is requested.
125 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
127 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
130 if self.params.geometry_simplification > 0.0:
131 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
133 if self.params.geometry_output & GeometryFormat.GEOJSON:
134 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
135 if self.params.geometry_output & GeometryFormat.TEXT:
136 out.append(sa.func.ST_AsText(col).label('geometry_text'))
137 if self.params.geometry_output & GeometryFormat.KML:
138 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
139 if self.params.geometry_output & GeometryFormat.SVG:
140 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
142 return sql.add_columns(*out)
144 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
145 if self.layer_enabled(DataLayer.MANMADE):
147 if self.layer_disabled(DataLayer.RAILWAY):
148 exclude.append('railway')
149 if self.layer_disabled(DataLayer.NATURAL):
150 exclude.extend(('natural', 'water', 'waterway'))
151 return table.c.class_.not_in(tuple(exclude))
154 if self.layer_enabled(DataLayer.RAILWAY):
155 include.append('railway')
156 if self.layer_enabled(DataLayer.NATURAL):
157 include.extend(('natural', 'water', 'waterway'))
158 return table.c.class_.in_(tuple(include))
160 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
161 """ Look up the closest rank 26+ place in the database, which
162 is closer than the given distance.
164 t = self.conn.t.placex
166 # PostgreSQL must not get the distance as a parameter because
167 # there is a danger it won't be able to properly estimate index use
168 # when used with prepared statements
169 diststr = sa.text(f"{distance}")
171 sql: SaLambdaSelect = sa.lambda_stmt(
172 lambda: _select_from_placex(t)
173 .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
174 .where(t.c.indexed_status == 0)
175 .where(t.c.linked_place_id == None)
176 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
177 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
178 .order_by('distance')
181 if self.has_geometries():
182 sql = self._add_geometry_columns(sql, t.c.geometry)
184 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
186 if self.layer_enabled(DataLayer.ADDRESS):
187 max_rank = min(29, self.max_rank)
188 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
189 if self.max_rank == 30:
190 restrict.append(lambda: sa.func.IsAddressPoint(t))
191 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
192 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
193 t.c.class_.not_in(('place', 'building')),
194 sa.not_(t.c.geometry.is_line_like())))
195 if self.has_feature_layers():
196 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
197 no_index(t.c.rank_address) == 0,
198 self._filter_by_layer(t)))
203 sql = sql.where(sa.or_(*restrict))
205 # If the closest object is inside an area, then check if there is a
206 # POI node nearby and return that.
208 for row in await self.conn.execute(sql, self.bind_params):
210 if row.rank_search <= 27 or row.osm_type == 'N' or row.distance > 0:
214 if row.rank_search > 27 and row.osm_type == 'N'\
215 and row.distance < 0.0001:
220 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
221 t = self.conn.t.placex
223 def _base_query() -> SaSelect:
224 return _select_from_placex(t)\
225 .where(t.c.geometry.within_distance(WKT_PARAM, 0.001))\
226 .where(t.c.parent_place_id == parent_place_id)\
227 .where(sa.func.IsAddressPoint(t))\
228 .where(t.c.indexed_status == 0)\
229 .where(t.c.linked_place_id == None)\
230 .order_by('distance')\
234 if self.has_geometries():
235 sql = self._add_geometry_columns(_base_query(), t.c.geometry)
237 sql = sa.lambda_stmt(_base_query)
239 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
241 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
242 distance: float) -> Optional[SaRow]:
243 t = self.conn.t.osmline
246 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
247 _locate_interpolation(t))\
248 .where(t.c.linegeo.within_distance(WKT_PARAM, distance))\
249 .where(t.c.startnumber != None)\
250 .order_by('distance')\
253 if parent_place_id is not None:
254 sql = sql.where(t.c.parent_place_id == parent_place_id)
256 inner = sql.subquery('ipol')
258 sql = sa.select(inner.c.place_id, inner.c.osm_id,
259 inner.c.parent_place_id, inner.c.address,
260 _interpolated_housenumber(inner),
261 _interpolated_position(inner),
262 inner.c.postcode, inner.c.country_code,
265 if self.has_geometries():
266 sub = sql.subquery('geom')
267 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
269 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
271 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
272 t = self.conn.t.tiger
274 def _base_query() -> SaSelect:
276 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
277 _locate_interpolation(t))\
278 .where(t.c.linegeo.within_distance(WKT_PARAM, 0.001))\
279 .where(t.c.parent_place_id == parent_place_id)\
280 .order_by('distance')\
284 return sa.select(inner.c.place_id,
285 inner.c.parent_place_id,
286 _interpolated_housenumber(inner),
287 _interpolated_position(inner),
292 if self.has_geometries():
293 sub = _base_query().subquery('geom')
294 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
296 sql = sa.lambda_stmt(_base_query)
298 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
300 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
301 """ Find a street or POI/address for the given WKT point.
303 log().section('Reverse lookup on street/address level')
305 parent_place_id = None
307 row = await self._find_closest_street_or_poi(distance)
308 row_func: RowFunc = nres.create_from_placex_row
309 log().var_dump('Result (street/building)', row)
311 # If the closest result was a street, but an address was requested,
312 # check for a housenumber nearby which is part of the street.
314 if self.max_rank > 27 \
315 and self.layer_enabled(DataLayer.ADDRESS) \
316 and row.rank_address <= 27:
318 parent_place_id = row.place_id
319 log().comment('Find housenumber for street')
320 addr_row = await self._find_housenumber_for_street(parent_place_id)
321 log().var_dump('Result (street housenumber)', addr_row)
323 if addr_row is not None:
325 row_func = nres.create_from_placex_row
326 distance = addr_row.distance
327 elif row.country_code == 'us' and parent_place_id is not None:
328 log().comment('Find TIGER housenumber for street')
329 addr_row = await self._find_tiger_number_for_street(parent_place_id)
330 log().var_dump('Result (street Tiger housenumber)', addr_row)
332 if addr_row is not None:
333 row_func = cast(RowFunc,
334 functools.partial(nres.create_from_tiger_row,
335 osm_type=row.osm_type,
339 distance = row.distance
341 # Check for an interpolation that is either closer than our result
342 # or belongs to a close street found.
343 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
344 log().comment('Find interpolation for street')
345 addr_row = await self._find_interpolation_for_street(parent_place_id,
347 log().var_dump('Result (street interpolation)', addr_row)
348 if addr_row is not None:
350 row_func = nres.create_from_osmline_row
354 async def _lookup_area_address(self) -> Optional[SaRow]:
355 """ Lookup large addressable areas for the given WKT point.
357 log().comment('Reverse lookup by larger address area features')
358 t = self.conn.t.placex
360 def _base_query() -> SaSelect:
361 # The inner SQL brings results in the right order, so that
362 # later only a minimum of results needs to be checked with ST_Contains.
363 inner = sa.select(t, sa.literal(0.0).label('distance'))\
364 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
365 .where(t.c.geometry.intersects(WKT_PARAM))\
366 .where(sa.func.PlacexGeometryReverseLookuppolygon())\
367 .order_by(sa.desc(t.c.rank_search))\
371 return _select_from_placex(inner, False)\
372 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
373 .order_by(sa.desc(inner.c.rank_search))\
376 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
377 if self.has_geometries():
378 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
380 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
381 log().var_dump('Result (area)', address_row)
383 if address_row is not None and address_row.rank_search < self.max_rank:
384 log().comment('Search for better matching place nodes inside the area')
386 address_rank = address_row.rank_search
387 address_id = address_row.place_id
389 def _place_inside_area_query() -> SaSelect:
391 sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
392 .where(t.c.rank_search > address_rank)\
393 .where(t.c.rank_search <= MAX_RANK_PARAM)\
394 .where(t.c.indexed_status == 0)\
395 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
396 .order_by(sa.desc(t.c.rank_search))\
400 touter = t.alias('outer')
401 return _select_from_placex(inner, False)\
402 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
403 .where(touter.c.place_id == address_id)\
404 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
405 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
408 if self.has_geometries():
409 sql = self._add_geometry_columns(_place_inside_area_query(),
410 sa.literal_column('places.geometry'))
412 sql = sa.lambda_stmt(_place_inside_area_query)
414 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
415 log().var_dump('Result (place node)', place_address_row)
417 if place_address_row is not None:
418 return place_address_row
422 async def _lookup_area_others(self) -> Optional[SaRow]:
423 t = self.conn.t.placex
425 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
426 .where(t.c.rank_address == 0)\
427 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
428 .where(t.c.name != None)\
429 .where(t.c.indexed_status == 0)\
430 .where(t.c.linked_place_id == None)\
431 .where(self._filter_by_layer(t))\
432 .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
433 .order_by(sa.desc(t.c.rank_search))\
434 .order_by('distance')\
438 sql = _select_from_placex(inner, False)\
439 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
440 inner.c.geometry.ST_Contains(WKT_PARAM)))\
441 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
444 if self.has_geometries():
445 sql = self._add_geometry_columns(sql, inner.c.geometry)
447 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
448 log().var_dump('Result (non-address feature)', row)
452 async def lookup_area(self) -> Optional[SaRow]:
453 """ Lookup large areas for the current search.
455 log().section('Reverse lookup by larger area features')
457 if self.layer_enabled(DataLayer.ADDRESS):
458 address_row = await self._lookup_area_address()
462 if self.has_feature_layers():
463 other_row = await self._lookup_area_others()
467 return _get_closest(address_row, other_row)
469 async def lookup_country_codes(self) -> List[str]:
470 """ Lookup the country for the current search.
472 log().section('Reverse lookup by country code')
473 t = self.conn.t.country_grid
474 sql = sa.select(t.c.country_code).distinct()\
475 .where(t.c.geometry.ST_Contains(WKT_PARAM))
477 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
478 log().var_dump('Country codes', ccodes)
481 async def lookup_country(self, ccodes: List[str]) -> Tuple[Optional[SaRow], RowFunc]:
482 """ Lookup the country for the current search.
484 row_func = nres.create_from_placex_row
486 ccodes = await self.lookup_country_codes()
489 return None, row_func
491 t = self.conn.t.placex
492 if self.max_rank > 4:
493 log().comment('Search for place nodes in country')
495 def _base_query() -> SaSelect:
496 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
497 .where(t.c.rank_search > 4)\
498 .where(t.c.rank_search <= MAX_RANK_PARAM)\
499 .where(t.c.indexed_status == 0)\
500 .where(t.c.country_code.in_(ccodes))\
501 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
502 .order_by(sa.desc(t.c.rank_search))\
506 return _select_from_placex(inner, False)\
507 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
508 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
512 if self.has_geometries():
513 sql = self._add_geometry_columns(_base_query(),
514 sa.literal_column('area.geometry'))
516 sql = sa.lambda_stmt(_base_query)
518 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
519 log().var_dump('Result (addressable place node)', address_row)
523 if address_row is None:
524 # Still nothing, then return a country with the appropriate country code.
525 def _country_base_query() -> SaSelect:
526 return _select_from_placex(t)\
527 .where(t.c.country_code.in_(ccodes))\
528 .where(t.c.rank_address == 4)\
529 .where(t.c.rank_search == 4)\
530 .where(t.c.linked_place_id == None)\
531 .order_by('distance')\
534 if self.has_geometries():
535 sql = self._add_geometry_columns(_country_base_query(), t.c.geometry)
537 sql = sa.lambda_stmt(_country_base_query)
539 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
541 if address_row is None:
542 # finally fall back to country table
543 t = self.conn.t.country_name
544 tgrid = self.conn.t.country_grid
546 sql = sa.select(tgrid.c.country_code,
547 tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid()
549 tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\
550 .where(tgrid.c.country_code.in_(ccodes))\
551 .group_by(tgrid.c.country_code)
553 sub = sql.subquery('grid')
554 sql = sa.select(t.c.country_code,
555 t.c.name.merge(t.c.derived_name).label('name'),
556 sub.c.centroid, sub.c.bbox)\
557 .join(sub, t.c.country_code == sub.c.country_code)\
558 .order_by(t.c.country_code)\
561 sql = self._add_geometry_columns(sql, sub.c.centroid)
563 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
564 row_func = nres.create_from_country_row
566 return address_row, row_func
568 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
569 """ Look up a single coordinate. Returns the place information,
570 if a place was found near the coordinates or None otherwise.
572 log().function('reverse_lookup', coord=coord, params=self.params)
574 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
576 row: Optional[SaRow] = None
577 row_func: RowFunc = nres.create_from_placex_row
579 if self.max_rank >= 26:
580 row, tmp_row_func = await self.lookup_street_poi()
582 row_func = tmp_row_func
585 if self.restrict_to_country_areas:
586 ccodes = await self.lookup_country_codes()
592 if self.max_rank > 4:
593 row = await self.lookup_area()
594 if row is None and self.layer_enabled(DataLayer.ADDRESS):
595 row, row_func = await self.lookup_country(ccodes)
597 result = row_func(row, nres.ReverseResult)
598 if result is not None:
599 assert row is not None
600 result.distance = getattr(row, 'distance', 0)
601 if hasattr(row, 'bbox'):
602 result.bbox = Bbox.from_wkb(row.bbox)
603 await nres.add_result_details(self.conn, [result], self.params)