1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple
12 import sqlalchemy as sa
14 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow
15 from nominatim.api.connection import SearchConnection
16 import nominatim.api.results as nres
17 from nominatim.api.logging import log
18 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
20 # In SQLAlchemy expression which compare with NULL need to be expressed with
22 # pylint: disable=singleton-comparison
24 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
26 def _select_from_placex(t: SaFromClause, wkt: Optional[str] = None) -> SaSelect:
27 """ Create a select statement with the columns relevant for reverse
31 distance = t.c.distance
32 centroid = t.c.centroid
34 distance = t.c.geometry.ST_Distance(wkt)
35 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(wkt)),
36 else_=t.c.centroid).label('centroid')
39 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
41 t.c.address, t.c.extratags,
42 t.c.housenumber, t.c.postcode, t.c.country_code,
43 t.c.importance, t.c.wikipedia,
44 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
46 distance.label('distance'),
47 t.c.geometry.ST_Expand(0).label('bbox'))
50 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
51 return sa.cast(table.c.startnumber
52 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
53 / table.c.step) * table.c.step,
54 sa.Integer).label('housenumber')
57 def _interpolated_position(table: SaFromClause) -> SaLabel:
58 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
59 rounded_pos = sa.func.round(table.c.position / fac) * fac
61 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
62 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
65 def _locate_interpolation(table: SaFromClause, wkt: str) -> SaLabel:
66 """ Given a position, locate the closest point on the line.
68 return sa.case((table.c.linegeo.is_line_like(), table.c.linegeo.ST_LineLocatePoint(wkt)),
69 else_=0).label('position')
72 def _is_address_point(table: SaFromClause) -> SaColumn:
73 return sa.and_(table.c.rank_address == 30,
74 sa.or_(table.c.housenumber != None,
75 table.c.name.has_key('housename')))
77 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
78 return min(rows, key=lambda row: 1000 if row is None else row.distance)
80 class ReverseGeocoder:
81 """ Class implementing the logic for looking up a place from a
85 def __init__(self, conn: SearchConnection, params: ReverseDetails) -> None:
91 def max_rank(self) -> int:
92 """ Return the maximum configured rank.
94 return self.params.max_rank
97 def has_geometries(self) -> bool:
98 """ Check if any geometries are requested.
100 return bool(self.params.geometry_output)
103 def layer_enabled(self, *layer: DataLayer) -> bool:
104 """ Return true when any of the given layer types are requested.
106 return any(self.params.layers & l for l in layer)
109 def layer_disabled(self, *layer: DataLayer) -> bool:
110 """ Return true when none of the given layer types is requested.
112 return not any(self.params.layers & l for l in layer)
115 def has_feature_layers(self) -> bool:
116 """ Return true if any layer other than ADDRESS or POI is requested.
118 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
120 def _add_geometry_columns(self, sql: SaSelect, col: SaColumn) -> SaSelect:
121 if not self.has_geometries():
126 if self.params.geometry_simplification > 0.0:
127 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
129 if self.params.geometry_output & GeometryFormat.GEOJSON:
130 out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
131 if self.params.geometry_output & GeometryFormat.TEXT:
132 out.append(sa.func.ST_AsText(col).label('geometry_text'))
133 if self.params.geometry_output & GeometryFormat.KML:
134 out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
135 if self.params.geometry_output & GeometryFormat.SVG:
136 out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
138 return sql.add_columns(*out)
141 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
142 if self.layer_enabled(DataLayer.MANMADE):
144 if self.layer_disabled(DataLayer.RAILWAY):
145 exclude.append('railway')
146 if self.layer_disabled(DataLayer.NATURAL):
147 exclude.extend(('natural', 'water', 'waterway'))
148 return table.c.class_.not_in(tuple(exclude))
151 if self.layer_enabled(DataLayer.RAILWAY):
152 include.append('railway')
153 if self.layer_enabled(DataLayer.NATURAL):
154 include.extend(('natural', 'water', 'waterway'))
155 return table.c.class_.in_(tuple(include))
158 async def _find_closest_street_or_poi(self, wkt: str,
159 distance: float) -> Optional[SaRow]:
160 """ Look up the closest rank 26+ place in the database, which
161 is closer than the given distance.
163 t = self.conn.t.placex
165 sql = _select_from_placex(t, wkt)\
166 .where(t.c.geometry.ST_DWithin(wkt, distance))\
167 .where(t.c.indexed_status == 0)\
168 .where(t.c.linked_place_id == None)\
169 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
170 t.c.centroid.ST_Distance(wkt) < distance))\
171 .order_by('distance')\
174 sql = self._add_geometry_columns(sql, t.c.geometry)
176 restrict: List[SaColumn] = []
178 if self.layer_enabled(DataLayer.ADDRESS):
179 restrict.append(sa.and_(t.c.rank_address >= 26,
180 t.c.rank_address <= min(29, self.max_rank)))
181 if self.max_rank == 30:
182 restrict.append(_is_address_point(t))
183 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
184 restrict.append(sa.and_(t.c.rank_search == 30,
185 t.c.class_.not_in(('place', 'building')),
186 sa.not_(t.c.geometry.is_line_like())))
187 if self.has_feature_layers():
188 restrict.append(sa.and_(t.c.rank_search.between(26, self.max_rank),
189 t.c.rank_address == 0,
190 self._filter_by_layer(t)))
195 return (await self.conn.execute(sql.where(sa.or_(*restrict)))).one_or_none()
198 async def _find_housenumber_for_street(self, parent_place_id: int,
199 wkt: str) -> Optional[SaRow]:
200 t = self.conn.t.placex
202 sql = _select_from_placex(t, wkt)\
203 .where(t.c.geometry.ST_DWithin(wkt, 0.001))\
204 .where(t.c.parent_place_id == parent_place_id)\
205 .where(_is_address_point(t))\
206 .where(t.c.indexed_status == 0)\
207 .where(t.c.linked_place_id == None)\
208 .order_by('distance')\
211 sql = self._add_geometry_columns(sql, t.c.geometry)
213 return (await self.conn.execute(sql)).one_or_none()
216 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
218 distance: float) -> Optional[SaRow]:
219 t = self.conn.t.osmline
222 t.c.linegeo.ST_Distance(wkt).label('distance'),
223 _locate_interpolation(t, wkt))\
224 .where(t.c.linegeo.ST_DWithin(wkt, distance))\
225 .where(t.c.startnumber != None)\
226 .order_by('distance')\
229 if parent_place_id is not None:
230 sql = sql.where(t.c.parent_place_id == parent_place_id)
232 inner = sql.subquery('ipol')
234 sql = sa.select(inner.c.place_id, inner.c.osm_id,
235 inner.c.parent_place_id, inner.c.address,
236 _interpolated_housenumber(inner),
237 _interpolated_position(inner),
238 inner.c.postcode, inner.c.country_code,
241 if self.has_geometries():
242 sub = sql.subquery('geom')
243 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
245 return (await self.conn.execute(sql)).one_or_none()
248 async def _find_tiger_number_for_street(self, parent_place_id: int,
249 parent_type: str, parent_id: int,
250 wkt: str) -> Optional[SaRow]:
251 t = self.conn.t.tiger
254 t.c.linegeo.ST_Distance(wkt).label('distance'),
255 _locate_interpolation(t, wkt))\
256 .where(t.c.linegeo.ST_DWithin(wkt, 0.001))\
257 .where(t.c.parent_place_id == parent_place_id)\
258 .order_by('distance')\
262 sql = sa.select(inner.c.place_id,
263 inner.c.parent_place_id,
264 sa.literal(parent_type).label('osm_type'),
265 sa.literal(parent_id).label('osm_id'),
266 _interpolated_housenumber(inner),
267 _interpolated_position(inner),
271 if self.has_geometries():
272 sub = sql.subquery('geom')
273 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
275 return (await self.conn.execute(sql)).one_or_none()
278 async def lookup_street_poi(self,
279 wkt: str) -> Tuple[Optional[SaRow], RowFunc]:
280 """ Find a street or POI/address for the given WKT point.
282 log().section('Reverse lookup on street/address level')
284 parent_place_id = None
286 row = await self._find_closest_street_or_poi(wkt, distance)
287 row_func: RowFunc = nres.create_from_placex_row
288 log().var_dump('Result (street/building)', row)
290 # If the closest result was a street, but an address was requested,
291 # check for a housenumber nearby which is part of the street.
293 if self.max_rank > 27 \
294 and self.layer_enabled(DataLayer.ADDRESS) \
295 and row.rank_address <= 27:
297 parent_place_id = row.place_id
298 log().comment('Find housenumber for street')
299 addr_row = await self._find_housenumber_for_street(parent_place_id, wkt)
300 log().var_dump('Result (street housenumber)', addr_row)
302 if addr_row is not None:
304 row_func = nres.create_from_placex_row
305 distance = addr_row.distance
306 elif row.country_code == 'us' and parent_place_id is not None:
307 log().comment('Find TIGER housenumber for street')
308 addr_row = await self._find_tiger_number_for_street(parent_place_id,
312 log().var_dump('Result (street Tiger housenumber)', addr_row)
314 if addr_row is not None:
316 row_func = nres.create_from_tiger_row
318 distance = row.distance
320 # Check for an interpolation that is either closer than our result
321 # or belongs to a close street found.
322 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
323 log().comment('Find interpolation for street')
324 addr_row = await self._find_interpolation_for_street(parent_place_id,
326 log().var_dump('Result (street interpolation)', addr_row)
327 if addr_row is not None:
329 row_func = nres.create_from_osmline_row
334 async def _lookup_area_address(self, wkt: str) -> Optional[SaRow]:
335 """ Lookup large addressable areas for the given WKT point.
337 log().comment('Reverse lookup by larger address area features')
338 t = self.conn.t.placex
340 # The inner SQL brings results in the right order, so that
341 # later only a minimum of results needs to be checked with ST_Contains.
342 inner = sa.select(t, sa.literal(0.0).label('distance'))\
343 .where(t.c.rank_search.between(5, self.max_rank))\
344 .where(t.c.rank_address.between(5, 25))\
345 .where(t.c.geometry.is_area())\
346 .where(t.c.geometry.intersects(wkt))\
347 .where(t.c.name != None)\
348 .where(t.c.indexed_status == 0)\
349 .where(t.c.linked_place_id == None)\
350 .where(t.c.type != 'postcode')\
351 .order_by(sa.desc(t.c.rank_search))\
355 sql = _select_from_placex(inner)\
356 .where(inner.c.geometry.ST_Contains(wkt))\
357 .order_by(sa.desc(inner.c.rank_search))\
360 sql = self._add_geometry_columns(sql, inner.c.geometry)
362 address_row = (await self.conn.execute(sql)).one_or_none()
363 log().var_dump('Result (area)', address_row)
365 if address_row is not None and address_row.rank_search < self.max_rank:
366 log().comment('Search for better matching place nodes inside the area')
368 t.c.geometry.ST_Distance(wkt).label('distance'))\
369 .where(t.c.osm_type == 'N')\
370 .where(t.c.rank_search > address_row.rank_search)\
371 .where(t.c.rank_search <= self.max_rank)\
372 .where(t.c.rank_address.between(5, 25))\
373 .where(t.c.name != None)\
374 .where(t.c.indexed_status == 0)\
375 .where(t.c.linked_place_id == None)\
376 .where(t.c.type != 'postcode')\
378 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
380 .order_by(sa.desc(t.c.rank_search))\
384 touter = self.conn.t.placex.alias('outer')
385 sql = _select_from_placex(inner)\
386 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
387 .where(touter.c.place_id == address_row.place_id)\
388 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
389 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
392 sql = self._add_geometry_columns(sql, inner.c.geometry)
394 place_address_row = (await self.conn.execute(sql)).one_or_none()
395 log().var_dump('Result (place node)', place_address_row)
397 if place_address_row is not None:
398 return place_address_row
403 async def _lookup_area_others(self, wkt: str) -> Optional[SaRow]:
404 t = self.conn.t.placex
406 inner = sa.select(t, t.c.geometry.ST_Distance(wkt).label('distance'))\
407 .where(t.c.rank_address == 0)\
408 .where(t.c.rank_search.between(5, self.max_rank))\
409 .where(t.c.name != None)\
410 .where(t.c.indexed_status == 0)\
411 .where(t.c.linked_place_id == None)\
412 .where(self._filter_by_layer(t))\
414 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
416 .order_by(sa.desc(t.c.rank_search))\
420 sql = _select_from_placex(inner)\
421 .where(sa.or_(not inner.c.geometry.is_area(),
422 inner.c.geometry.ST_Contains(wkt)))\
423 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
426 sql = self._add_geometry_columns(sql, inner.c.geometry)
428 row = (await self.conn.execute(sql)).one_or_none()
429 log().var_dump('Result (non-address feature)', row)
434 async def lookup_area(self, wkt: str) -> Optional[SaRow]:
435 """ Lookup large areas for the given WKT point.
437 log().section('Reverse lookup by larger area features')
439 if self.layer_enabled(DataLayer.ADDRESS):
440 address_row = await self._lookup_area_address(wkt)
444 if self.has_feature_layers():
445 other_row = await self._lookup_area_others(wkt)
449 return _get_closest(address_row, other_row)
452 async def lookup_country(self, wkt: str) -> Optional[SaRow]:
453 """ Lookup the country for the given WKT point.
455 log().section('Reverse lookup by country code')
456 t = self.conn.t.country_grid
457 sql = sa.select(t.c.country_code).distinct()\
458 .where(t.c.geometry.ST_Contains(wkt))
460 ccodes = tuple((r[0] for r in await self.conn.execute(sql)))
461 log().var_dump('Country codes', ccodes)
466 t = self.conn.t.placex
467 if self.max_rank > 4:
468 log().comment('Search for place nodes in country')
471 t.c.geometry.ST_Distance(wkt).label('distance'))\
472 .where(t.c.osm_type == 'N')\
473 .where(t.c.rank_search > 4)\
474 .where(t.c.rank_search <= self.max_rank)\
475 .where(t.c.rank_address.between(5, 25))\
476 .where(t.c.name != None)\
477 .where(t.c.indexed_status == 0)\
478 .where(t.c.linked_place_id == None)\
479 .where(t.c.type != 'postcode')\
480 .where(t.c.country_code.in_(ccodes))\
482 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
484 .order_by(sa.desc(t.c.rank_search))\
488 sql = _select_from_placex(inner)\
489 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
490 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
493 sql = self._add_geometry_columns(sql, inner.c.geometry)
495 address_row = (await self.conn.execute(sql)).one_or_none()
496 log().var_dump('Result (addressable place node)', address_row)
500 if address_row is None:
501 # Still nothing, then return a country with the appropriate country code.
502 sql = _select_from_placex(t, wkt)\
503 .where(t.c.country_code.in_(ccodes))\
504 .where(t.c.rank_address == 4)\
505 .where(t.c.rank_search == 4)\
506 .where(t.c.linked_place_id == None)\
507 .order_by('distance')\
510 sql = self._add_geometry_columns(sql, t.c.geometry)
512 address_row = (await self.conn.execute(sql)).one_or_none()
517 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
518 """ Look up a single coordinate. Returns the place information,
519 if a place was found near the coordinates or None otherwise.
521 log().function('reverse_lookup', coord=coord, params=self.params)
524 wkt = f'POINT({coord[0]} {coord[1]})'
526 row: Optional[SaRow] = None
527 row_func: RowFunc = nres.create_from_placex_row
529 if self.max_rank >= 26:
530 row, tmp_row_func = await self.lookup_street_poi(wkt)
532 row_func = tmp_row_func
533 if row is None and self.max_rank > 4:
534 row = await self.lookup_area(wkt)
535 if row is None and self.layer_enabled(DataLayer.ADDRESS):
536 row = await self.lookup_country(wkt)
538 result = row_func(row, nres.ReverseResult)
539 if result is not None:
540 assert row is not None
541 result.distance = row.distance
542 if hasattr(row, 'bbox'):
543 result.bbox = Bbox.from_wkb(row.bbox.data)
544 await nres.add_result_details(self.conn, [result], self.params)