1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any
12 import sqlalchemy as sa
14 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow, SaBind
15 from nominatim.api.connection import SearchConnection
16 import nominatim.api.results as nres
17 from nominatim.api.logging import log
18 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
19 from nominatim.db.sqlalchemy_types import Geometry
20 import nominatim.db.sqlalchemy_functions as snfn
22 # In SQLAlchemy expression which compare with NULL need to be expressed with
24 # pylint: disable=singleton-comparison
26 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
28 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
29 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
31 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
32 """ Create a select statement with the columns relevant for reverse
36 distance = t.c.distance
37 centroid = t.c.centroid
39 distance = t.c.geometry.ST_Distance(WKT_PARAM)
40 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
41 else_=t.c.centroid).label('centroid')
44 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
46 t.c.address, t.c.extratags,
47 t.c.housenumber, t.c.postcode, t.c.country_code,
48 t.c.importance, t.c.wikipedia,
49 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
51 distance.label('distance'),
52 t.c.geometry.ST_Expand(0).label('bbox'))
55 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
56 return sa.cast(table.c.startnumber
57 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
58 / table.c.step) * table.c.step,
59 sa.Integer).label('housenumber')
62 def _interpolated_position(table: SaFromClause) -> SaLabel:
63 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
64 rounded_pos = sa.func.round(table.c.position / fac) * fac
66 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
67 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
70 def _locate_interpolation(table: SaFromClause) -> SaLabel:
71 """ Given a position, locate the closest point on the line.
73 return sa.case((table.c.linegeo.is_line_like(),
74 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
75 else_=0).label('position')
78 def _is_address_point(table: SaFromClause) -> SaColumn:
79 return sa.and_(table.c.rank_address == 30,
80 sa.or_(table.c.housenumber != None,
81 table.c.name.has_key('housename')))
84 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
85 return min(rows, key=lambda row: 1000 if row is None else row.distance)
88 class ReverseGeocoder:
89 """ Class implementing the logic for looking up a place from a
93 def __init__(self, conn: SearchConnection, params: ReverseDetails) -> None:
97 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
101 def max_rank(self) -> int:
102 """ Return the maximum configured rank.
104 return self.params.max_rank
107 def has_geometries(self) -> bool:
108 """ Check if any geometries are requested.
110 return bool(self.params.geometry_output)
113 def layer_enabled(self, *layer: DataLayer) -> bool:
114 """ Return true when any of the given layer types are requested.
116 return any(self.params.layers & l for l in layer)
119 def layer_disabled(self, *layer: DataLayer) -> bool:
120 """ Return true when none of the given layer types is requested.
122 return not any(self.params.layers & l for l in layer)
125 def has_feature_layers(self) -> bool:
126 """ Return true if any layer other than ADDRESS or POI is requested.
128 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
131 def _add_geometry_columns(self, sql: SaSelect, col: SaColumn) -> SaSelect:
132 if not self.has_geometries():
137 if self.params.geometry_simplification > 0.0:
138 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
140 if self.params.geometry_output & GeometryFormat.GEOJSON:
141 out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
142 if self.params.geometry_output & GeometryFormat.TEXT:
143 out.append(sa.func.ST_AsText(col).label('geometry_text'))
144 if self.params.geometry_output & GeometryFormat.KML:
145 out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
146 if self.params.geometry_output & GeometryFormat.SVG:
147 out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
149 return sql.add_columns(*out)
152 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
153 if self.layer_enabled(DataLayer.MANMADE):
155 if self.layer_disabled(DataLayer.RAILWAY):
156 exclude.append('railway')
157 if self.layer_disabled(DataLayer.NATURAL):
158 exclude.extend(('natural', 'water', 'waterway'))
159 return table.c.class_.not_in(tuple(exclude))
162 if self.layer_enabled(DataLayer.RAILWAY):
163 include.append('railway')
164 if self.layer_enabled(DataLayer.NATURAL):
165 include.extend(('natural', 'water', 'waterway'))
166 return table.c.class_.in_(tuple(include))
169 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
170 """ Look up the closest rank 26+ place in the database, which
171 is closer than the given distance.
173 t = self.conn.t.placex
175 sql = _select_from_placex(t)\
176 .where(t.c.geometry.ST_DWithin(WKT_PARAM, distance))\
177 .where(t.c.indexed_status == 0)\
178 .where(t.c.linked_place_id == None)\
179 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
180 t.c.centroid.ST_Distance(WKT_PARAM) < distance))\
181 .order_by('distance')\
184 sql = self._add_geometry_columns(sql, t.c.geometry)
186 restrict: List[SaColumn] = []
188 if self.layer_enabled(DataLayer.ADDRESS):
189 restrict.append(sa.and_(t.c.rank_address >= 26,
190 t.c.rank_address <= min(29, self.max_rank)))
191 if self.max_rank == 30:
192 restrict.append(_is_address_point(t))
193 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
194 restrict.append(sa.and_(t.c.rank_search == 30,
195 t.c.class_.not_in(('place', 'building')),
196 sa.not_(t.c.geometry.is_line_like())))
197 if self.has_feature_layers():
198 restrict.append(sa.and_(t.c.rank_search.between(26, MAX_RANK_PARAM),
199 t.c.rank_address == 0,
200 self._filter_by_layer(t)))
205 sql = sql.where(sa.or_(*restrict))
207 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
210 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
211 t = self.conn.t.placex
213 sql = _select_from_placex(t)\
214 .where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))\
215 .where(t.c.parent_place_id == parent_place_id)\
216 .where(_is_address_point(t))\
217 .where(t.c.indexed_status == 0)\
218 .where(t.c.linked_place_id == None)\
219 .order_by('distance')\
222 sql = self._add_geometry_columns(sql, t.c.geometry)
224 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
227 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
228 distance: float) -> Optional[SaRow]:
229 t = self.conn.t.osmline
232 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
233 _locate_interpolation(t))\
234 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, distance))\
235 .where(t.c.startnumber != None)\
236 .order_by('distance')\
239 if parent_place_id is not None:
240 sql = sql.where(t.c.parent_place_id == parent_place_id)
242 inner = sql.subquery('ipol')
244 sql = sa.select(inner.c.place_id, inner.c.osm_id,
245 inner.c.parent_place_id, inner.c.address,
246 _interpolated_housenumber(inner),
247 _interpolated_position(inner),
248 inner.c.postcode, inner.c.country_code,
251 if self.has_geometries():
252 sub = sql.subquery('geom')
253 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
255 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
258 async def _find_tiger_number_for_street(self, parent_place_id: int,
260 parent_id: int) -> Optional[SaRow]:
261 t = self.conn.t.tiger
264 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
265 _locate_interpolation(t))\
266 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, 0.001))\
267 .where(t.c.parent_place_id == parent_place_id)\
268 .order_by('distance')\
272 sql = sa.select(inner.c.place_id,
273 inner.c.parent_place_id,
274 sa.literal(parent_type).label('osm_type'),
275 sa.literal(parent_id).label('osm_id'),
276 _interpolated_housenumber(inner),
277 _interpolated_position(inner),
281 if self.has_geometries():
282 sub = sql.subquery('geom')
283 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
285 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
288 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
289 """ Find a street or POI/address for the given WKT point.
291 log().section('Reverse lookup on street/address level')
293 parent_place_id = None
295 row = await self._find_closest_street_or_poi(distance)
296 row_func: RowFunc = nres.create_from_placex_row
297 log().var_dump('Result (street/building)', row)
299 # If the closest result was a street, but an address was requested,
300 # check for a housenumber nearby which is part of the street.
302 if self.max_rank > 27 \
303 and self.layer_enabled(DataLayer.ADDRESS) \
304 and row.rank_address <= 27:
306 parent_place_id = row.place_id
307 log().comment('Find housenumber for street')
308 addr_row = await self._find_housenumber_for_street(parent_place_id)
309 log().var_dump('Result (street housenumber)', addr_row)
311 if addr_row is not None:
313 row_func = nres.create_from_placex_row
314 distance = addr_row.distance
315 elif row.country_code == 'us' and parent_place_id is not None:
316 log().comment('Find TIGER housenumber for street')
317 addr_row = await self._find_tiger_number_for_street(parent_place_id,
320 log().var_dump('Result (street Tiger housenumber)', addr_row)
322 if addr_row is not None:
324 row_func = nres.create_from_tiger_row
326 distance = row.distance
328 # Check for an interpolation that is either closer than our result
329 # or belongs to a close street found.
330 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
331 log().comment('Find interpolation for street')
332 addr_row = await self._find_interpolation_for_street(parent_place_id,
334 log().var_dump('Result (street interpolation)', addr_row)
335 if addr_row is not None:
337 row_func = nres.create_from_osmline_row
342 async def _lookup_area_address(self) -> Optional[SaRow]:
343 """ Lookup large addressable areas for the given WKT point.
345 log().comment('Reverse lookup by larger address area features')
346 t = self.conn.t.placex
348 # The inner SQL brings results in the right order, so that
349 # later only a minimum of results needs to be checked with ST_Contains.
350 inner = sa.select(t, sa.literal(0.0).label('distance'))\
351 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
352 .where(t.c.geometry.intersects(WKT_PARAM))\
353 .where(snfn.select_index_placex_geometry_reverse_lookuppolygon('placex'))\
354 .order_by(sa.desc(t.c.rank_search))\
358 sql = _select_from_placex(inner, False)\
359 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
360 .order_by(sa.desc(inner.c.rank_search))\
363 sql = self._add_geometry_columns(sql, inner.c.geometry)
365 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
366 log().var_dump('Result (area)', address_row)
368 if address_row is not None and address_row.rank_search < self.max_rank:
369 log().comment('Search for better matching place nodes inside the area')
371 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
372 .where(t.c.rank_search > address_row.rank_search)\
373 .where(t.c.rank_search <= MAX_RANK_PARAM)\
374 .where(t.c.indexed_status == 0)\
375 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
377 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
378 .intersects(WKT_PARAM))\
379 .order_by(sa.desc(t.c.rank_search))\
383 touter = self.conn.t.placex.alias('outer')
384 sql = _select_from_placex(inner, False)\
385 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
386 .where(touter.c.place_id == address_row.place_id)\
387 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
388 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
391 sql = self._add_geometry_columns(sql, inner.c.geometry)
393 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
394 log().var_dump('Result (place node)', place_address_row)
396 if place_address_row is not None:
397 return place_address_row
402 async def _lookup_area_others(self) -> Optional[SaRow]:
403 t = self.conn.t.placex
405 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
406 .where(t.c.rank_address == 0)\
407 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
408 .where(t.c.name != None)\
409 .where(t.c.indexed_status == 0)\
410 .where(t.c.linked_place_id == None)\
411 .where(self._filter_by_layer(t))\
413 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
414 .intersects(WKT_PARAM))\
415 .order_by(sa.desc(t.c.rank_search))\
419 sql = _select_from_placex(inner, False)\
420 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
421 inner.c.geometry.ST_Contains(WKT_PARAM)))\
422 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
425 sql = self._add_geometry_columns(sql, inner.c.geometry)
427 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
428 log().var_dump('Result (non-address feature)', row)
433 async def lookup_area(self) -> Optional[SaRow]:
434 """ Lookup large areas for the current search.
436 log().section('Reverse lookup by larger area features')
438 if self.layer_enabled(DataLayer.ADDRESS):
439 address_row = await self._lookup_area_address()
443 if self.has_feature_layers():
444 other_row = await self._lookup_area_others()
448 return _get_closest(address_row, other_row)
451 async def lookup_country(self) -> Optional[SaRow]:
452 """ Lookup the country for the current search.
454 log().section('Reverse lookup by country code')
455 t = self.conn.t.country_grid
456 sql = sa.select(t.c.country_code).distinct()\
457 .where(t.c.geometry.ST_Contains(WKT_PARAM))
459 ccodes = tuple((r[0] for r in await self.conn.execute(sql, self.bind_params)))
460 log().var_dump('Country codes', ccodes)
465 t = self.conn.t.placex
466 if self.max_rank > 4:
467 log().comment('Search for place nodes in country')
470 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
471 .where(t.c.rank_search > 4)\
472 .where(t.c.rank_search <= MAX_RANK_PARAM)\
473 .where(t.c.indexed_status == 0)\
474 .where(t.c.country_code.in_(ccodes))\
475 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
477 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
478 .intersects(WKT_PARAM))\
479 .order_by(sa.desc(t.c.rank_search))\
483 sql = _select_from_placex(inner, False)\
484 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
485 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
488 sql = self._add_geometry_columns(sql, inner.c.geometry)
490 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
491 log().var_dump('Result (addressable place node)', address_row)
495 if address_row is None:
496 # Still nothing, then return a country with the appropriate country code.
497 sql = _select_from_placex(t)\
498 .where(t.c.country_code.in_(ccodes))\
499 .where(t.c.rank_address == 4)\
500 .where(t.c.rank_search == 4)\
501 .where(t.c.linked_place_id == None)\
502 .order_by('distance')\
505 sql = self._add_geometry_columns(sql, t.c.geometry)
507 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
512 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
513 """ Look up a single coordinate. Returns the place information,
514 if a place was found near the coordinates or None otherwise.
516 log().function('reverse_lookup', coord=coord, params=self.params)
519 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
521 row: Optional[SaRow] = None
522 row_func: RowFunc = nres.create_from_placex_row
524 if self.max_rank >= 26:
525 row, tmp_row_func = await self.lookup_street_poi()
527 row_func = tmp_row_func
528 if row is None and self.max_rank > 4:
529 row = await self.lookup_area()
530 if row is None and self.layer_enabled(DataLayer.ADDRESS):
531 row = await self.lookup_country()
533 result = row_func(row, nres.ReverseResult)
534 if result is not None:
535 assert row is not None
536 result.distance = row.distance
537 if hasattr(row, 'bbox'):
538 result.bbox = Bbox.from_wkb(row.bbox)
539 await nres.add_result_details(self.conn, [result], self.params)