1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
16 SaBind, SaLambdaSelect
17 from nominatim.api.connection import SearchConnection
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
20 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
21 from nominatim.db.sqlalchemy_types import Geometry
23 # In SQLAlchemy expression which compare with NULL need to be expressed with
25 # pylint: disable=singleton-comparison
27 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
29 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
30 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
32 def no_index(expr: SaColumn) -> SaColumn:
33 """ Wrap the given expression, so that the query planner will
34 refrain from using the expression for index lookup.
36 return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
39 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
40 """ Create a select statement with the columns relevant for reverse
44 distance = t.c.distance
45 centroid = t.c.centroid
47 distance = t.c.geometry.ST_Distance(WKT_PARAM)
48 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
49 else_=t.c.centroid).label('centroid')
52 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
54 t.c.address, t.c.extratags,
55 t.c.housenumber, t.c.postcode, t.c.country_code,
56 t.c.importance, t.c.wikipedia,
57 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
59 t.c.linked_place_id, t.c.admin_level,
60 distance.label('distance'),
61 t.c.geometry.ST_Expand(0).label('bbox'))
64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
65 return sa.cast(table.c.startnumber
66 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
67 / table.c.step) * table.c.step,
68 sa.Integer).label('housenumber')
71 def _interpolated_position(table: SaFromClause) -> SaLabel:
72 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
73 rounded_pos = sa.func.round(table.c.position / fac) * fac
75 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
76 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
80 """ Given a position, locate the closest point on the line.
82 return sa.case((table.c.linegeo.is_line_like(),
83 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
84 else_=0).label('position')
87 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
88 return min(rows, key=lambda row: 1000 if row is None else row.distance)
91 class ReverseGeocoder:
92 """ Class implementing the logic for looking up a place from a
96 def __init__(self, conn: SearchConnection, params: ReverseDetails,
97 restrict_to_country_areas: bool = False) -> None:
100 self.restrict_to_country_areas = restrict_to_country_areas
102 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
106 def max_rank(self) -> int:
107 """ Return the maximum configured rank.
109 return self.params.max_rank
112 def has_geometries(self) -> bool:
113 """ Check if any geometries are requested.
115 return bool(self.params.geometry_output)
118 def layer_enabled(self, *layer: DataLayer) -> bool:
119 """ Return true when any of the given layer types are requested.
121 return any(self.params.layers & l for l in layer)
124 def layer_disabled(self, *layer: DataLayer) -> bool:
125 """ Return true when none of the given layer types is requested.
127 return not any(self.params.layers & l for l in layer)
130 def has_feature_layers(self) -> bool:
131 """ Return true if any layer other than ADDRESS or POI is requested.
133 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
136 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
139 if self.params.geometry_simplification > 0.0:
140 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
142 if self.params.geometry_output & GeometryFormat.GEOJSON:
143 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
144 if self.params.geometry_output & GeometryFormat.TEXT:
145 out.append(sa.func.ST_AsText(col).label('geometry_text'))
146 if self.params.geometry_output & GeometryFormat.KML:
147 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
148 if self.params.geometry_output & GeometryFormat.SVG:
149 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
151 return sql.add_columns(*out)
154 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
155 if self.layer_enabled(DataLayer.MANMADE):
157 if self.layer_disabled(DataLayer.RAILWAY):
158 exclude.append('railway')
159 if self.layer_disabled(DataLayer.NATURAL):
160 exclude.extend(('natural', 'water', 'waterway'))
161 return table.c.class_.not_in(tuple(exclude))
164 if self.layer_enabled(DataLayer.RAILWAY):
165 include.append('railway')
166 if self.layer_enabled(DataLayer.NATURAL):
167 include.extend(('natural', 'water', 'waterway'))
168 return table.c.class_.in_(tuple(include))
171 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
172 """ Look up the closest rank 26+ place in the database, which
173 is closer than the given distance.
175 t = self.conn.t.placex
177 # PostgreSQL must not get the distance as a parameter because
178 # there is a danger it won't be able to proberly estimate index use
179 # when used with prepared statements
180 diststr = sa.text(f"{distance}")
182 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
183 .where(t.c.geometry.ST_DWithin(WKT_PARAM, diststr))
184 .where(t.c.indexed_status == 0)
185 .where(t.c.linked_place_id == None)
186 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
187 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
188 .order_by('distance')
191 if self.has_geometries():
192 sql = self._add_geometry_columns(sql, t.c.geometry)
194 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
196 if self.layer_enabled(DataLayer.ADDRESS):
197 max_rank = min(29, self.max_rank)
198 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
199 if self.max_rank == 30:
200 restrict.append(lambda: sa.func.IsAddressPoint(t))
201 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
202 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
203 t.c.class_.not_in(('place', 'building')),
204 sa.not_(t.c.geometry.is_line_like())))
205 if self.has_feature_layers():
206 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
207 no_index(t.c.rank_address) == 0,
208 self._filter_by_layer(t)))
213 sql = sql.where(sa.or_(*restrict))
215 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
218 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
219 t = self.conn.t.placex
221 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
222 .where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))
223 .where(t.c.parent_place_id == parent_place_id)
224 .where(sa.func.IsAddressPoint(t))
225 .where(t.c.indexed_status == 0)
226 .where(t.c.linked_place_id == None)
227 .order_by('distance')
230 if self.has_geometries():
231 sql = self._add_geometry_columns(sql, t.c.geometry)
233 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
236 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
237 distance: float) -> Optional[SaRow]:
238 t = self.conn.t.osmline
240 sql: Any = sa.lambda_stmt(lambda:
242 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
243 _locate_interpolation(t))
244 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, distance))
245 .where(t.c.startnumber != None)
246 .order_by('distance')
249 if parent_place_id is not None:
250 sql += lambda s: s.where(t.c.parent_place_id == parent_place_id)
252 def _wrap_query(base_sql: SaLambdaSelect) -> SaSelect:
253 inner = base_sql.subquery('ipol')
255 return sa.select(inner.c.place_id, inner.c.osm_id,
256 inner.c.parent_place_id, inner.c.address,
257 _interpolated_housenumber(inner),
258 _interpolated_position(inner),
259 inner.c.postcode, inner.c.country_code,
264 if self.has_geometries():
265 sub = sql.subquery('geom')
266 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
268 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
271 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
272 t = self.conn.t.tiger
274 def _base_query() -> SaSelect:
276 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
277 _locate_interpolation(t))\
278 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, 0.001))\
279 .where(t.c.parent_place_id == parent_place_id)\
280 .order_by('distance')\
284 return sa.select(inner.c.place_id,
285 inner.c.parent_place_id,
286 _interpolated_housenumber(inner),
287 _interpolated_position(inner),
291 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
293 if self.has_geometries():
294 sub = sql.subquery('geom')
295 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
297 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
300 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
301 """ Find a street or POI/address for the given WKT point.
303 log().section('Reverse lookup on street/address level')
305 parent_place_id = None
307 row = await self._find_closest_street_or_poi(distance)
308 row_func: RowFunc = nres.create_from_placex_row
309 log().var_dump('Result (street/building)', row)
311 # If the closest result was a street, but an address was requested,
312 # check for a housenumber nearby which is part of the street.
314 if self.max_rank > 27 \
315 and self.layer_enabled(DataLayer.ADDRESS) \
316 and row.rank_address <= 27:
318 parent_place_id = row.place_id
319 log().comment('Find housenumber for street')
320 addr_row = await self._find_housenumber_for_street(parent_place_id)
321 log().var_dump('Result (street housenumber)', addr_row)
323 if addr_row is not None:
325 row_func = nres.create_from_placex_row
326 distance = addr_row.distance
327 elif row.country_code == 'us' and parent_place_id is not None:
328 log().comment('Find TIGER housenumber for street')
329 addr_row = await self._find_tiger_number_for_street(parent_place_id)
330 log().var_dump('Result (street Tiger housenumber)', addr_row)
332 if addr_row is not None:
333 row_func = cast(RowFunc,
334 functools.partial(nres.create_from_tiger_row,
335 osm_type=row.osm_type,
339 distance = row.distance
341 # Check for an interpolation that is either closer than our result
342 # or belongs to a close street found.
343 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
344 log().comment('Find interpolation for street')
345 addr_row = await self._find_interpolation_for_street(parent_place_id,
347 log().var_dump('Result (street interpolation)', addr_row)
348 if addr_row is not None:
350 row_func = nres.create_from_osmline_row
355 async def _lookup_area_address(self) -> Optional[SaRow]:
356 """ Lookup large addressable areas for the given WKT point.
358 log().comment('Reverse lookup by larger address area features')
359 t = self.conn.t.placex
361 def _base_query() -> SaSelect:
362 # The inner SQL brings results in the right order, so that
363 # later only a minimum of results needs to be checked with ST_Contains.
364 inner = sa.select(t, sa.literal(0.0).label('distance'))\
365 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
366 .where(t.c.geometry.intersects(WKT_PARAM))\
367 .where(sa.func.PlacexGeometryReverseLookuppolygon())\
368 .order_by(sa.desc(t.c.rank_search))\
372 return _select_from_placex(inner, False)\
373 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
374 .order_by(sa.desc(inner.c.rank_search))\
377 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
378 if self.has_geometries():
379 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
381 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
382 log().var_dump('Result (area)', address_row)
384 if address_row is not None and address_row.rank_search < self.max_rank:
385 log().comment('Search for better matching place nodes inside the area')
387 address_rank = address_row.rank_search
388 address_id = address_row.place_id
390 def _place_inside_area_query() -> SaSelect:
393 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
394 .where(t.c.rank_search > address_rank)\
395 .where(t.c.rank_search <= MAX_RANK_PARAM)\
396 .where(t.c.indexed_status == 0)\
397 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
398 .order_by(sa.desc(t.c.rank_search))\
402 touter = t.alias('outer')
403 return _select_from_placex(inner, False)\
404 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
405 .where(touter.c.place_id == address_id)\
406 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
407 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
410 sql = sa.lambda_stmt(_place_inside_area_query)
411 if self.has_geometries():
412 sql = self._add_geometry_columns(sql, sa.literal_column('places.geometry'))
414 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
415 log().var_dump('Result (place node)', place_address_row)
417 if place_address_row is not None:
418 return place_address_row
423 async def _lookup_area_others(self) -> Optional[SaRow]:
424 t = self.conn.t.placex
426 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
427 .where(t.c.rank_address == 0)\
428 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
429 .where(t.c.name != None)\
430 .where(t.c.indexed_status == 0)\
431 .where(t.c.linked_place_id == None)\
432 .where(self._filter_by_layer(t))\
433 .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
434 .order_by(sa.desc(t.c.rank_search))\
435 .order_by('distance')\
439 sql = _select_from_placex(inner, False)\
440 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
441 inner.c.geometry.ST_Contains(WKT_PARAM)))\
442 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
445 if self.has_geometries():
446 sql = self._add_geometry_columns(sql, inner.c.geometry)
448 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
449 log().var_dump('Result (non-address feature)', row)
454 async def lookup_area(self) -> Optional[SaRow]:
455 """ Lookup large areas for the current search.
457 log().section('Reverse lookup by larger area features')
459 if self.layer_enabled(DataLayer.ADDRESS):
460 address_row = await self._lookup_area_address()
464 if self.has_feature_layers():
465 other_row = await self._lookup_area_others()
469 return _get_closest(address_row, other_row)
472 async def lookup_country_codes(self) -> List[str]:
473 """ Lookup the country for the current search.
475 log().section('Reverse lookup by country code')
476 t = self.conn.t.country_grid
477 sql = sa.select(t.c.country_code).distinct()\
478 .where(t.c.geometry.ST_Contains(WKT_PARAM))
480 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
481 log().var_dump('Country codes', ccodes)
485 async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
486 """ Lookup the country for the current search.
489 ccodes = await self.lookup_country_codes()
494 t = self.conn.t.placex
495 if self.max_rank > 4:
496 log().comment('Search for place nodes in country')
498 def _base_query() -> SaSelect:
501 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
502 .where(t.c.rank_search > 4)\
503 .where(t.c.rank_search <= MAX_RANK_PARAM)\
504 .where(t.c.indexed_status == 0)\
505 .where(t.c.country_code.in_(ccodes))\
506 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
507 .order_by(sa.desc(t.c.rank_search))\
511 return _select_from_placex(inner, False)\
512 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
513 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
516 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
517 if self.has_geometries():
518 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
520 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
521 log().var_dump('Result (addressable place node)', address_row)
525 if address_row is None:
526 # Still nothing, then return a country with the appropriate country code.
527 sql = sa.lambda_stmt(lambda: _select_from_placex(t)\
528 .where(t.c.country_code.in_(ccodes))\
529 .where(t.c.rank_address == 4)\
530 .where(t.c.rank_search == 4)\
531 .where(t.c.linked_place_id == None)\
532 .order_by('distance')\
535 if self.has_geometries():
536 sql = self._add_geometry_columns(sql, t.c.geometry)
538 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
543 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
544 """ Look up a single coordinate. Returns the place information,
545 if a place was found near the coordinates or None otherwise.
547 log().function('reverse_lookup', coord=coord, params=self.params)
550 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
552 row: Optional[SaRow] = None
553 row_func: RowFunc = nres.create_from_placex_row
555 if self.max_rank >= 26:
556 row, tmp_row_func = await self.lookup_street_poi()
558 row_func = tmp_row_func
561 if self.restrict_to_country_areas:
562 ccodes = await self.lookup_country_codes()
568 if self.max_rank > 4:
569 row = await self.lookup_area()
570 if row is None and self.layer_enabled(DataLayer.ADDRESS):
571 row = await self.lookup_country(ccodes)
573 result = row_func(row, nres.ReverseResult)
574 if result is not None:
575 assert row is not None
576 result.distance = row.distance
577 if hasattr(row, 'bbox'):
578 result.bbox = Bbox.from_wkb(row.bbox)
579 await nres.add_result_details(self.conn, [result], self.params)