1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from .typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow, \
16 SaBind, SaLambdaSelect
17 from .sql.sqlalchemy_types import Geometry
18 from .connection import SearchConnection
19 from . import results as nres
20 from .logging import log
21 from .types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
23 # In SQLAlchemy expression which compare with NULL need to be expressed with
25 # pylint: disable=singleton-comparison
27 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
29 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
30 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
33 def no_index(expr: SaColumn) -> SaColumn:
34 """ Wrap the given expression, so that the query planner will
35 refrain from using the expression for index lookup.
37 return sa.func.coalesce(sa.null(), expr)
40 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
41 """ Create a select statement with the columns relevant for reverse
45 distance = t.c.distance
46 centroid = t.c.centroid
48 distance = t.c.geometry.ST_Distance(WKT_PARAM)
49 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
50 else_=t.c.centroid).label('centroid')
52 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
54 t.c.address, t.c.extratags,
55 t.c.housenumber, t.c.postcode, t.c.country_code,
56 t.c.importance, t.c.wikipedia,
57 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
59 t.c.linked_place_id, t.c.admin_level,
60 distance.label('distance'),
61 t.c.geometry.ST_Expand(0).label('bbox'))
64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
65 return sa.cast(table.c.startnumber
66 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
67 / table.c.step) * table.c.step,
68 sa.Integer).label('housenumber')
71 def _interpolated_position(table: SaFromClause) -> SaLabel:
72 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
73 rounded_pos = sa.func.round(table.c.position / fac) * fac
75 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
76 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
80 """ Given a position, locate the closest point on the line.
82 return sa.case((table.c.linegeo.is_line_like(),
83 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
84 else_=0).label('position')
87 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
88 return min(rows, key=lambda row: 1000 if row is None else row.distance)
91 class ReverseGeocoder:
92 """ Class implementing the logic for looking up a place from a
96 def __init__(self, conn: SearchConnection, params: ReverseDetails,
97 restrict_to_country_areas: bool = False) -> None:
100 self.restrict_to_country_areas = restrict_to_country_areas
102 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
105 def max_rank(self) -> int:
106 """ Return the maximum configured rank.
108 return self.params.max_rank
110 def has_geometries(self) -> bool:
111 """ Check if any geometries are requested.
113 return bool(self.params.geometry_output)
115 def layer_enabled(self, *layer: DataLayer) -> bool:
116 """ Return true when any of the given layer types are requested.
118 return any(self.params.layers & ly for ly in layer)
120 def layer_disabled(self, *layer: DataLayer) -> bool:
121 """ Return true when none of the given layer types is requested.
123 return not any(self.params.layers & ly for ly in layer)
125 def has_feature_layers(self) -> bool:
126 """ Return true if any layer other than ADDRESS or POI is requested.
128 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
130 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
133 if self.params.geometry_simplification > 0.0:
134 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
136 if self.params.geometry_output & GeometryFormat.GEOJSON:
137 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
138 if self.params.geometry_output & GeometryFormat.TEXT:
139 out.append(sa.func.ST_AsText(col).label('geometry_text'))
140 if self.params.geometry_output & GeometryFormat.KML:
141 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
142 if self.params.geometry_output & GeometryFormat.SVG:
143 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
145 return sql.add_columns(*out)
147 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
148 if self.layer_enabled(DataLayer.MANMADE):
150 if self.layer_disabled(DataLayer.RAILWAY):
151 exclude.append('railway')
152 if self.layer_disabled(DataLayer.NATURAL):
153 exclude.extend(('natural', 'water', 'waterway'))
154 return table.c.class_.not_in(tuple(exclude))
157 if self.layer_enabled(DataLayer.RAILWAY):
158 include.append('railway')
159 if self.layer_enabled(DataLayer.NATURAL):
160 include.extend(('natural', 'water', 'waterway'))
161 return table.c.class_.in_(tuple(include))
163 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
164 """ Look up the closest rank 26+ place in the database, which
165 is closer than the given distance.
167 t = self.conn.t.placex
169 # PostgreSQL must not get the distance as a parameter because
170 # there is a danger it won't be able to properly estimate index use
171 # when used with prepared statements
172 diststr = sa.text(f"{distance}")
174 sql: SaLambdaSelect = sa.lambda_stmt(
175 lambda: _select_from_placex(t)
176 .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
177 .where(t.c.indexed_status == 0)
178 .where(t.c.linked_place_id == None)
179 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
180 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
181 .order_by('distance')
184 if self.has_geometries():
185 sql = self._add_geometry_columns(sql, t.c.geometry)
187 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
189 if self.layer_enabled(DataLayer.ADDRESS):
190 max_rank = min(29, self.max_rank)
191 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
192 if self.max_rank == 30:
193 restrict.append(lambda: sa.func.IsAddressPoint(t))
194 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
195 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
196 t.c.class_.not_in(('place', 'building')),
197 sa.not_(t.c.geometry.is_line_like())))
198 if self.has_feature_layers():
199 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
200 no_index(t.c.rank_address) == 0,
201 self._filter_by_layer(t)))
206 sql = sql.where(sa.or_(*restrict))
208 # If the closest object is inside an area, then check if there is a
209 # POI node nearby and return that.
211 for row in await self.conn.execute(sql, self.bind_params):
213 if row.rank_search <= 27 or row.osm_type == 'N' or row.distance > 0:
217 if row.rank_search > 27 and row.osm_type == 'N'\
218 and row.distance < 0.0001:
223 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
224 t = self.conn.t.placex
226 def _base_query() -> SaSelect:
227 return _select_from_placex(t)\
228 .where(t.c.geometry.within_distance(WKT_PARAM, 0.001))\
229 .where(t.c.parent_place_id == parent_place_id)\
230 .where(sa.func.IsAddressPoint(t))\
231 .where(t.c.indexed_status == 0)\
232 .where(t.c.linked_place_id == None)\
233 .order_by('distance')\
237 if self.has_geometries():
238 sql = self._add_geometry_columns(_base_query(), t.c.geometry)
240 sql = sa.lambda_stmt(_base_query)
242 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
244 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
245 distance: float) -> Optional[SaRow]:
246 t = self.conn.t.osmline
249 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
250 _locate_interpolation(t))\
251 .where(t.c.linegeo.within_distance(WKT_PARAM, distance))\
252 .where(t.c.startnumber != None)\
253 .order_by('distance')\
256 if parent_place_id is not None:
257 sql = sql.where(t.c.parent_place_id == parent_place_id)
259 inner = sql.subquery('ipol')
261 sql = sa.select(inner.c.place_id, inner.c.osm_id,
262 inner.c.parent_place_id, inner.c.address,
263 _interpolated_housenumber(inner),
264 _interpolated_position(inner),
265 inner.c.postcode, inner.c.country_code,
268 if self.has_geometries():
269 sub = sql.subquery('geom')
270 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
272 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
274 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
275 t = self.conn.t.tiger
277 def _base_query() -> SaSelect:
279 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
280 _locate_interpolation(t))\
281 .where(t.c.linegeo.within_distance(WKT_PARAM, 0.001))\
282 .where(t.c.parent_place_id == parent_place_id)\
283 .order_by('distance')\
287 return sa.select(inner.c.place_id,
288 inner.c.parent_place_id,
289 _interpolated_housenumber(inner),
290 _interpolated_position(inner),
295 if self.has_geometries():
296 sub = _base_query().subquery('geom')
297 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
299 sql = sa.lambda_stmt(_base_query)
301 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
303 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
304 """ Find a street or POI/address for the given WKT point.
306 log().section('Reverse lookup on street/address level')
308 parent_place_id = None
310 row = await self._find_closest_street_or_poi(distance)
311 row_func: RowFunc = nres.create_from_placex_row
312 log().var_dump('Result (street/building)', row)
314 # If the closest result was a street, but an address was requested,
315 # check for a housenumber nearby which is part of the street.
317 if self.max_rank > 27 \
318 and self.layer_enabled(DataLayer.ADDRESS) \
319 and row.rank_address <= 27:
321 parent_place_id = row.place_id
322 log().comment('Find housenumber for street')
323 addr_row = await self._find_housenumber_for_street(parent_place_id)
324 log().var_dump('Result (street housenumber)', addr_row)
326 if addr_row is not None:
328 row_func = nres.create_from_placex_row
329 distance = addr_row.distance
330 elif row.country_code == 'us' and parent_place_id is not None:
331 log().comment('Find TIGER housenumber for street')
332 addr_row = await self._find_tiger_number_for_street(parent_place_id)
333 log().var_dump('Result (street Tiger housenumber)', addr_row)
335 if addr_row is not None:
336 row_func = cast(RowFunc,
337 functools.partial(nres.create_from_tiger_row,
338 osm_type=row.osm_type,
342 distance = row.distance
344 # Check for an interpolation that is either closer than our result
345 # or belongs to a close street found.
346 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
347 log().comment('Find interpolation for street')
348 addr_row = await self._find_interpolation_for_street(parent_place_id,
350 log().var_dump('Result (street interpolation)', addr_row)
351 if addr_row is not None:
353 row_func = nres.create_from_osmline_row
357 async def _lookup_area_address(self) -> Optional[SaRow]:
358 """ Lookup large addressable areas for the given WKT point.
360 log().comment('Reverse lookup by larger address area features')
361 t = self.conn.t.placex
363 def _base_query() -> SaSelect:
364 # The inner SQL brings results in the right order, so that
365 # later only a minimum of results needs to be checked with ST_Contains.
366 inner = sa.select(t, sa.literal(0.0).label('distance'))\
367 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
368 .where(t.c.geometry.intersects(WKT_PARAM))\
369 .where(sa.func.PlacexGeometryReverseLookuppolygon())\
370 .order_by(sa.desc(t.c.rank_search))\
374 return _select_from_placex(inner, False)\
375 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
376 .order_by(sa.desc(inner.c.rank_search))\
379 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
380 if self.has_geometries():
381 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
383 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
384 log().var_dump('Result (area)', address_row)
386 if address_row is not None and address_row.rank_search < self.max_rank:
387 log().comment('Search for better matching place nodes inside the area')
389 address_rank = address_row.rank_search
390 address_id = address_row.place_id
392 def _place_inside_area_query() -> SaSelect:
394 sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
395 .where(t.c.rank_search > address_rank)\
396 .where(t.c.rank_search <= MAX_RANK_PARAM)\
397 .where(t.c.indexed_status == 0)\
398 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
399 .order_by(sa.desc(t.c.rank_search))\
403 touter = t.alias('outer')
404 return _select_from_placex(inner, False)\
405 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
406 .where(touter.c.place_id == address_id)\
407 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
408 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
411 if self.has_geometries():
412 sql = self._add_geometry_columns(_place_inside_area_query(),
413 sa.literal_column('places.geometry'))
415 sql = sa.lambda_stmt(_place_inside_area_query)
417 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
418 log().var_dump('Result (place node)', place_address_row)
420 if place_address_row is not None:
421 return place_address_row
425 async def _lookup_area_others(self) -> Optional[SaRow]:
426 t = self.conn.t.placex
428 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
429 .where(t.c.rank_address == 0)\
430 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
431 .where(t.c.name != None)\
432 .where(t.c.indexed_status == 0)\
433 .where(t.c.linked_place_id == None)\
434 .where(self._filter_by_layer(t))\
435 .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
436 .order_by(sa.desc(t.c.rank_search))\
437 .order_by('distance')\
441 sql = _select_from_placex(inner, False)\
442 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
443 inner.c.geometry.ST_Contains(WKT_PARAM)))\
444 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
447 if self.has_geometries():
448 sql = self._add_geometry_columns(sql, inner.c.geometry)
450 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
451 log().var_dump('Result (non-address feature)', row)
455 async def lookup_area(self) -> Optional[SaRow]:
456 """ Lookup large areas for the current search.
458 log().section('Reverse lookup by larger area features')
460 if self.layer_enabled(DataLayer.ADDRESS):
461 address_row = await self._lookup_area_address()
465 if self.has_feature_layers():
466 other_row = await self._lookup_area_others()
470 return _get_closest(address_row, other_row)
472 async def lookup_country_codes(self) -> List[str]:
473 """ Lookup the country for the current search.
475 log().section('Reverse lookup by country code')
476 t = self.conn.t.country_grid
477 sql = sa.select(t.c.country_code).distinct()\
478 .where(t.c.geometry.ST_Contains(WKT_PARAM))
480 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
481 log().var_dump('Country codes', ccodes)
484 async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
485 """ Lookup the country for the current search.
488 ccodes = await self.lookup_country_codes()
493 t = self.conn.t.placex
494 if self.max_rank > 4:
495 log().comment('Search for place nodes in country')
497 def _base_query() -> SaSelect:
498 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
499 .where(t.c.rank_search > 4)\
500 .where(t.c.rank_search <= MAX_RANK_PARAM)\
501 .where(t.c.indexed_status == 0)\
502 .where(t.c.country_code.in_(ccodes))\
503 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
504 .order_by(sa.desc(t.c.rank_search))\
508 return _select_from_placex(inner, False)\
509 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
510 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
514 if self.has_geometries():
515 sql = self._add_geometry_columns(_base_query(),
516 sa.literal_column('area.geometry'))
518 sql = sa.lambda_stmt(_base_query)
520 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
521 log().var_dump('Result (addressable place node)', address_row)
525 if address_row is None:
526 # Still nothing, then return a country with the appropriate country code.
527 def _country_base_query() -> SaSelect:
528 return _select_from_placex(t)\
529 .where(t.c.country_code.in_(ccodes))\
530 .where(t.c.rank_address == 4)\
531 .where(t.c.rank_search == 4)\
532 .where(t.c.linked_place_id == None)\
533 .order_by('distance')\
536 if self.has_geometries():
537 sql = self._add_geometry_columns(_country_base_query(), t.c.geometry)
539 sql = sa.lambda_stmt(_country_base_query)
541 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
545 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
546 """ Look up a single coordinate. Returns the place information,
547 if a place was found near the coordinates or None otherwise.
549 log().function('reverse_lookup', coord=coord, params=self.params)
551 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
553 row: Optional[SaRow] = None
554 row_func: RowFunc = nres.create_from_placex_row
556 if self.max_rank >= 26:
557 row, tmp_row_func = await self.lookup_street_poi()
559 row_func = tmp_row_func
562 if self.restrict_to_country_areas:
563 ccodes = await self.lookup_country_codes()
569 if self.max_rank > 4:
570 row = await self.lookup_area()
571 if row is None and self.layer_enabled(DataLayer.ADDRESS):
572 row = await self.lookup_country(ccodes)
574 result = row_func(row, nres.ReverseResult)
575 if result is not None:
576 assert row is not None
577 result.distance = row.distance
578 if hasattr(row, 'bbox'):
579 result.bbox = Bbox.from_wkb(row.bbox)
580 await nres.add_result_details(self.conn, [result], self.params)