1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
16 SaBind, SaLambdaSelect
17 from nominatim.api.connection import SearchConnection
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
20 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
21 from nominatim.db.sqlalchemy_types import Geometry
22 import nominatim.db.sqlalchemy_functions as snfn
24 # In SQLAlchemy expression which compare with NULL need to be expressed with
26 # pylint: disable=singleton-comparison
28 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
30 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
31 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
33 def no_index(expr: SaColumn) -> SaColumn:
34 """ Wrap the given expression, so that the query planner will
35 refrain from using the expression for index lookup.
37 return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
40 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
41 """ Create a select statement with the columns relevant for reverse
45 distance = t.c.distance
46 centroid = t.c.centroid
48 distance = t.c.geometry.ST_Distance(WKT_PARAM)
49 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
50 else_=t.c.centroid).label('centroid')
53 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
55 t.c.address, t.c.extratags,
56 t.c.housenumber, t.c.postcode, t.c.country_code,
57 t.c.importance, t.c.wikipedia,
58 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
60 distance.label('distance'),
61 t.c.geometry.ST_Expand(0).label('bbox'))
64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
65 return sa.cast(table.c.startnumber
66 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
67 / table.c.step) * table.c.step,
68 sa.Integer).label('housenumber')
71 def _interpolated_position(table: SaFromClause) -> SaLabel:
72 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
73 rounded_pos = sa.func.round(table.c.position / fac) * fac
75 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
76 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
80 """ Given a position, locate the closest point on the line.
82 return sa.case((table.c.linegeo.is_line_like(),
83 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
84 else_=0).label('position')
87 def _is_address_point(table: SaFromClause) -> SaColumn:
88 return sa.and_(table.c.rank_address == 30,
89 sa.or_(table.c.housenumber != None,
90 table.c.name.has_key('addr:housename')))
93 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
94 return min(rows, key=lambda row: 1000 if row is None else row.distance)
97 class ReverseGeocoder:
98 """ Class implementing the logic for looking up a place from a
102 def __init__(self, conn: SearchConnection, params: ReverseDetails,
103 restrict_to_country_areas: bool = False) -> None:
106 self.restrict_to_country_areas = restrict_to_country_areas
108 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
112 def max_rank(self) -> int:
113 """ Return the maximum configured rank.
115 return self.params.max_rank
118 def has_geometries(self) -> bool:
119 """ Check if any geometries are requested.
121 return bool(self.params.geometry_output)
124 def layer_enabled(self, *layer: DataLayer) -> bool:
125 """ Return true when any of the given layer types are requested.
127 return any(self.params.layers & l for l in layer)
130 def layer_disabled(self, *layer: DataLayer) -> bool:
131 """ Return true when none of the given layer types is requested.
133 return not any(self.params.layers & l for l in layer)
136 def has_feature_layers(self) -> bool:
137 """ Return true if any layer other than ADDRESS or POI is requested.
139 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
142 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
145 if self.params.geometry_simplification > 0.0:
146 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
148 if self.params.geometry_output & GeometryFormat.GEOJSON:
149 out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
150 if self.params.geometry_output & GeometryFormat.TEXT:
151 out.append(sa.func.ST_AsText(col).label('geometry_text'))
152 if self.params.geometry_output & GeometryFormat.KML:
153 out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
154 if self.params.geometry_output & GeometryFormat.SVG:
155 out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
157 return sql.add_columns(*out)
160 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
161 if self.layer_enabled(DataLayer.MANMADE):
163 if self.layer_disabled(DataLayer.RAILWAY):
164 exclude.append('railway')
165 if self.layer_disabled(DataLayer.NATURAL):
166 exclude.extend(('natural', 'water', 'waterway'))
167 return table.c.class_.not_in(tuple(exclude))
170 if self.layer_enabled(DataLayer.RAILWAY):
171 include.append('railway')
172 if self.layer_enabled(DataLayer.NATURAL):
173 include.extend(('natural', 'water', 'waterway'))
174 return table.c.class_.in_(tuple(include))
177 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
178 """ Look up the closest rank 26+ place in the database, which
179 is closer than the given distance.
181 t = self.conn.t.placex
183 # PostgreSQL must not get the distance as a parameter because
184 # there is a danger it won't be able to proberly estimate index use
185 # when used with prepared statements
186 diststr = sa.text(f"{distance}")
188 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
189 .where(t.c.geometry.ST_DWithin(WKT_PARAM, diststr))
190 .where(t.c.indexed_status == 0)
191 .where(t.c.linked_place_id == None)
192 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
193 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
194 .order_by('distance')
197 if self.has_geometries():
198 sql = self._add_geometry_columns(sql, t.c.geometry)
200 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
202 if self.layer_enabled(DataLayer.ADDRESS):
203 max_rank = min(29, self.max_rank)
204 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
205 if self.max_rank == 30:
206 restrict.append(lambda: _is_address_point(t))
207 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
208 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
209 t.c.class_.not_in(('place', 'building')),
210 sa.not_(t.c.geometry.is_line_like())))
211 if self.has_feature_layers():
212 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
213 no_index(t.c.rank_address) == 0,
214 self._filter_by_layer(t)))
219 sql = sql.where(sa.or_(*restrict))
221 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
224 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
225 t = self.conn.t.placex
227 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
228 .where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))
229 .where(t.c.parent_place_id == parent_place_id)
230 .where(_is_address_point(t))
231 .where(t.c.indexed_status == 0)
232 .where(t.c.linked_place_id == None)
233 .order_by('distance')
236 if self.has_geometries():
237 sql = self._add_geometry_columns(sql, t.c.geometry)
239 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
242 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
243 distance: float) -> Optional[SaRow]:
244 t = self.conn.t.osmline
246 sql: Any = sa.lambda_stmt(lambda:
248 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
249 _locate_interpolation(t))
250 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, distance))
251 .where(t.c.startnumber != None)
252 .order_by('distance')
255 if parent_place_id is not None:
256 sql += lambda s: s.where(t.c.parent_place_id == parent_place_id)
258 def _wrap_query(base_sql: SaLambdaSelect) -> SaSelect:
259 inner = base_sql.subquery('ipol')
261 return sa.select(inner.c.place_id, inner.c.osm_id,
262 inner.c.parent_place_id, inner.c.address,
263 _interpolated_housenumber(inner),
264 _interpolated_position(inner),
265 inner.c.postcode, inner.c.country_code,
270 if self.has_geometries():
271 sub = sql.subquery('geom')
272 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
274 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
277 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
278 t = self.conn.t.tiger
280 def _base_query() -> SaSelect:
282 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
283 _locate_interpolation(t))\
284 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, 0.001))\
285 .where(t.c.parent_place_id == parent_place_id)\
286 .order_by('distance')\
290 return sa.select(inner.c.place_id,
291 inner.c.parent_place_id,
292 _interpolated_housenumber(inner),
293 _interpolated_position(inner),
297 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
299 if self.has_geometries():
300 sub = sql.subquery('geom')
301 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
303 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
306 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
307 """ Find a street or POI/address for the given WKT point.
309 log().section('Reverse lookup on street/address level')
311 parent_place_id = None
313 row = await self._find_closest_street_or_poi(distance)
314 row_func: RowFunc = nres.create_from_placex_row
315 log().var_dump('Result (street/building)', row)
317 # If the closest result was a street, but an address was requested,
318 # check for a housenumber nearby which is part of the street.
320 if self.max_rank > 27 \
321 and self.layer_enabled(DataLayer.ADDRESS) \
322 and row.rank_address <= 27:
324 parent_place_id = row.place_id
325 log().comment('Find housenumber for street')
326 addr_row = await self._find_housenumber_for_street(parent_place_id)
327 log().var_dump('Result (street housenumber)', addr_row)
329 if addr_row is not None:
331 row_func = nres.create_from_placex_row
332 distance = addr_row.distance
333 elif row.country_code == 'us' and parent_place_id is not None:
334 log().comment('Find TIGER housenumber for street')
335 addr_row = await self._find_tiger_number_for_street(parent_place_id)
336 log().var_dump('Result (street Tiger housenumber)', addr_row)
338 if addr_row is not None:
339 row_func = cast(RowFunc,
340 functools.partial(nres.create_from_tiger_row,
341 osm_type=row.osm_type,
345 distance = row.distance
347 # Check for an interpolation that is either closer than our result
348 # or belongs to a close street found.
349 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
350 log().comment('Find interpolation for street')
351 addr_row = await self._find_interpolation_for_street(parent_place_id,
353 log().var_dump('Result (street interpolation)', addr_row)
354 if addr_row is not None:
356 row_func = nres.create_from_osmline_row
361 async def _lookup_area_address(self) -> Optional[SaRow]:
362 """ Lookup large addressable areas for the given WKT point.
364 log().comment('Reverse lookup by larger address area features')
365 t = self.conn.t.placex
367 def _base_query() -> SaSelect:
368 # The inner SQL brings results in the right order, so that
369 # later only a minimum of results needs to be checked with ST_Contains.
370 inner = sa.select(t, sa.literal(0.0).label('distance'))\
371 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
372 .where(t.c.geometry.intersects(WKT_PARAM))\
373 .where(snfn.select_index_placex_geometry_reverse_lookuppolygon('placex'))\
374 .order_by(sa.desc(t.c.rank_search))\
378 return _select_from_placex(inner, False)\
379 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
380 .order_by(sa.desc(inner.c.rank_search))\
383 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
384 if self.has_geometries():
385 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
387 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
388 log().var_dump('Result (area)', address_row)
390 if address_row is not None and address_row.rank_search < self.max_rank:
391 log().comment('Search for better matching place nodes inside the area')
393 address_rank = address_row.rank_search
394 address_id = address_row.place_id
396 def _place_inside_area_query() -> SaSelect:
399 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
400 .where(t.c.rank_search > address_rank)\
401 .where(t.c.rank_search <= MAX_RANK_PARAM)\
402 .where(t.c.indexed_status == 0)\
403 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
405 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
406 .intersects(WKT_PARAM))\
407 .order_by(sa.desc(t.c.rank_search))\
411 touter = t.alias('outer')
412 return _select_from_placex(inner, False)\
413 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
414 .where(touter.c.place_id == address_id)\
415 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
416 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
419 sql = sa.lambda_stmt(_place_inside_area_query)
420 if self.has_geometries():
421 sql = self._add_geometry_columns(sql, sa.literal_column('places.geometry'))
423 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
424 log().var_dump('Result (place node)', place_address_row)
426 if place_address_row is not None:
427 return place_address_row
432 async def _lookup_area_others(self) -> Optional[SaRow]:
433 t = self.conn.t.placex
435 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
436 .where(t.c.rank_address == 0)\
437 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
438 .where(t.c.name != None)\
439 .where(t.c.indexed_status == 0)\
440 .where(t.c.linked_place_id == None)\
441 .where(self._filter_by_layer(t))\
443 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
444 .intersects(WKT_PARAM))\
445 .order_by(sa.desc(t.c.rank_search))\
449 sql = _select_from_placex(inner, False)\
450 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
451 inner.c.geometry.ST_Contains(WKT_PARAM)))\
452 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
455 if self.has_geometries():
456 sql = self._add_geometry_columns(sql, inner.c.geometry)
458 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
459 log().var_dump('Result (non-address feature)', row)
464 async def lookup_area(self) -> Optional[SaRow]:
465 """ Lookup large areas for the current search.
467 log().section('Reverse lookup by larger area features')
469 if self.layer_enabled(DataLayer.ADDRESS):
470 address_row = await self._lookup_area_address()
474 if self.has_feature_layers():
475 other_row = await self._lookup_area_others()
479 return _get_closest(address_row, other_row)
482 async def lookup_country_codes(self) -> List[str]:
483 """ Lookup the country for the current search.
485 log().section('Reverse lookup by country code')
486 t = self.conn.t.country_grid
487 sql = sa.select(t.c.country_code).distinct()\
488 .where(t.c.geometry.ST_Contains(WKT_PARAM))
490 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
491 log().var_dump('Country codes', ccodes)
495 async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
496 """ Lookup the country for the current search.
499 ccodes = await self.lookup_country_codes()
504 t = self.conn.t.placex
505 if self.max_rank > 4:
506 log().comment('Search for place nodes in country')
508 def _base_query() -> SaSelect:
511 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
512 .where(t.c.rank_search > 4)\
513 .where(t.c.rank_search <= MAX_RANK_PARAM)\
514 .where(t.c.indexed_status == 0)\
515 .where(t.c.country_code.in_(ccodes))\
516 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
518 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
519 .intersects(WKT_PARAM))\
520 .order_by(sa.desc(t.c.rank_search))\
524 return _select_from_placex(inner, False)\
525 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
526 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
529 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
530 if self.has_geometries():
531 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
533 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
534 log().var_dump('Result (addressable place node)', address_row)
538 if address_row is None:
539 # Still nothing, then return a country with the appropriate country code.
540 sql = sa.lambda_stmt(lambda: _select_from_placex(t)\
541 .where(t.c.country_code.in_(ccodes))\
542 .where(t.c.rank_address == 4)\
543 .where(t.c.rank_search == 4)\
544 .where(t.c.linked_place_id == None)\
545 .order_by('distance')\
548 if self.has_geometries():
549 sql = self._add_geometry_columns(sql, t.c.geometry)
551 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
556 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
557 """ Look up a single coordinate. Returns the place information,
558 if a place was found near the coordinates or None otherwise.
560 log().function('reverse_lookup', coord=coord, params=self.params)
563 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
565 row: Optional[SaRow] = None
566 row_func: RowFunc = nres.create_from_placex_row
568 if self.max_rank >= 26:
569 row, tmp_row_func = await self.lookup_street_poi()
571 row_func = tmp_row_func
574 if self.restrict_to_country_areas:
575 ccodes = await self.lookup_country_codes()
581 if self.max_rank > 4:
582 row = await self.lookup_area()
583 if row is None and self.layer_enabled(DataLayer.ADDRESS):
584 row = await self.lookup_country(ccodes)
586 result = row_func(row, nres.ReverseResult)
587 if result is not None:
588 assert row is not None
589 result.distance = row.distance
590 if hasattr(row, 'bbox'):
591 result.bbox = Bbox.from_wkb(row.bbox)
592 await nres.add_result_details(self.conn, [result], self.params)