1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
16 SaBind, SaLambdaSelect
17 from nominatim.api.connection import SearchConnection
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
20 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
21 from nominatim.db.sqlalchemy_types import Geometry
22 import nominatim.db.sqlalchemy_functions as snfn
24 # In SQLAlchemy expression which compare with NULL need to be expressed with
26 # pylint: disable=singleton-comparison
28 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
30 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
31 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
33 def no_index(expr: SaColumn) -> SaColumn:
34 """ Wrap the given expression, so that the query planner will
35 refrain from using the expression for index lookup.
37 return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
40 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
41 """ Create a select statement with the columns relevant for reverse
45 distance = t.c.distance
46 centroid = t.c.centroid
48 distance = t.c.geometry.ST_Distance(WKT_PARAM)
49 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
50 else_=t.c.centroid).label('centroid')
53 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
55 t.c.address, t.c.extratags,
56 t.c.housenumber, t.c.postcode, t.c.country_code,
57 t.c.importance, t.c.wikipedia,
58 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
60 t.c.linked_place_id, t.c.admin_level,
61 distance.label('distance'),
62 t.c.geometry.ST_Expand(0).label('bbox'))
65 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
66 return sa.cast(table.c.startnumber
67 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
68 / table.c.step) * table.c.step,
69 sa.Integer).label('housenumber')
72 def _interpolated_position(table: SaFromClause) -> SaLabel:
73 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
74 rounded_pos = sa.func.round(table.c.position / fac) * fac
76 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
77 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
80 def _locate_interpolation(table: SaFromClause) -> SaLabel:
81 """ Given a position, locate the closest point on the line.
83 return sa.case((table.c.linegeo.is_line_like(),
84 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
85 else_=0).label('position')
88 def _is_address_point(table: SaFromClause) -> SaColumn:
89 return sa.and_(table.c.rank_address == 30,
90 sa.or_(table.c.housenumber != None,
91 table.c.name.has_key('addr:housename')))
94 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
95 return min(rows, key=lambda row: 1000 if row is None else row.distance)
98 class ReverseGeocoder:
99 """ Class implementing the logic for looking up a place from a
103 def __init__(self, conn: SearchConnection, params: ReverseDetails,
104 restrict_to_country_areas: bool = False) -> None:
107 self.restrict_to_country_areas = restrict_to_country_areas
109 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
113 def max_rank(self) -> int:
114 """ Return the maximum configured rank.
116 return self.params.max_rank
119 def has_geometries(self) -> bool:
120 """ Check if any geometries are requested.
122 return bool(self.params.geometry_output)
125 def layer_enabled(self, *layer: DataLayer) -> bool:
126 """ Return true when any of the given layer types are requested.
128 return any(self.params.layers & l for l in layer)
131 def layer_disabled(self, *layer: DataLayer) -> bool:
132 """ Return true when none of the given layer types is requested.
134 return not any(self.params.layers & l for l in layer)
137 def has_feature_layers(self) -> bool:
138 """ Return true if any layer other than ADDRESS or POI is requested.
140 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
143 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
146 if self.params.geometry_simplification > 0.0:
147 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
149 if self.params.geometry_output & GeometryFormat.GEOJSON:
150 out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
151 if self.params.geometry_output & GeometryFormat.TEXT:
152 out.append(sa.func.ST_AsText(col).label('geometry_text'))
153 if self.params.geometry_output & GeometryFormat.KML:
154 out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
155 if self.params.geometry_output & GeometryFormat.SVG:
156 out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
158 return sql.add_columns(*out)
161 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
162 if self.layer_enabled(DataLayer.MANMADE):
164 if self.layer_disabled(DataLayer.RAILWAY):
165 exclude.append('railway')
166 if self.layer_disabled(DataLayer.NATURAL):
167 exclude.extend(('natural', 'water', 'waterway'))
168 return table.c.class_.not_in(tuple(exclude))
171 if self.layer_enabled(DataLayer.RAILWAY):
172 include.append('railway')
173 if self.layer_enabled(DataLayer.NATURAL):
174 include.extend(('natural', 'water', 'waterway'))
175 return table.c.class_.in_(tuple(include))
178 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
179 """ Look up the closest rank 26+ place in the database, which
180 is closer than the given distance.
182 t = self.conn.t.placex
184 # PostgreSQL must not get the distance as a parameter because
185 # there is a danger it won't be able to proberly estimate index use
186 # when used with prepared statements
187 diststr = sa.text(f"{distance}")
189 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
190 .where(t.c.geometry.ST_DWithin(WKT_PARAM, diststr))
191 .where(t.c.indexed_status == 0)
192 .where(t.c.linked_place_id == None)
193 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
194 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
195 .order_by('distance')
198 if self.has_geometries():
199 sql = self._add_geometry_columns(sql, t.c.geometry)
201 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
203 if self.layer_enabled(DataLayer.ADDRESS):
204 max_rank = min(29, self.max_rank)
205 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
206 if self.max_rank == 30:
207 restrict.append(lambda: _is_address_point(t))
208 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
209 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
210 t.c.class_.not_in(('place', 'building')),
211 sa.not_(t.c.geometry.is_line_like())))
212 if self.has_feature_layers():
213 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
214 no_index(t.c.rank_address) == 0,
215 self._filter_by_layer(t)))
220 sql = sql.where(sa.or_(*restrict))
222 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
225 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
226 t = self.conn.t.placex
228 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
229 .where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))
230 .where(t.c.parent_place_id == parent_place_id)
231 .where(_is_address_point(t))
232 .where(t.c.indexed_status == 0)
233 .where(t.c.linked_place_id == None)
234 .order_by('distance')
237 if self.has_geometries():
238 sql = self._add_geometry_columns(sql, t.c.geometry)
240 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
243 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
244 distance: float) -> Optional[SaRow]:
245 t = self.conn.t.osmline
247 sql: Any = sa.lambda_stmt(lambda:
249 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
250 _locate_interpolation(t))
251 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, distance))
252 .where(t.c.startnumber != None)
253 .order_by('distance')
256 if parent_place_id is not None:
257 sql += lambda s: s.where(t.c.parent_place_id == parent_place_id)
259 def _wrap_query(base_sql: SaLambdaSelect) -> SaSelect:
260 inner = base_sql.subquery('ipol')
262 return sa.select(inner.c.place_id, inner.c.osm_id,
263 inner.c.parent_place_id, inner.c.address,
264 _interpolated_housenumber(inner),
265 _interpolated_position(inner),
266 inner.c.postcode, inner.c.country_code,
271 if self.has_geometries():
272 sub = sql.subquery('geom')
273 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
275 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
278 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
279 t = self.conn.t.tiger
281 def _base_query() -> SaSelect:
283 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
284 _locate_interpolation(t))\
285 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, 0.001))\
286 .where(t.c.parent_place_id == parent_place_id)\
287 .order_by('distance')\
291 return sa.select(inner.c.place_id,
292 inner.c.parent_place_id,
293 _interpolated_housenumber(inner),
294 _interpolated_position(inner),
298 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
300 if self.has_geometries():
301 sub = sql.subquery('geom')
302 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
304 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
307 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
308 """ Find a street or POI/address for the given WKT point.
310 log().section('Reverse lookup on street/address level')
312 parent_place_id = None
314 row = await self._find_closest_street_or_poi(distance)
315 row_func: RowFunc = nres.create_from_placex_row
316 log().var_dump('Result (street/building)', row)
318 # If the closest result was a street, but an address was requested,
319 # check for a housenumber nearby which is part of the street.
321 if self.max_rank > 27 \
322 and self.layer_enabled(DataLayer.ADDRESS) \
323 and row.rank_address <= 27:
325 parent_place_id = row.place_id
326 log().comment('Find housenumber for street')
327 addr_row = await self._find_housenumber_for_street(parent_place_id)
328 log().var_dump('Result (street housenumber)', addr_row)
330 if addr_row is not None:
332 row_func = nres.create_from_placex_row
333 distance = addr_row.distance
334 elif row.country_code == 'us' and parent_place_id is not None:
335 log().comment('Find TIGER housenumber for street')
336 addr_row = await self._find_tiger_number_for_street(parent_place_id)
337 log().var_dump('Result (street Tiger housenumber)', addr_row)
339 if addr_row is not None:
340 row_func = cast(RowFunc,
341 functools.partial(nres.create_from_tiger_row,
342 osm_type=row.osm_type,
346 distance = row.distance
348 # Check for an interpolation that is either closer than our result
349 # or belongs to a close street found.
350 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
351 log().comment('Find interpolation for street')
352 addr_row = await self._find_interpolation_for_street(parent_place_id,
354 log().var_dump('Result (street interpolation)', addr_row)
355 if addr_row is not None:
357 row_func = nres.create_from_osmline_row
362 async def _lookup_area_address(self) -> Optional[SaRow]:
363 """ Lookup large addressable areas for the given WKT point.
365 log().comment('Reverse lookup by larger address area features')
366 t = self.conn.t.placex
368 def _base_query() -> SaSelect:
369 # The inner SQL brings results in the right order, so that
370 # later only a minimum of results needs to be checked with ST_Contains.
371 inner = sa.select(t, sa.literal(0.0).label('distance'))\
372 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
373 .where(t.c.geometry.intersects(WKT_PARAM))\
374 .where(snfn.select_index_placex_geometry_reverse_lookuppolygon('placex'))\
375 .order_by(sa.desc(t.c.rank_search))\
379 return _select_from_placex(inner, False)\
380 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
381 .order_by(sa.desc(inner.c.rank_search))\
384 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
385 if self.has_geometries():
386 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
388 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
389 log().var_dump('Result (area)', address_row)
391 if address_row is not None and address_row.rank_search < self.max_rank:
392 log().comment('Search for better matching place nodes inside the area')
394 address_rank = address_row.rank_search
395 address_id = address_row.place_id
397 def _place_inside_area_query() -> SaSelect:
400 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
401 .where(t.c.rank_search > address_rank)\
402 .where(t.c.rank_search <= MAX_RANK_PARAM)\
403 .where(t.c.indexed_status == 0)\
404 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
406 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
407 .intersects(WKT_PARAM))\
408 .order_by(sa.desc(t.c.rank_search))\
412 touter = t.alias('outer')
413 return _select_from_placex(inner, False)\
414 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
415 .where(touter.c.place_id == address_id)\
416 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
417 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
420 sql = sa.lambda_stmt(_place_inside_area_query)
421 if self.has_geometries():
422 sql = self._add_geometry_columns(sql, sa.literal_column('places.geometry'))
424 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
425 log().var_dump('Result (place node)', place_address_row)
427 if place_address_row is not None:
428 return place_address_row
433 async def _lookup_area_others(self) -> Optional[SaRow]:
434 t = self.conn.t.placex
436 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
437 .where(t.c.rank_address == 0)\
438 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
439 .where(t.c.name != None)\
440 .where(t.c.indexed_status == 0)\
441 .where(t.c.linked_place_id == None)\
442 .where(self._filter_by_layer(t))\
444 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
445 .intersects(WKT_PARAM))\
446 .order_by(sa.desc(t.c.rank_search))\
450 sql = _select_from_placex(inner, False)\
451 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
452 inner.c.geometry.ST_Contains(WKT_PARAM)))\
453 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
456 if self.has_geometries():
457 sql = self._add_geometry_columns(sql, inner.c.geometry)
459 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
460 log().var_dump('Result (non-address feature)', row)
465 async def lookup_area(self) -> Optional[SaRow]:
466 """ Lookup large areas for the current search.
468 log().section('Reverse lookup by larger area features')
470 if self.layer_enabled(DataLayer.ADDRESS):
471 address_row = await self._lookup_area_address()
475 if self.has_feature_layers():
476 other_row = await self._lookup_area_others()
480 return _get_closest(address_row, other_row)
483 async def lookup_country_codes(self) -> List[str]:
484 """ Lookup the country for the current search.
486 log().section('Reverse lookup by country code')
487 t = self.conn.t.country_grid
488 sql = sa.select(t.c.country_code).distinct()\
489 .where(t.c.geometry.ST_Contains(WKT_PARAM))
491 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
492 log().var_dump('Country codes', ccodes)
496 async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
497 """ Lookup the country for the current search.
500 ccodes = await self.lookup_country_codes()
505 t = self.conn.t.placex
506 if self.max_rank > 4:
507 log().comment('Search for place nodes in country')
509 def _base_query() -> SaSelect:
512 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
513 .where(t.c.rank_search > 4)\
514 .where(t.c.rank_search <= MAX_RANK_PARAM)\
515 .where(t.c.indexed_status == 0)\
516 .where(t.c.country_code.in_(ccodes))\
517 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
519 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
520 .intersects(WKT_PARAM))\
521 .order_by(sa.desc(t.c.rank_search))\
525 return _select_from_placex(inner, False)\
526 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
527 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
530 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
531 if self.has_geometries():
532 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
534 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
535 log().var_dump('Result (addressable place node)', address_row)
539 if address_row is None:
540 # Still nothing, then return a country with the appropriate country code.
541 sql = sa.lambda_stmt(lambda: _select_from_placex(t)\
542 .where(t.c.country_code.in_(ccodes))\
543 .where(t.c.rank_address == 4)\
544 .where(t.c.rank_search == 4)\
545 .where(t.c.linked_place_id == None)\
546 .order_by('distance')\
549 if self.has_geometries():
550 sql = self._add_geometry_columns(sql, t.c.geometry)
552 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
557 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
558 """ Look up a single coordinate. Returns the place information,
559 if a place was found near the coordinates or None otherwise.
561 log().function('reverse_lookup', coord=coord, params=self.params)
564 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
566 row: Optional[SaRow] = None
567 row_func: RowFunc = nres.create_from_placex_row
569 if self.max_rank >= 26:
570 row, tmp_row_func = await self.lookup_street_poi()
572 row_func = tmp_row_func
575 if self.restrict_to_country_areas:
576 ccodes = await self.lookup_country_codes()
582 if self.max_rank > 4:
583 row = await self.lookup_area()
584 if row is None and self.layer_enabled(DataLayer.ADDRESS):
585 row = await self.lookup_country(ccodes)
587 result = row_func(row, nres.ReverseResult)
588 if result is not None:
589 assert row is not None
590 result.distance = row.distance
591 if hasattr(row, 'bbox'):
592 result.bbox = Bbox.from_wkb(row.bbox)
593 await nres.add_result_details(self.conn, [result], self.params)