1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from .typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
16 SaBind, SaLambdaSelect
17 from .sql.sqlalchemy_types import Geometry
18 from .connection import SearchConnection
19 from . import results as nres
20 from .logging import log
21 from .types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
23 # In SQLAlchemy expression which compare with NULL need to be expressed with
25 # pylint: disable=singleton-comparison
27 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
29 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
30 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
32 def no_index(expr: SaColumn) -> SaColumn:
33 """ Wrap the given expression, so that the query planner will
34 refrain from using the expression for index lookup.
36 return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
39 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
40 """ Create a select statement with the columns relevant for reverse
44 distance = t.c.distance
45 centroid = t.c.centroid
47 distance = t.c.geometry.ST_Distance(WKT_PARAM)
48 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
49 else_=t.c.centroid).label('centroid')
52 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
54 t.c.address, t.c.extratags,
55 t.c.housenumber, t.c.postcode, t.c.country_code,
56 t.c.importance, t.c.wikipedia,
57 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
59 t.c.linked_place_id, t.c.admin_level,
60 distance.label('distance'),
61 t.c.geometry.ST_Expand(0).label('bbox'))
64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
65 return sa.cast(table.c.startnumber
66 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
67 / table.c.step) * table.c.step,
68 sa.Integer).label('housenumber')
71 def _interpolated_position(table: SaFromClause) -> SaLabel:
72 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
73 rounded_pos = sa.func.round(table.c.position / fac) * fac
75 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
76 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
80 """ Given a position, locate the closest point on the line.
82 return sa.case((table.c.linegeo.is_line_like(),
83 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
84 else_=0).label('position')
87 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
88 return min(rows, key=lambda row: 1000 if row is None else row.distance)
91 class ReverseGeocoder:
92 """ Class implementing the logic for looking up a place from a
96 def __init__(self, conn: SearchConnection, params: ReverseDetails,
97 restrict_to_country_areas: bool = False) -> None:
100 self.restrict_to_country_areas = restrict_to_country_areas
102 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
106 def max_rank(self) -> int:
107 """ Return the maximum configured rank.
109 return self.params.max_rank
112 def has_geometries(self) -> bool:
113 """ Check if any geometries are requested.
115 return bool(self.params.geometry_output)
118 def layer_enabled(self, *layer: DataLayer) -> bool:
119 """ Return true when any of the given layer types are requested.
121 return any(self.params.layers & l for l in layer)
124 def layer_disabled(self, *layer: DataLayer) -> bool:
125 """ Return true when none of the given layer types is requested.
127 return not any(self.params.layers & l for l in layer)
130 def has_feature_layers(self) -> bool:
131 """ Return true if any layer other than ADDRESS or POI is requested.
133 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
136 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
139 if self.params.geometry_simplification > 0.0:
140 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
142 if self.params.geometry_output & GeometryFormat.GEOJSON:
143 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
144 if self.params.geometry_output & GeometryFormat.TEXT:
145 out.append(sa.func.ST_AsText(col).label('geometry_text'))
146 if self.params.geometry_output & GeometryFormat.KML:
147 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
148 if self.params.geometry_output & GeometryFormat.SVG:
149 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
151 return sql.add_columns(*out)
154 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
155 if self.layer_enabled(DataLayer.MANMADE):
157 if self.layer_disabled(DataLayer.RAILWAY):
158 exclude.append('railway')
159 if self.layer_disabled(DataLayer.NATURAL):
160 exclude.extend(('natural', 'water', 'waterway'))
161 return table.c.class_.not_in(tuple(exclude))
164 if self.layer_enabled(DataLayer.RAILWAY):
165 include.append('railway')
166 if self.layer_enabled(DataLayer.NATURAL):
167 include.extend(('natural', 'water', 'waterway'))
168 return table.c.class_.in_(tuple(include))
171 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
172 """ Look up the closest rank 26+ place in the database, which
173 is closer than the given distance.
175 t = self.conn.t.placex
177 # PostgreSQL must not get the distance as a parameter because
178 # there is a danger it won't be able to properly estimate index use
179 # when used with prepared statements
180 diststr = sa.text(f"{distance}")
182 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
183 .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
184 .where(t.c.indexed_status == 0)
185 .where(t.c.linked_place_id == None)
186 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
187 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
188 .order_by('distance')
191 if self.has_geometries():
192 sql = self._add_geometry_columns(sql, t.c.geometry)
194 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
196 if self.layer_enabled(DataLayer.ADDRESS):
197 max_rank = min(29, self.max_rank)
198 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
199 if self.max_rank == 30:
200 restrict.append(lambda: sa.func.IsAddressPoint(t))
201 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
202 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
203 t.c.class_.not_in(('place', 'building')),
204 sa.not_(t.c.geometry.is_line_like())))
205 if self.has_feature_layers():
206 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
207 no_index(t.c.rank_address) == 0,
208 self._filter_by_layer(t)))
213 sql = sql.where(sa.or_(*restrict))
215 # If the closest object is inside an area, then check if there is a
216 # POI node nearby and return that.
218 for row in await self.conn.execute(sql, self.bind_params):
220 if row.rank_search <= 27 or row.osm_type == 'N' or row.distance > 0:
224 if row.rank_search > 27 and row.osm_type == 'N'\
225 and row.distance < 0.0001:
231 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
232 t = self.conn.t.placex
234 def _base_query() -> SaSelect:
235 return _select_from_placex(t)\
236 .where(t.c.geometry.within_distance(WKT_PARAM, 0.001))\
237 .where(t.c.parent_place_id == parent_place_id)\
238 .where(sa.func.IsAddressPoint(t))\
239 .where(t.c.indexed_status == 0)\
240 .where(t.c.linked_place_id == None)\
241 .order_by('distance')\
245 if self.has_geometries():
246 sql = self._add_geometry_columns(_base_query(), t.c.geometry)
248 sql = sa.lambda_stmt(_base_query)
250 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
253 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
254 distance: float) -> Optional[SaRow]:
255 t = self.conn.t.osmline
258 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
259 _locate_interpolation(t))\
260 .where(t.c.linegeo.within_distance(WKT_PARAM, distance))\
261 .where(t.c.startnumber != None)\
262 .order_by('distance')\
265 if parent_place_id is not None:
266 sql = sql.where(t.c.parent_place_id == parent_place_id)
268 inner = sql.subquery('ipol')
270 sql = sa.select(inner.c.place_id, inner.c.osm_id,
271 inner.c.parent_place_id, inner.c.address,
272 _interpolated_housenumber(inner),
273 _interpolated_position(inner),
274 inner.c.postcode, inner.c.country_code,
277 if self.has_geometries():
278 sub = sql.subquery('geom')
279 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
281 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
284 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
285 t = self.conn.t.tiger
287 def _base_query() -> SaSelect:
289 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
290 _locate_interpolation(t))\
291 .where(t.c.linegeo.within_distance(WKT_PARAM, 0.001))\
292 .where(t.c.parent_place_id == parent_place_id)\
293 .order_by('distance')\
297 return sa.select(inner.c.place_id,
298 inner.c.parent_place_id,
299 _interpolated_housenumber(inner),
300 _interpolated_position(inner),
305 if self.has_geometries():
306 sub = _base_query().subquery('geom')
307 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
309 sql = sa.lambda_stmt(_base_query)
311 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
314 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
315 """ Find a street or POI/address for the given WKT point.
317 log().section('Reverse lookup on street/address level')
319 parent_place_id = None
321 row = await self._find_closest_street_or_poi(distance)
322 row_func: RowFunc = nres.create_from_placex_row
323 log().var_dump('Result (street/building)', row)
325 # If the closest result was a street, but an address was requested,
326 # check for a housenumber nearby which is part of the street.
328 if self.max_rank > 27 \
329 and self.layer_enabled(DataLayer.ADDRESS) \
330 and row.rank_address <= 27:
332 parent_place_id = row.place_id
333 log().comment('Find housenumber for street')
334 addr_row = await self._find_housenumber_for_street(parent_place_id)
335 log().var_dump('Result (street housenumber)', addr_row)
337 if addr_row is not None:
339 row_func = nres.create_from_placex_row
340 distance = addr_row.distance
341 elif row.country_code == 'us' and parent_place_id is not None:
342 log().comment('Find TIGER housenumber for street')
343 addr_row = await self._find_tiger_number_for_street(parent_place_id)
344 log().var_dump('Result (street Tiger housenumber)', addr_row)
346 if addr_row is not None:
347 row_func = cast(RowFunc,
348 functools.partial(nres.create_from_tiger_row,
349 osm_type=row.osm_type,
353 distance = row.distance
355 # Check for an interpolation that is either closer than our result
356 # or belongs to a close street found.
357 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
358 log().comment('Find interpolation for street')
359 addr_row = await self._find_interpolation_for_street(parent_place_id,
361 log().var_dump('Result (street interpolation)', addr_row)
362 if addr_row is not None:
364 row_func = nres.create_from_osmline_row
369 async def _lookup_area_address(self) -> Optional[SaRow]:
370 """ Lookup large addressable areas for the given WKT point.
372 log().comment('Reverse lookup by larger address area features')
373 t = self.conn.t.placex
375 def _base_query() -> SaSelect:
376 # The inner SQL brings results in the right order, so that
377 # later only a minimum of results needs to be checked with ST_Contains.
378 inner = sa.select(t, sa.literal(0.0).label('distance'))\
379 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
380 .where(t.c.geometry.intersects(WKT_PARAM))\
381 .where(sa.func.PlacexGeometryReverseLookuppolygon())\
382 .order_by(sa.desc(t.c.rank_search))\
386 return _select_from_placex(inner, False)\
387 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
388 .order_by(sa.desc(inner.c.rank_search))\
391 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
392 if self.has_geometries():
393 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
395 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
396 log().var_dump('Result (area)', address_row)
398 if address_row is not None and address_row.rank_search < self.max_rank:
399 log().comment('Search for better matching place nodes inside the area')
401 address_rank = address_row.rank_search
402 address_id = address_row.place_id
404 def _place_inside_area_query() -> SaSelect:
407 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
408 .where(t.c.rank_search > address_rank)\
409 .where(t.c.rank_search <= MAX_RANK_PARAM)\
410 .where(t.c.indexed_status == 0)\
411 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
412 .order_by(sa.desc(t.c.rank_search))\
416 touter = t.alias('outer')
417 return _select_from_placex(inner, False)\
418 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
419 .where(touter.c.place_id == address_id)\
420 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
421 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
424 if self.has_geometries():
425 sql = self._add_geometry_columns(_place_inside_area_query(),
426 sa.literal_column('places.geometry'))
428 sql = sa.lambda_stmt(_place_inside_area_query)
430 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
431 log().var_dump('Result (place node)', place_address_row)
433 if place_address_row is not None:
434 return place_address_row
439 async def _lookup_area_others(self) -> Optional[SaRow]:
440 t = self.conn.t.placex
442 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
443 .where(t.c.rank_address == 0)\
444 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
445 .where(t.c.name != None)\
446 .where(t.c.indexed_status == 0)\
447 .where(t.c.linked_place_id == None)\
448 .where(self._filter_by_layer(t))\
449 .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
450 .order_by(sa.desc(t.c.rank_search))\
451 .order_by('distance')\
455 sql = _select_from_placex(inner, False)\
456 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
457 inner.c.geometry.ST_Contains(WKT_PARAM)))\
458 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
461 if self.has_geometries():
462 sql = self._add_geometry_columns(sql, inner.c.geometry)
464 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
465 log().var_dump('Result (non-address feature)', row)
470 async def lookup_area(self) -> Optional[SaRow]:
471 """ Lookup large areas for the current search.
473 log().section('Reverse lookup by larger area features')
475 if self.layer_enabled(DataLayer.ADDRESS):
476 address_row = await self._lookup_area_address()
480 if self.has_feature_layers():
481 other_row = await self._lookup_area_others()
485 return _get_closest(address_row, other_row)
488 async def lookup_country_codes(self) -> List[str]:
489 """ Lookup the country for the current search.
491 log().section('Reverse lookup by country code')
492 t = self.conn.t.country_grid
493 sql = sa.select(t.c.country_code).distinct()\
494 .where(t.c.geometry.ST_Contains(WKT_PARAM))
496 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
497 log().var_dump('Country codes', ccodes)
501 async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
502 """ Lookup the country for the current search.
505 ccodes = await self.lookup_country_codes()
510 t = self.conn.t.placex
511 if self.max_rank > 4:
512 log().comment('Search for place nodes in country')
514 def _base_query() -> SaSelect:
517 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
518 .where(t.c.rank_search > 4)\
519 .where(t.c.rank_search <= MAX_RANK_PARAM)\
520 .where(t.c.indexed_status == 0)\
521 .where(t.c.country_code.in_(ccodes))\
522 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
523 .order_by(sa.desc(t.c.rank_search))\
527 return _select_from_placex(inner, False)\
528 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
529 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
533 if self.has_geometries():
534 sql = self._add_geometry_columns(_base_query(),
535 sa.literal_column('area.geometry'))
537 sql = sa.lambda_stmt(_base_query)
539 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
540 log().var_dump('Result (addressable place node)', address_row)
544 if address_row is None:
545 # Still nothing, then return a country with the appropriate country code.
546 def _country_base_query() -> SaSelect:
547 return _select_from_placex(t)\
548 .where(t.c.country_code.in_(ccodes))\
549 .where(t.c.rank_address == 4)\
550 .where(t.c.rank_search == 4)\
551 .where(t.c.linked_place_id == None)\
552 .order_by('distance')\
555 if self.has_geometries():
556 sql = self._add_geometry_columns(_country_base_query(), t.c.geometry)
558 sql = sa.lambda_stmt(_country_base_query)
560 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
565 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
566 """ Look up a single coordinate. Returns the place information,
567 if a place was found near the coordinates or None otherwise.
569 log().function('reverse_lookup', coord=coord, params=self.params)
572 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
574 row: Optional[SaRow] = None
575 row_func: RowFunc = nres.create_from_placex_row
577 if self.max_rank >= 26:
578 row, tmp_row_func = await self.lookup_street_poi()
580 row_func = tmp_row_func
583 if self.restrict_to_country_areas:
584 ccodes = await self.lookup_country_codes()
590 if self.max_rank > 4:
591 row = await self.lookup_area()
592 if row is None and self.layer_enabled(DataLayer.ADDRESS):
593 row = await self.lookup_country(ccodes)
595 result = row_func(row, nres.ReverseResult)
596 if result is not None:
597 assert row is not None
598 result.distance = row.distance
599 if hasattr(row, 'bbox'):
600 result.bbox = Bbox.from_wkb(row.bbox)
601 await nres.add_result_details(self.conn, [result], self.params)