1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
16 SaBind, SaLambdaSelect
17 from nominatim.api.connection import SearchConnection
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
20 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
21 from nominatim.db.sqlalchemy_types import Geometry
23 # In SQLAlchemy expression which compare with NULL need to be expressed with
25 # pylint: disable=singleton-comparison
27 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
29 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
30 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
32 def no_index(expr: SaColumn) -> SaColumn:
33 """ Wrap the given expression, so that the query planner will
34 refrain from using the expression for index lookup.
36 return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
39 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
40 """ Create a select statement with the columns relevant for reverse
44 distance = t.c.distance
45 centroid = t.c.centroid
47 distance = t.c.geometry.ST_Distance(WKT_PARAM)
48 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
49 else_=t.c.centroid).label('centroid')
52 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
54 t.c.address, t.c.extratags,
55 t.c.housenumber, t.c.postcode, t.c.country_code,
56 t.c.importance, t.c.wikipedia,
57 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
59 t.c.linked_place_id, t.c.admin_level,
60 distance.label('distance'),
61 t.c.geometry.ST_Expand(0).label('bbox'))
64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
65 return sa.cast(table.c.startnumber
66 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
67 / table.c.step) * table.c.step,
68 sa.Integer).label('housenumber')
71 def _interpolated_position(table: SaFromClause) -> SaLabel:
72 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
73 rounded_pos = sa.func.round(table.c.position / fac) * fac
75 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
76 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
80 """ Given a position, locate the closest point on the line.
82 return sa.case((table.c.linegeo.is_line_like(),
83 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
84 else_=0).label('position')
87 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
88 return min(rows, key=lambda row: 1000 if row is None else row.distance)
91 class ReverseGeocoder:
92 """ Class implementing the logic for looking up a place from a
96 def __init__(self, conn: SearchConnection, params: ReverseDetails,
97 restrict_to_country_areas: bool = False) -> None:
100 self.restrict_to_country_areas = restrict_to_country_areas
102 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
106 def max_rank(self) -> int:
107 """ Return the maximum configured rank.
109 return self.params.max_rank
112 def has_geometries(self) -> bool:
113 """ Check if any geometries are requested.
115 return bool(self.params.geometry_output)
118 def layer_enabled(self, *layer: DataLayer) -> bool:
119 """ Return true when any of the given layer types are requested.
121 return any(self.params.layers & l for l in layer)
124 def layer_disabled(self, *layer: DataLayer) -> bool:
125 """ Return true when none of the given layer types is requested.
127 return not any(self.params.layers & l for l in layer)
130 def has_feature_layers(self) -> bool:
131 """ Return true if any layer other than ADDRESS or POI is requested.
133 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
136 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
139 if self.params.geometry_simplification > 0.0:
140 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
142 if self.params.geometry_output & GeometryFormat.GEOJSON:
143 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
144 if self.params.geometry_output & GeometryFormat.TEXT:
145 out.append(sa.func.ST_AsText(col).label('geometry_text'))
146 if self.params.geometry_output & GeometryFormat.KML:
147 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
148 if self.params.geometry_output & GeometryFormat.SVG:
149 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
151 return sql.add_columns(*out)
154 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
155 if self.layer_enabled(DataLayer.MANMADE):
157 if self.layer_disabled(DataLayer.RAILWAY):
158 exclude.append('railway')
159 if self.layer_disabled(DataLayer.NATURAL):
160 exclude.extend(('natural', 'water', 'waterway'))
161 return table.c.class_.not_in(tuple(exclude))
164 if self.layer_enabled(DataLayer.RAILWAY):
165 include.append('railway')
166 if self.layer_enabled(DataLayer.NATURAL):
167 include.extend(('natural', 'water', 'waterway'))
168 return table.c.class_.in_(tuple(include))
171 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
172 """ Look up the closest rank 26+ place in the database, which
173 is closer than the given distance.
175 t = self.conn.t.placex
177 # PostgreSQL must not get the distance as a parameter because
178 # there is a danger it won't be able to properly estimate index use
179 # when used with prepared statements
180 diststr = sa.text(f"{distance}")
182 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
183 .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
184 .where(t.c.indexed_status == 0)
185 .where(t.c.linked_place_id == None)
186 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
187 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
188 .order_by('distance')
191 if self.has_geometries():
192 sql = self._add_geometry_columns(sql, t.c.geometry)
194 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
196 if self.layer_enabled(DataLayer.ADDRESS):
197 max_rank = min(29, self.max_rank)
198 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
199 if self.max_rank == 30:
200 restrict.append(lambda: sa.func.IsAddressPoint(t))
201 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
202 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
203 t.c.class_.not_in(('place', 'building')),
204 sa.not_(t.c.geometry.is_line_like())))
205 if self.has_feature_layers():
206 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
207 no_index(t.c.rank_address) == 0,
208 self._filter_by_layer(t)))
213 sql = sql.where(sa.or_(*restrict))
215 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
218 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
219 t = self.conn.t.placex
221 def _base_query() -> SaSelect:
222 return _select_from_placex(t)\
223 .where(t.c.geometry.within_distance(WKT_PARAM, 0.001))\
224 .where(t.c.parent_place_id == parent_place_id)\
225 .where(sa.func.IsAddressPoint(t))\
226 .where(t.c.indexed_status == 0)\
227 .where(t.c.linked_place_id == None)\
228 .order_by('distance')\
232 if self.has_geometries():
233 sql = self._add_geometry_columns(_base_query(), t.c.geometry)
235 sql = sa.lambda_stmt(_base_query)
237 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
240 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
241 distance: float) -> Optional[SaRow]:
242 t = self.conn.t.osmline
245 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
246 _locate_interpolation(t))\
247 .where(t.c.linegeo.within_distance(WKT_PARAM, distance))\
248 .where(t.c.startnumber != None)\
249 .order_by('distance')\
252 if parent_place_id is not None:
253 sql = sql.where(t.c.parent_place_id == parent_place_id)
255 inner = sql.subquery('ipol')
257 sql = sa.select(inner.c.place_id, inner.c.osm_id,
258 inner.c.parent_place_id, inner.c.address,
259 _interpolated_housenumber(inner),
260 _interpolated_position(inner),
261 inner.c.postcode, inner.c.country_code,
264 if self.has_geometries():
265 sub = sql.subquery('geom')
266 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
268 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
271 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
272 t = self.conn.t.tiger
274 def _base_query() -> SaSelect:
276 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
277 _locate_interpolation(t))\
278 .where(t.c.linegeo.within_distance(WKT_PARAM, 0.001))\
279 .where(t.c.parent_place_id == parent_place_id)\
280 .order_by('distance')\
284 return sa.select(inner.c.place_id,
285 inner.c.parent_place_id,
286 _interpolated_housenumber(inner),
287 _interpolated_position(inner),
292 if self.has_geometries():
293 sub = _base_query().subquery('geom')
294 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
296 sql = sa.lambda_stmt(_base_query)
298 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
301 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
302 """ Find a street or POI/address for the given WKT point.
304 log().section('Reverse lookup on street/address level')
306 parent_place_id = None
308 row = await self._find_closest_street_or_poi(distance)
309 row_func: RowFunc = nres.create_from_placex_row
310 log().var_dump('Result (street/building)', row)
312 # If the closest result was a street, but an address was requested,
313 # check for a housenumber nearby which is part of the street.
315 if self.max_rank > 27 \
316 and self.layer_enabled(DataLayer.ADDRESS) \
317 and row.rank_address <= 27:
319 parent_place_id = row.place_id
320 log().comment('Find housenumber for street')
321 addr_row = await self._find_housenumber_for_street(parent_place_id)
322 log().var_dump('Result (street housenumber)', addr_row)
324 if addr_row is not None:
326 row_func = nres.create_from_placex_row
327 distance = addr_row.distance
328 elif row.country_code == 'us' and parent_place_id is not None:
329 log().comment('Find TIGER housenumber for street')
330 addr_row = await self._find_tiger_number_for_street(parent_place_id)
331 log().var_dump('Result (street Tiger housenumber)', addr_row)
333 if addr_row is not None:
334 row_func = cast(RowFunc,
335 functools.partial(nres.create_from_tiger_row,
336 osm_type=row.osm_type,
340 distance = row.distance
342 # Check for an interpolation that is either closer than our result
343 # or belongs to a close street found.
344 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
345 log().comment('Find interpolation for street')
346 addr_row = await self._find_interpolation_for_street(parent_place_id,
348 log().var_dump('Result (street interpolation)', addr_row)
349 if addr_row is not None:
351 row_func = nres.create_from_osmline_row
356 async def _lookup_area_address(self) -> Optional[SaRow]:
357 """ Lookup large addressable areas for the given WKT point.
359 log().comment('Reverse lookup by larger address area features')
360 t = self.conn.t.placex
362 def _base_query() -> SaSelect:
363 # The inner SQL brings results in the right order, so that
364 # later only a minimum of results needs to be checked with ST_Contains.
365 inner = sa.select(t, sa.literal(0.0).label('distance'))\
366 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
367 .where(t.c.geometry.intersects(WKT_PARAM))\
368 .where(sa.func.PlacexGeometryReverseLookuppolygon())\
369 .order_by(sa.desc(t.c.rank_search))\
373 return _select_from_placex(inner, False)\
374 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
375 .order_by(sa.desc(inner.c.rank_search))\
378 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
379 if self.has_geometries():
380 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
382 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
383 log().var_dump('Result (area)', address_row)
385 if address_row is not None and address_row.rank_search < self.max_rank:
386 log().comment('Search for better matching place nodes inside the area')
388 address_rank = address_row.rank_search
389 address_id = address_row.place_id
391 def _place_inside_area_query() -> SaSelect:
394 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
395 .where(t.c.rank_search > address_rank)\
396 .where(t.c.rank_search <= MAX_RANK_PARAM)\
397 .where(t.c.indexed_status == 0)\
398 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
399 .order_by(sa.desc(t.c.rank_search))\
403 touter = t.alias('outer')
404 return _select_from_placex(inner, False)\
405 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
406 .where(touter.c.place_id == address_id)\
407 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
408 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
411 if self.has_geometries():
412 sql = self._add_geometry_columns(_place_inside_area_query(),
413 sa.literal_column('places.geometry'))
415 sql = sa.lambda_stmt(_place_inside_area_query)
417 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
418 log().var_dump('Result (place node)', place_address_row)
420 if place_address_row is not None:
421 return place_address_row
426 async def _lookup_area_others(self) -> Optional[SaRow]:
427 t = self.conn.t.placex
429 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
430 .where(t.c.rank_address == 0)\
431 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
432 .where(t.c.name != None)\
433 .where(t.c.indexed_status == 0)\
434 .where(t.c.linked_place_id == None)\
435 .where(self._filter_by_layer(t))\
436 .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
437 .order_by(sa.desc(t.c.rank_search))\
438 .order_by('distance')\
442 sql = _select_from_placex(inner, False)\
443 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
444 inner.c.geometry.ST_Contains(WKT_PARAM)))\
445 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
448 if self.has_geometries():
449 sql = self._add_geometry_columns(sql, inner.c.geometry)
451 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
452 log().var_dump('Result (non-address feature)', row)
457 async def lookup_area(self) -> Optional[SaRow]:
458 """ Lookup large areas for the current search.
460 log().section('Reverse lookup by larger area features')
462 if self.layer_enabled(DataLayer.ADDRESS):
463 address_row = await self._lookup_area_address()
467 if self.has_feature_layers():
468 other_row = await self._lookup_area_others()
472 return _get_closest(address_row, other_row)
475 async def lookup_country_codes(self) -> List[str]:
476 """ Lookup the country for the current search.
478 log().section('Reverse lookup by country code')
479 t = self.conn.t.country_grid
480 sql = sa.select(t.c.country_code).distinct()\
481 .where(t.c.geometry.ST_Contains(WKT_PARAM))
483 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
484 log().var_dump('Country codes', ccodes)
488 async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
489 """ Lookup the country for the current search.
492 ccodes = await self.lookup_country_codes()
497 t = self.conn.t.placex
498 if self.max_rank > 4:
499 log().comment('Search for place nodes in country')
501 def _base_query() -> SaSelect:
504 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
505 .where(t.c.rank_search > 4)\
506 .where(t.c.rank_search <= MAX_RANK_PARAM)\
507 .where(t.c.indexed_status == 0)\
508 .where(t.c.country_code.in_(ccodes))\
509 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
510 .order_by(sa.desc(t.c.rank_search))\
514 return _select_from_placex(inner, False)\
515 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
516 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
520 if self.has_geometries():
521 sql = self._add_geometry_columns(_base_query(),
522 sa.literal_column('area.geometry'))
524 sql = sa.lambda_stmt(_base_query)
526 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
527 log().var_dump('Result (addressable place node)', address_row)
531 if address_row is None:
532 # Still nothing, then return a country with the appropriate country code.
533 def _country_base_query() -> SaSelect:
534 return _select_from_placex(t)\
535 .where(t.c.country_code.in_(ccodes))\
536 .where(t.c.rank_address == 4)\
537 .where(t.c.rank_search == 4)\
538 .where(t.c.linked_place_id == None)\
539 .order_by('distance')\
542 if self.has_geometries():
543 sql = self._add_geometry_columns(_country_base_query(), t.c.geometry)
545 sql = sa.lambda_stmt(_country_base_query)
547 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
552 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
553 """ Look up a single coordinate. Returns the place information,
554 if a place was found near the coordinates or None otherwise.
556 log().function('reverse_lookup', coord=coord, params=self.params)
559 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
561 row: Optional[SaRow] = None
562 row_func: RowFunc = nres.create_from_placex_row
564 if self.max_rank >= 26:
565 row, tmp_row_func = await self.lookup_street_poi()
567 row_func = tmp_row_func
570 if self.restrict_to_country_areas:
571 ccodes = await self.lookup_country_codes()
577 if self.max_rank > 4:
578 row = await self.lookup_area()
579 if row is None and self.layer_enabled(DataLayer.ADDRESS):
580 row = await self.lookup_country(ccodes)
582 result = row_func(row, nres.ReverseResult)
583 if result is not None:
584 assert row is not None
585 result.distance = row.distance
586 if hasattr(row, 'bbox'):
587 result.bbox = Bbox.from_wkb(row.bbox)
588 await nres.add_result_details(self.conn, [result], self.params)