1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple
12 import sqlalchemy as sa
14 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow
15 from nominatim.api.connection import SearchConnection
16 import nominatim.api.results as nres
17 from nominatim.api.logging import log
18 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
19 from nominatim.db.sqlalchemy_types import Geometry
21 # In SQLAlchemy expression which compare with NULL need to be expressed with
23 # pylint: disable=singleton-comparison
25 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
27 WKT_PARAM = sa.bindparam('wkt', type_=Geometry)
28 MAX_RANK_PARAM = sa.bindparam('max_rank')
30 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
31 """ Create a select statement with the columns relevant for reverse
35 distance = t.c.distance
36 centroid = t.c.centroid
38 distance = t.c.geometry.ST_Distance(WKT_PARAM)
39 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
40 else_=t.c.centroid).label('centroid')
43 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
45 t.c.address, t.c.extratags,
46 t.c.housenumber, t.c.postcode, t.c.country_code,
47 t.c.importance, t.c.wikipedia,
48 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
50 distance.label('distance'),
51 t.c.geometry.ST_Expand(0).label('bbox'))
54 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
55 return sa.cast(table.c.startnumber
56 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
57 / table.c.step) * table.c.step,
58 sa.Integer).label('housenumber')
61 def _interpolated_position(table: SaFromClause) -> SaLabel:
62 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
63 rounded_pos = sa.func.round(table.c.position / fac) * fac
65 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
66 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
69 def _locate_interpolation(table: SaFromClause) -> SaLabel:
70 """ Given a position, locate the closest point on the line.
72 return sa.case((table.c.linegeo.is_line_like(),
73 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
74 else_=0).label('position')
77 def _is_address_point(table: SaFromClause) -> SaColumn:
78 return sa.and_(table.c.rank_address == 30,
79 sa.or_(table.c.housenumber != None,
80 table.c.name.has_key('housename')))
83 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
84 return min(rows, key=lambda row: 1000 if row is None else row.distance)
87 class ReverseGeocoder:
88 """ Class implementing the logic for looking up a place from a
92 def __init__(self, conn: SearchConnection, params: ReverseDetails) -> None:
96 self.bind_params = {'max_rank': params.max_rank}
100 def max_rank(self) -> int:
101 """ Return the maximum configured rank.
103 return self.params.max_rank
106 def has_geometries(self) -> bool:
107 """ Check if any geometries are requested.
109 return bool(self.params.geometry_output)
112 def layer_enabled(self, *layer: DataLayer) -> bool:
113 """ Return true when any of the given layer types are requested.
115 return any(self.params.layers & l for l in layer)
118 def layer_disabled(self, *layer: DataLayer) -> bool:
119 """ Return true when none of the given layer types is requested.
121 return not any(self.params.layers & l for l in layer)
124 def has_feature_layers(self) -> bool:
125 """ Return true if any layer other than ADDRESS or POI is requested.
127 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
130 def _add_geometry_columns(self, sql: SaSelect, col: SaColumn) -> SaSelect:
131 if not self.has_geometries():
136 if self.params.geometry_simplification > 0.0:
137 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
139 if self.params.geometry_output & GeometryFormat.GEOJSON:
140 out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
141 if self.params.geometry_output & GeometryFormat.TEXT:
142 out.append(sa.func.ST_AsText(col).label('geometry_text'))
143 if self.params.geometry_output & GeometryFormat.KML:
144 out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
145 if self.params.geometry_output & GeometryFormat.SVG:
146 out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
148 return sql.add_columns(*out)
151 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
152 if self.layer_enabled(DataLayer.MANMADE):
154 if self.layer_disabled(DataLayer.RAILWAY):
155 exclude.append('railway')
156 if self.layer_disabled(DataLayer.NATURAL):
157 exclude.extend(('natural', 'water', 'waterway'))
158 return table.c.class_.not_in(tuple(exclude))
161 if self.layer_enabled(DataLayer.RAILWAY):
162 include.append('railway')
163 if self.layer_enabled(DataLayer.NATURAL):
164 include.extend(('natural', 'water', 'waterway'))
165 return table.c.class_.in_(tuple(include))
168 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
169 """ Look up the closest rank 26+ place in the database, which
170 is closer than the given distance.
172 t = self.conn.t.placex
174 sql = _select_from_placex(t)\
175 .where(t.c.geometry.ST_DWithin(WKT_PARAM, distance))\
176 .where(t.c.indexed_status == 0)\
177 .where(t.c.linked_place_id == None)\
178 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
179 t.c.centroid.ST_Distance(WKT_PARAM) < distance))\
180 .order_by('distance')\
183 sql = self._add_geometry_columns(sql, t.c.geometry)
185 restrict: List[SaColumn] = []
187 if self.layer_enabled(DataLayer.ADDRESS):
188 restrict.append(sa.and_(t.c.rank_address >= 26,
189 t.c.rank_address <= min(29, self.max_rank)))
190 if self.max_rank == 30:
191 restrict.append(_is_address_point(t))
192 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
193 restrict.append(sa.and_(t.c.rank_search == 30,
194 t.c.class_.not_in(('place', 'building')),
195 sa.not_(t.c.geometry.is_line_like())))
196 if self.has_feature_layers():
197 restrict.append(sa.and_(t.c.rank_search.between(26, MAX_RANK_PARAM),
198 t.c.rank_address == 0,
199 self._filter_by_layer(t)))
204 sql = sql.where(sa.or_(*restrict))
206 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
209 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
210 t = self.conn.t.placex
212 sql = _select_from_placex(t)\
213 .where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))\
214 .where(t.c.parent_place_id == parent_place_id)\
215 .where(_is_address_point(t))\
216 .where(t.c.indexed_status == 0)\
217 .where(t.c.linked_place_id == None)\
218 .order_by('distance')\
221 sql = self._add_geometry_columns(sql, t.c.geometry)
223 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
226 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
227 distance: float) -> Optional[SaRow]:
228 t = self.conn.t.osmline
231 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
232 _locate_interpolation(t))\
233 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, distance))\
234 .where(t.c.startnumber != None)\
235 .order_by('distance')\
238 if parent_place_id is not None:
239 sql = sql.where(t.c.parent_place_id == parent_place_id)
241 inner = sql.subquery('ipol')
243 sql = sa.select(inner.c.place_id, inner.c.osm_id,
244 inner.c.parent_place_id, inner.c.address,
245 _interpolated_housenumber(inner),
246 _interpolated_position(inner),
247 inner.c.postcode, inner.c.country_code,
250 if self.has_geometries():
251 sub = sql.subquery('geom')
252 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
254 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
257 async def _find_tiger_number_for_street(self, parent_place_id: int,
259 parent_id: int) -> Optional[SaRow]:
260 t = self.conn.t.tiger
263 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
264 _locate_interpolation(t))\
265 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, 0.001))\
266 .where(t.c.parent_place_id == parent_place_id)\
267 .order_by('distance')\
271 sql = sa.select(inner.c.place_id,
272 inner.c.parent_place_id,
273 sa.literal(parent_type).label('osm_type'),
274 sa.literal(parent_id).label('osm_id'),
275 _interpolated_housenumber(inner),
276 _interpolated_position(inner),
280 if self.has_geometries():
281 sub = sql.subquery('geom')
282 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
284 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
287 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
288 """ Find a street or POI/address for the given WKT point.
290 log().section('Reverse lookup on street/address level')
292 parent_place_id = None
294 row = await self._find_closest_street_or_poi(distance)
295 row_func: RowFunc = nres.create_from_placex_row
296 log().var_dump('Result (street/building)', row)
298 # If the closest result was a street, but an address was requested,
299 # check for a housenumber nearby which is part of the street.
301 if self.max_rank > 27 \
302 and self.layer_enabled(DataLayer.ADDRESS) \
303 and row.rank_address <= 27:
305 parent_place_id = row.place_id
306 log().comment('Find housenumber for street')
307 addr_row = await self._find_housenumber_for_street(parent_place_id)
308 log().var_dump('Result (street housenumber)', addr_row)
310 if addr_row is not None:
312 row_func = nres.create_from_placex_row
313 distance = addr_row.distance
314 elif row.country_code == 'us' and parent_place_id is not None:
315 log().comment('Find TIGER housenumber for street')
316 addr_row = await self._find_tiger_number_for_street(parent_place_id,
319 log().var_dump('Result (street Tiger housenumber)', addr_row)
321 if addr_row is not None:
323 row_func = nres.create_from_tiger_row
325 distance = row.distance
327 # Check for an interpolation that is either closer than our result
328 # or belongs to a close street found.
329 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
330 log().comment('Find interpolation for street')
331 addr_row = await self._find_interpolation_for_street(parent_place_id,
333 log().var_dump('Result (street interpolation)', addr_row)
334 if addr_row is not None:
336 row_func = nres.create_from_osmline_row
341 async def _lookup_area_address(self) -> Optional[SaRow]:
342 """ Lookup large addressable areas for the given WKT point.
344 log().comment('Reverse lookup by larger address area features')
345 t = self.conn.t.placex
347 # The inner SQL brings results in the right order, so that
348 # later only a minimum of results needs to be checked with ST_Contains.
349 inner = sa.select(t, sa.literal(0.0).label('distance'))\
350 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
351 .where(t.c.rank_address.between(5, 25))\
352 .where(t.c.geometry.is_area())\
353 .where(t.c.geometry.intersects(WKT_PARAM))\
354 .where(t.c.name != None)\
355 .where(t.c.indexed_status == 0)\
356 .where(t.c.linked_place_id == None)\
357 .where(t.c.type != 'postcode')\
358 .order_by(sa.desc(t.c.rank_search))\
362 sql = _select_from_placex(inner, False)\
363 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
364 .order_by(sa.desc(inner.c.rank_search))\
367 sql = self._add_geometry_columns(sql, inner.c.geometry)
369 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
370 log().var_dump('Result (area)', address_row)
372 if address_row is not None and address_row.rank_search < self.max_rank:
373 log().comment('Search for better matching place nodes inside the area')
375 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
376 .where(t.c.osm_type == 'N')\
377 .where(t.c.rank_search > address_row.rank_search)\
378 .where(t.c.rank_search <= MAX_RANK_PARAM)\
379 .where(t.c.rank_address.between(5, 25))\
380 .where(t.c.name != None)\
381 .where(t.c.indexed_status == 0)\
382 .where(t.c.linked_place_id == None)\
383 .where(t.c.type != 'postcode')\
385 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
386 .intersects(WKT_PARAM))\
387 .order_by(sa.desc(t.c.rank_search))\
391 touter = self.conn.t.placex.alias('outer')
392 sql = _select_from_placex(inner, False)\
393 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
394 .where(touter.c.place_id == address_row.place_id)\
395 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
396 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
399 sql = self._add_geometry_columns(sql, inner.c.geometry)
401 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
402 log().var_dump('Result (place node)', place_address_row)
404 if place_address_row is not None:
405 return place_address_row
410 async def _lookup_area_others(self) -> Optional[SaRow]:
411 t = self.conn.t.placex
413 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
414 .where(t.c.rank_address == 0)\
415 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
416 .where(t.c.name != None)\
417 .where(t.c.indexed_status == 0)\
418 .where(t.c.linked_place_id == None)\
419 .where(self._filter_by_layer(t))\
421 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
422 .intersects(WKT_PARAM))\
423 .order_by(sa.desc(t.c.rank_search))\
427 sql = _select_from_placex(inner, False)\
428 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
429 inner.c.geometry.ST_Contains(WKT_PARAM)))\
430 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
433 sql = self._add_geometry_columns(sql, inner.c.geometry)
435 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
436 log().var_dump('Result (non-address feature)', row)
441 async def lookup_area(self) -> Optional[SaRow]:
442 """ Lookup large areas for the current search.
444 log().section('Reverse lookup by larger area features')
446 if self.layer_enabled(DataLayer.ADDRESS):
447 address_row = await self._lookup_area_address()
451 if self.has_feature_layers():
452 other_row = await self._lookup_area_others()
456 return _get_closest(address_row, other_row)
459 async def lookup_country(self) -> Optional[SaRow]:
460 """ Lookup the country for the current search.
462 log().section('Reverse lookup by country code')
463 t = self.conn.t.country_grid
464 sql = sa.select(t.c.country_code).distinct()\
465 .where(t.c.geometry.ST_Contains(WKT_PARAM))
467 ccodes = tuple((r[0] for r in await self.conn.execute(sql, self.bind_params)))
468 log().var_dump('Country codes', ccodes)
473 t = self.conn.t.placex
474 if self.max_rank > 4:
475 log().comment('Search for place nodes in country')
478 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
479 .where(t.c.osm_type == 'N')\
480 .where(t.c.rank_search > 4)\
481 .where(t.c.rank_search <= MAX_RANK_PARAM)\
482 .where(t.c.rank_address.between(5, 25))\
483 .where(t.c.name != None)\
484 .where(t.c.indexed_status == 0)\
485 .where(t.c.linked_place_id == None)\
486 .where(t.c.type != 'postcode')\
487 .where(t.c.country_code.in_(ccodes))\
489 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
490 .intersects(WKT_PARAM))\
491 .order_by(sa.desc(t.c.rank_search))\
495 sql = _select_from_placex(inner, False)\
496 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
497 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
500 sql = self._add_geometry_columns(sql, inner.c.geometry)
502 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
503 log().var_dump('Result (addressable place node)', address_row)
507 if address_row is None:
508 # Still nothing, then return a country with the appropriate country code.
509 sql = _select_from_placex(t)\
510 .where(t.c.country_code.in_(ccodes))\
511 .where(t.c.rank_address == 4)\
512 .where(t.c.rank_search == 4)\
513 .where(t.c.linked_place_id == None)\
514 .order_by('distance')\
517 sql = self._add_geometry_columns(sql, t.c.geometry)
519 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
524 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
525 """ Look up a single coordinate. Returns the place information,
526 if a place was found near the coordinates or None otherwise.
528 log().function('reverse_lookup', coord=coord, params=self.params)
531 self.bind_params['wkt'] = f'SRID=4326;POINT({coord[0]} {coord[1]})'
533 row: Optional[SaRow] = None
534 row_func: RowFunc = nres.create_from_placex_row
536 if self.max_rank >= 26:
537 row, tmp_row_func = await self.lookup_street_poi()
539 row_func = tmp_row_func
540 if row is None and self.max_rank > 4:
541 row = await self.lookup_area()
542 if row is None and self.layer_enabled(DataLayer.ADDRESS):
543 row = await self.lookup_country()
545 result = row_func(row, nres.ReverseResult)
546 if result is not None:
547 assert row is not None
548 result.distance = row.distance
549 if hasattr(row, 'bbox'):
550 result.bbox = Bbox.from_wkb(row.bbox)
551 await nres.add_result_details(self.conn, [result], self.params)