1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from nominatim_core.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
16 SaBind, SaLambdaSelect
17 from nominatim_core.db.sqlalchemy_types import Geometry
18 from .connection import SearchConnection
19 from . import results as nres
20 from .logging import log
21 from .types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
23 # In SQLAlchemy expression which compare with NULL need to be expressed with
25 # pylint: disable=singleton-comparison
27 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
29 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
30 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
32 def no_index(expr: SaColumn) -> SaColumn:
33 """ Wrap the given expression, so that the query planner will
34 refrain from using the expression for index lookup.
36 return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
39 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
40 """ Create a select statement with the columns relevant for reverse
44 distance = t.c.distance
45 centroid = t.c.centroid
47 distance = t.c.geometry.ST_Distance(WKT_PARAM)
48 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
49 else_=t.c.centroid).label('centroid')
52 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
54 t.c.address, t.c.extratags,
55 t.c.housenumber, t.c.postcode, t.c.country_code,
56 t.c.importance, t.c.wikipedia,
57 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
59 t.c.linked_place_id, t.c.admin_level,
60 distance.label('distance'),
61 t.c.geometry.ST_Expand(0).label('bbox'))
64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
65 return sa.cast(table.c.startnumber
66 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
67 / table.c.step) * table.c.step,
68 sa.Integer).label('housenumber')
71 def _interpolated_position(table: SaFromClause) -> SaLabel:
72 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
73 rounded_pos = sa.func.round(table.c.position / fac) * fac
75 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
76 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
80 """ Given a position, locate the closest point on the line.
82 return sa.case((table.c.linegeo.is_line_like(),
83 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
84 else_=0).label('position')
87 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
88 return min(rows, key=lambda row: 1000 if row is None else row.distance)
91 class ReverseGeocoder:
92 """ Class implementing the logic for looking up a place from a
96 def __init__(self, conn: SearchConnection, params: ReverseDetails,
97 restrict_to_country_areas: bool = False) -> None:
100 self.restrict_to_country_areas = restrict_to_country_areas
102 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
106 def max_rank(self) -> int:
107 """ Return the maximum configured rank.
109 return self.params.max_rank
112 def has_geometries(self) -> bool:
113 """ Check if any geometries are requested.
115 return bool(self.params.geometry_output)
118 def layer_enabled(self, *layer: DataLayer) -> bool:
119 """ Return true when any of the given layer types are requested.
121 return any(self.params.layers & l for l in layer)
124 def layer_disabled(self, *layer: DataLayer) -> bool:
125 """ Return true when none of the given layer types is requested.
127 return not any(self.params.layers & l for l in layer)
130 def has_feature_layers(self) -> bool:
131 """ Return true if any layer other than ADDRESS or POI is requested.
133 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
136 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
139 if self.params.geometry_simplification > 0.0:
140 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
142 if self.params.geometry_output & GeometryFormat.GEOJSON:
143 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
144 if self.params.geometry_output & GeometryFormat.TEXT:
145 out.append(sa.func.ST_AsText(col).label('geometry_text'))
146 if self.params.geometry_output & GeometryFormat.KML:
147 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
148 if self.params.geometry_output & GeometryFormat.SVG:
149 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
151 return sql.add_columns(*out)
154 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
155 if self.layer_enabled(DataLayer.MANMADE):
157 if self.layer_disabled(DataLayer.RAILWAY):
158 exclude.append('railway')
159 if self.layer_disabled(DataLayer.NATURAL):
160 exclude.extend(('natural', 'water', 'waterway'))
161 return table.c.class_.not_in(tuple(exclude))
164 if self.layer_enabled(DataLayer.RAILWAY):
165 include.append('railway')
166 if self.layer_enabled(DataLayer.NATURAL):
167 include.extend(('natural', 'water', 'waterway'))
168 return table.c.class_.in_(tuple(include))
171 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
172 """ Look up the closest rank 26+ place in the database, which
173 is closer than the given distance.
175 t = self.conn.t.placex
177 # PostgreSQL must not get the distance as a parameter because
178 # there is a danger it won't be able to properly estimate index use
179 # when used with prepared statements
180 diststr = sa.text(f"{distance}")
182 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
183 .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
184 .where(t.c.indexed_status == 0)
185 .where(t.c.linked_place_id == None)
186 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
187 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
188 .order_by('distance')
191 if self.has_geometries():
192 sql = self._add_geometry_columns(sql, t.c.geometry)
194 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
196 if self.layer_enabled(DataLayer.ADDRESS):
197 max_rank = min(29, self.max_rank)
198 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
199 if self.max_rank == 30:
200 restrict.append(lambda: sa.func.IsAddressPoint(t))
201 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
202 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
203 t.c.class_.not_in(('place', 'building')),
204 sa.not_(t.c.geometry.is_line_like())))
205 if self.has_feature_layers():
206 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
207 no_index(t.c.rank_address) == 0,
208 self._filter_by_layer(t)))
213 sql = sql.where(sa.or_(*restrict))
215 # If the closest object is inside an area, then check if there is a
216 # POI node nearby and return that.
218 for row in await self.conn.execute(sql, self.bind_params):
220 if row.rank_search <= 27 or row.osm_type == 'N' or row.distance > 0:
224 if row.rank_search > 27 and row.osm_type == 'N'\
225 and row.distance < 0.0001:
231 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
232 t = self.conn.t.placex
234 def _base_query() -> SaSelect:
235 return _select_from_placex(t)\
236 .where(t.c.geometry.within_distance(WKT_PARAM, 0.001))\
237 .where(t.c.parent_place_id == parent_place_id)\
238 .where(sa.func.IsAddressPoint(t))\
239 .where(t.c.indexed_status == 0)\
240 .where(t.c.linked_place_id == None)\
241 .order_by('distance')\
245 if self.has_geometries():
246 sql = self._add_geometry_columns(_base_query(), t.c.geometry)
248 sql = sa.lambda_stmt(_base_query)
250 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
253 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
254 distance: float) -> Optional[SaRow]:
255 t = self.conn.t.osmline
258 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
259 _locate_interpolation(t))\
260 .where(t.c.linegeo.within_distance(WKT_PARAM, distance))\
261 .where(t.c.startnumber != None)\
262 .order_by('distance')\
265 if parent_place_id is not None:
266 sql = sql.where(t.c.parent_place_id == parent_place_id)
268 inner = sql.subquery('ipol')
270 sql = sa.select(inner.c.place_id, inner.c.osm_id,
271 inner.c.parent_place_id, inner.c.address,
272 _interpolated_housenumber(inner),
273 _interpolated_position(inner),
274 inner.c.postcode, inner.c.country_code,
277 if self.has_geometries():
278 sub = sql.subquery('geom')
279 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
281 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
284 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
285 t = self.conn.t.tiger
287 def _base_query() -> SaSelect:
289 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
290 _locate_interpolation(t))\
291 .where(t.c.linegeo.within_distance(WKT_PARAM, 0.001))\
292 .where(t.c.parent_place_id == parent_place_id)\
293 .order_by('distance')\
297 return sa.select(inner.c.place_id,
298 inner.c.parent_place_id,
299 _interpolated_housenumber(inner),
300 _interpolated_position(inner),
305 if self.has_geometries():
306 sub = _base_query().subquery('geom')
307 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
309 sql = sa.lambda_stmt(_base_query)
311 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
314 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
315 """ Find a street or POI/address for the given WKT point.
317 log().section('Reverse lookup on street/address level')
319 parent_place_id = None
321 row = await self._find_closest_street_or_poi(distance)
322 row_func: RowFunc = nres.create_from_placex_row
323 log().var_dump('Result (street/building)', row)
325 # If the closest result was a street, but an address was requested,
326 # check for a housenumber nearby which is part of the street.
328 if self.max_rank > 27 \
329 and self.layer_enabled(DataLayer.ADDRESS) \
330 and row.rank_address <= 27:
332 parent_place_id = row.place_id
333 log().comment('Find housenumber for street')
334 addr_row = await self._find_housenumber_for_street(parent_place_id)
335 log().var_dump('Result (street housenumber)', addr_row)
337 if addr_row is not None:
339 row_func = nres.create_from_placex_row
340 distance = addr_row.distance
341 elif row.country_code == 'us' and parent_place_id is not None:
342 log().comment('Find TIGER housenumber for street')
343 addr_row = await self._find_tiger_number_for_street(parent_place_id)
344 log().var_dump('Result (street Tiger housenumber)', addr_row)
346 if addr_row is not None:
347 row_func = cast(RowFunc,
348 functools.partial(nres.create_from_tiger_row,
349 osm_type=row.osm_type,
353 distance = row.distance
355 # Check for an interpolation that is either closer than our result
356 # or belongs to a close street found.
357 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
358 log().comment('Find interpolation for street')
359 addr_row = await self._find_interpolation_for_street(parent_place_id,
361 log().var_dump('Result (street interpolation)', addr_row)
362 if addr_row is not None:
364 row_func = nres.create_from_osmline_row
369 async def _lookup_area_address(self) -> Optional[SaRow]:
370 """ Lookup large addressable areas for the given WKT point.
372 log().comment('Reverse lookup by larger address area features')
373 t = self.conn.t.placex
375 def _base_query() -> SaSelect:
376 # The inner SQL brings results in the right order, so that
377 # later only a minimum of results needs to be checked with ST_Contains.
378 inner = sa.select(t, sa.literal(0.0).label('distance'))\
379 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
380 .where(t.c.geometry.intersects(WKT_PARAM))\
381 .where(sa.func.PlacexGeometryReverseLookuppolygon())\
382 .order_by(sa.desc(t.c.rank_search))\
386 return _select_from_placex(inner, False)\
387 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
388 .order_by(sa.desc(inner.c.rank_search))\
391 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
392 if self.has_geometries():
393 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
395 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
396 log().var_dump('Result (area)', address_row)
398 if address_row is not None and address_row.rank_search < self.max_rank:
399 log().comment('Search for better matching place nodes inside the area')
401 address_rank = address_row.rank_search
402 address_id = address_row.place_id
404 def _place_inside_area_query() -> SaSelect:
407 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
408 .where(t.c.rank_search > address_rank)\
409 .where(t.c.rank_search <= MAX_RANK_PARAM)\
410 .where(t.c.indexed_status == 0)\
411 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
412 .order_by(sa.desc(t.c.rank_search))\
416 touter = t.alias('outer')
417 return _select_from_placex(inner, False)\
418 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
419 .where(touter.c.place_id == address_id)\
420 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
421 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
424 if self.has_geometries():
425 sql = self._add_geometry_columns(_place_inside_area_query(),
426 sa.literal_column('places.geometry'))
428 sql = sa.lambda_stmt(_place_inside_area_query)
430 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
431 log().var_dump('Result (place node)', place_address_row)
433 if place_address_row is not None:
434 return place_address_row
439 async def _lookup_area_others(self) -> Optional[SaRow]:
440 t = self.conn.t.placex
442 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
443 .where(t.c.rank_address == 0)\
444 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
445 .where(t.c.name != None)\
446 .where(t.c.indexed_status == 0)\
447 .where(t.c.linked_place_id == None)\
448 .where(self._filter_by_layer(t))\
449 .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
450 .order_by(sa.desc(t.c.rank_search))\
451 .order_by('distance')\
455 sql = _select_from_placex(inner, False)\
456 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
457 inner.c.geometry.ST_Contains(WKT_PARAM)))\
458 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
461 if self.has_geometries():
462 sql = self._add_geometry_columns(sql, inner.c.geometry)
464 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
465 log().var_dump('Result (non-address feature)', row)
470 async def lookup_area(self) -> Optional[SaRow]:
471 """ Lookup large areas for the current search.
473 log().section('Reverse lookup by larger area features')
475 if self.layer_enabled(DataLayer.ADDRESS):
476 address_row = await self._lookup_area_address()
480 if self.has_feature_layers():
481 other_row = await self._lookup_area_others()
485 return _get_closest(address_row, other_row)
488 async def lookup_country_codes(self) -> List[str]:
489 """ Lookup the country for the current search.
491 log().section('Reverse lookup by country code')
492 t = self.conn.t.country_grid
493 sql = sa.select(t.c.country_code).distinct()\
494 .where(t.c.geometry.ST_Contains(WKT_PARAM))
496 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
497 log().var_dump('Country codes', ccodes)
501 async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
502 """ Lookup the country for the current search.
505 ccodes = await self.lookup_country_codes()
510 t = self.conn.t.placex
511 if self.max_rank > 4:
512 log().comment('Search for place nodes in country')
514 def _base_query() -> SaSelect:
517 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
518 .where(t.c.rank_search > 4)\
519 .where(t.c.rank_search <= MAX_RANK_PARAM)\
520 .where(t.c.indexed_status == 0)\
521 .where(t.c.country_code.in_(ccodes))\
522 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
523 .order_by(sa.desc(t.c.rank_search))\
527 return _select_from_placex(inner, False)\
528 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
529 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
533 if self.has_geometries():
534 sql = self._add_geometry_columns(_base_query(),
535 sa.literal_column('area.geometry'))
537 sql = sa.lambda_stmt(_base_query)
539 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
540 log().var_dump('Result (addressable place node)', address_row)
544 if address_row is None:
545 # Still nothing, then return a country with the appropriate country code.
546 def _country_base_query() -> SaSelect:
547 return _select_from_placex(t)\
548 .where(t.c.country_code.in_(ccodes))\
549 .where(t.c.rank_address == 4)\
550 .where(t.c.rank_search == 4)\
551 .where(t.c.linked_place_id == None)\
552 .order_by('distance')\
555 if self.has_geometries():
556 sql = self._add_geometry_columns(_country_base_query(), t.c.geometry)
558 sql = sa.lambda_stmt(_country_base_query)
560 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
565 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
566 """ Look up a single coordinate. Returns the place information,
567 if a place was found near the coordinates or None otherwise.
569 log().function('reverse_lookup', coord=coord, params=self.params)
572 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
574 row: Optional[SaRow] = None
575 row_func: RowFunc = nres.create_from_placex_row
577 if self.max_rank >= 26:
578 row, tmp_row_func = await self.lookup_street_poi()
580 row_func = tmp_row_func
583 if self.restrict_to_country_areas:
584 ccodes = await self.lookup_country_codes()
590 if self.max_rank > 4:
591 row = await self.lookup_area()
592 if row is None and self.layer_enabled(DataLayer.ADDRESS):
593 row = await self.lookup_country(ccodes)
595 result = row_func(row, nres.ReverseResult)
596 if result is not None:
597 assert row is not None
598 result.distance = row.distance
599 if hasattr(row, 'bbox'):
600 result.bbox = Bbox.from_wkb(row.bbox)
601 await nres.add_result_details(self.conn, [result], self.params)