1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple
12 import sqlalchemy as sa
13 from geoalchemy2 import WKTElement
15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow
16 from nominatim.api.connection import SearchConnection
17 import nominatim.api.results as nres
18 from nominatim.api.logging import log
19 from nominatim.api.types import AnyPoint, DataLayer, LookupDetails, GeometryFormat, Bbox
21 # In SQLAlchemy expression which compare with NULL need to be expressed with
23 # pylint: disable=singleton-comparison
25 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
27 def _select_from_placex(t: SaFromClause, wkt: Optional[str] = None) -> SaSelect:
28 """ Create a select statement with the columns relevant for reverse
32 distance = t.c.distance
33 centroid = t.c.centroid
35 distance = t.c.geometry.ST_Distance(wkt)
37 (t.c.geometry.ST_GeometryType().in_(('ST_LineString',
38 'ST_MultiLineString')),
39 t.c.geometry.ST_ClosestPoint(wkt)),
40 else_=t.c.centroid).label('centroid')
43 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
45 t.c.address, t.c.extratags,
46 t.c.housenumber, t.c.postcode, t.c.country_code,
47 t.c.importance, t.c.wikipedia,
48 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
50 distance.label('distance'),
51 t.c.geometry.ST_Expand(0).label('bbox'))
54 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
55 return sa.cast(table.c.startnumber
56 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
57 / table.c.step) * table.c.step,
58 sa.Integer).label('housenumber')
61 def _interpolated_position(table: SaFromClause) -> SaLabel:
62 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
63 rounded_pos = sa.func.round(table.c.position / fac) * fac
65 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
66 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
69 def _locate_interpolation(table: SaFromClause, wkt: WKTElement) -> SaLabel:
70 """ Given a position, locate the closest point on the line.
72 return sa.case((table.c.linegeo.ST_GeometryType() == 'ST_LineString',
73 sa.func.ST_LineLocatePoint(table.c.linegeo, wkt)),
74 else_=0).label('position')
77 def _is_address_point(table: SaFromClause) -> SaColumn:
78 return sa.and_(table.c.rank_address == 30,
79 sa.or_(table.c.housenumber != None,
80 table.c.name.has_key('housename')))
82 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
83 return min(rows, key=lambda row: 1000 if row is None else row.distance)
85 class ReverseGeocoder:
86 """ Class implementing the logic for looking up a place from a
90 def __init__(self, conn: SearchConnection, max_rank: int, layer: DataLayer,
91 details: LookupDetails) -> None:
93 self.max_rank = max_rank
95 self.details = details
97 def layer_enabled(self, *layer: DataLayer) -> bool:
98 """ Return true when any of the given layer types are requested.
100 return any(self.layer & l for l in layer)
103 def layer_disabled(self, *layer: DataLayer) -> bool:
104 """ Return true when none of the given layer types is requested.
106 return not any(self.layer & l for l in layer)
109 def has_feature_layers(self) -> bool:
110 """ Return true if any layer other than ADDRESS or POI is requested.
112 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
114 def _add_geometry_columns(self, sql: SaSelect, col: SaColumn) -> SaSelect:
115 if not self.details.geometry_output:
120 if self.details.geometry_simplification > 0.0:
121 col = col.ST_SimplifyPreserveTopology(self.details.geometry_simplification)
123 if self.details.geometry_output & GeometryFormat.GEOJSON:
124 out.append(col.ST_AsGeoJSON().label('geometry_geojson'))
125 if self.details.geometry_output & GeometryFormat.TEXT:
126 out.append(col.ST_AsText().label('geometry_text'))
127 if self.details.geometry_output & GeometryFormat.KML:
128 out.append(col.ST_AsKML().label('geometry_kml'))
129 if self.details.geometry_output & GeometryFormat.SVG:
130 out.append(col.ST_AsSVG().label('geometry_svg'))
132 return sql.add_columns(*out)
135 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
136 if self.layer_enabled(DataLayer.MANMADE):
138 if self.layer_disabled(DataLayer.RAILWAY):
139 exclude.append('railway')
140 if self.layer_disabled(DataLayer.NATURAL):
141 exclude.extend(('natural', 'water', 'waterway'))
142 return table.c.class_.not_in(tuple(exclude))
145 if self.layer_enabled(DataLayer.RAILWAY):
146 include.append('railway')
147 if self.layer_enabled(DataLayer.NATURAL):
148 include.extend(('natural', 'water', 'waterway'))
149 return table.c.class_.in_(tuple(include))
152 async def _find_closest_street_or_poi(self, wkt: WKTElement,
153 distance: float) -> Optional[SaRow]:
154 """ Look up the closest rank 26+ place in the database, which
155 is closer than the given distance.
157 t = self.conn.t.placex
159 sql = _select_from_placex(t, wkt)\
160 .where(t.c.geometry.ST_DWithin(wkt, distance))\
161 .where(t.c.indexed_status == 0)\
162 .where(t.c.linked_place_id == None)\
163 .where(sa.or_(t.c.geometry.ST_GeometryType()
164 .not_in(('ST_Polygon', 'ST_MultiPolygon')),
165 t.c.centroid.ST_Distance(wkt) < distance))\
166 .order_by('distance')\
169 sql = self._add_geometry_columns(sql, t.c.geometry)
171 restrict: List[SaColumn] = []
173 if self.layer_enabled(DataLayer.ADDRESS):
174 restrict.append(sa.and_(t.c.rank_address >= 26,
175 t.c.rank_address <= min(29, self.max_rank)))
176 if self.max_rank == 30:
177 restrict.append(_is_address_point(t))
178 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
179 restrict.append(sa.and_(t.c.rank_search == 30,
180 t.c.class_.not_in(('place', 'building')),
181 t.c.geometry.ST_GeometryType() != 'ST_LineString'))
182 if self.has_feature_layers():
183 restrict.append(sa.and_(t.c.rank_search.between(26, self.max_rank),
184 t.c.rank_address == 0,
185 self._filter_by_layer(t)))
190 return (await self.conn.execute(sql.where(sa.or_(*restrict)))).one_or_none()
193 async def _find_housenumber_for_street(self, parent_place_id: int,
194 wkt: WKTElement) -> Optional[SaRow]:
195 t = self.conn.t.placex
197 sql = _select_from_placex(t, wkt)\
198 .where(t.c.geometry.ST_DWithin(wkt, 0.001))\
199 .where(t.c.parent_place_id == parent_place_id)\
200 .where(_is_address_point(t))\
201 .where(t.c.indexed_status == 0)\
202 .where(t.c.linked_place_id == None)\
203 .order_by('distance')\
206 sql = self._add_geometry_columns(sql, t.c.geometry)
208 return (await self.conn.execute(sql)).one_or_none()
211 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
213 distance: float) -> Optional[SaRow]:
214 t = self.conn.t.osmline
217 t.c.linegeo.ST_Distance(wkt).label('distance'),
218 _locate_interpolation(t, wkt))\
219 .where(t.c.linegeo.ST_DWithin(wkt, distance))\
220 .where(t.c.startnumber != None)\
221 .order_by('distance')\
224 if parent_place_id is not None:
225 sql = sql.where(t.c.parent_place_id == parent_place_id)
227 inner = sql.subquery()
229 sql = sa.select(inner.c.place_id, inner.c.osm_id,
230 inner.c.parent_place_id, inner.c.address,
231 _interpolated_housenumber(inner),
232 _interpolated_position(inner),
233 inner.c.postcode, inner.c.country_code,
236 if self.details.geometry_output:
238 sql = self._add_geometry_columns(sql, sub.c.centroid)
240 return (await self.conn.execute(sql)).one_or_none()
243 async def _find_tiger_number_for_street(self, parent_place_id: int,
244 parent_type: str, parent_id: int,
245 wkt: WKTElement) -> Optional[SaRow]:
246 t = self.conn.t.tiger
249 t.c.linegeo.ST_Distance(wkt).label('distance'),
250 _locate_interpolation(t, wkt))\
251 .where(t.c.linegeo.ST_DWithin(wkt, 0.001))\
252 .where(t.c.parent_place_id == parent_place_id)\
253 .order_by('distance')\
257 sql = sa.select(inner.c.place_id,
258 inner.c.parent_place_id,
259 sa.literal(parent_type).label('osm_type'),
260 sa.literal(parent_id).label('osm_id'),
261 _interpolated_housenumber(inner),
262 _interpolated_position(inner),
266 if self.details.geometry_output:
268 sql = self._add_geometry_columns(sql, sub.c.centroid)
270 return (await self.conn.execute(sql)).one_or_none()
273 async def lookup_street_poi(self,
274 wkt: WKTElement) -> Tuple[Optional[SaRow], RowFunc]:
275 """ Find a street or POI/address for the given WKT point.
277 log().section('Reverse lookup on street/address level')
279 parent_place_id = None
281 row = await self._find_closest_street_or_poi(wkt, distance)
282 row_func: RowFunc = nres.create_from_placex_row
283 log().var_dump('Result (street/building)', row)
285 # If the closest result was a street, but an address was requested,
286 # check for a housenumber nearby which is part of the street.
288 if self.max_rank > 27 \
289 and self.layer_enabled(DataLayer.ADDRESS) \
290 and row.rank_address <= 27:
292 parent_place_id = row.place_id
293 log().comment('Find housenumber for street')
294 addr_row = await self._find_housenumber_for_street(parent_place_id, wkt)
295 log().var_dump('Result (street housenumber)', addr_row)
297 if addr_row is not None:
299 row_func = nres.create_from_placex_row
300 distance = addr_row.distance
301 elif row.country_code == 'us' and parent_place_id is not None:
302 log().comment('Find TIGER housenumber for street')
303 addr_row = await self._find_tiger_number_for_street(parent_place_id,
307 log().var_dump('Result (street Tiger housenumber)', addr_row)
309 if addr_row is not None:
311 row_func = nres.create_from_tiger_row
313 distance = row.distance
315 # Check for an interpolation that is either closer than our result
316 # or belongs to a close street found.
317 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
318 log().comment('Find interpolation for street')
319 addr_row = await self._find_interpolation_for_street(parent_place_id,
321 log().var_dump('Result (street interpolation)', addr_row)
322 if addr_row is not None:
324 row_func = nres.create_from_osmline_row
329 async def _lookup_area_address(self, wkt: WKTElement) -> Optional[SaRow]:
330 """ Lookup large addressable areas for the given WKT point.
332 log().comment('Reverse lookup by larger address area features')
333 t = self.conn.t.placex
335 # The inner SQL brings results in the right order, so that
336 # later only a minimum of results needs to be checked with ST_Contains.
337 inner = sa.select(t, sa.literal(0.0).label('distance'))\
338 .where(t.c.rank_search.between(5, self.max_rank))\
339 .where(t.c.rank_address.between(5, 25))\
340 .where(t.c.geometry.ST_GeometryType().in_(('ST_Polygon', 'ST_MultiPolygon')))\
341 .where(t.c.geometry.intersects(wkt))\
342 .where(t.c.name != None)\
343 .where(t.c.indexed_status == 0)\
344 .where(t.c.linked_place_id == None)\
345 .where(t.c.type != 'postcode')\
346 .order_by(sa.desc(t.c.rank_search))\
350 sql = _select_from_placex(inner)\
351 .where(inner.c.geometry.ST_Contains(wkt))\
352 .order_by(sa.desc(inner.c.rank_search))\
355 sql = self._add_geometry_columns(sql, inner.c.geometry)
357 address_row = (await self.conn.execute(sql)).one_or_none()
358 log().var_dump('Result (area)', address_row)
360 if address_row is not None and address_row.rank_search < self.max_rank:
361 log().comment('Search for better matching place nodes inside the area')
363 t.c.geometry.ST_Distance(wkt).label('distance'))\
364 .where(t.c.osm_type == 'N')\
365 .where(t.c.rank_search > address_row.rank_search)\
366 .where(t.c.rank_search <= self.max_rank)\
367 .where(t.c.rank_address.between(5, 25))\
368 .where(t.c.name != None)\
369 .where(t.c.indexed_status == 0)\
370 .where(t.c.linked_place_id == None)\
371 .where(t.c.type != 'postcode')\
373 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
375 .order_by(sa.desc(t.c.rank_search))\
379 touter = self.conn.t.placex.alias('outer')
380 sql = _select_from_placex(inner)\
381 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
382 .where(touter.c.place_id == address_row.place_id)\
383 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
384 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
387 sql = self._add_geometry_columns(sql, inner.c.geometry)
389 place_address_row = (await self.conn.execute(sql)).one_or_none()
390 log().var_dump('Result (place node)', place_address_row)
392 if place_address_row is not None:
393 return place_address_row
398 async def _lookup_area_others(self, wkt: WKTElement) -> Optional[SaRow]:
399 t = self.conn.t.placex
401 inner = sa.select(t, t.c.geometry.ST_Distance(wkt).label('distance'))\
402 .where(t.c.rank_address == 0)\
403 .where(t.c.rank_search.between(5, self.max_rank))\
404 .where(t.c.name != None)\
405 .where(t.c.indexed_status == 0)\
406 .where(t.c.linked_place_id == None)\
407 .where(self._filter_by_layer(t))\
409 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
411 .order_by(sa.desc(t.c.rank_search))\
415 sql = _select_from_placex(inner)\
416 .where(sa.or_(inner.c.geometry.ST_GeometryType()
417 .not_in(('ST_Polygon', 'ST_MultiPolygon')),
418 inner.c.geometry.ST_Contains(wkt)))\
419 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
422 sql = self._add_geometry_columns(sql, inner.c.geometry)
424 row = (await self.conn.execute(sql)).one_or_none()
425 log().var_dump('Result (non-address feature)', row)
430 async def lookup_area(self, wkt: WKTElement) -> Optional[SaRow]:
431 """ Lookup large areas for the given WKT point.
433 log().section('Reverse lookup by larger area features')
435 if self.layer_enabled(DataLayer.ADDRESS):
436 address_row = await self._lookup_area_address(wkt)
440 if self.has_feature_layers():
441 other_row = await self._lookup_area_others(wkt)
445 return _get_closest(address_row, other_row)
448 async def lookup_country(self, wkt: WKTElement) -> Optional[SaRow]:
449 """ Lookup the country for the given WKT point.
451 log().section('Reverse lookup by country code')
452 t = self.conn.t.country_grid
453 sql = sa.select(t.c.country_code).distinct()\
454 .where(t.c.geometry.ST_Contains(wkt))
456 ccodes = tuple((r[0] for r in await self.conn.execute(sql)))
457 log().var_dump('Country codes', ccodes)
462 t = self.conn.t.placex
463 if self.max_rank > 4:
464 log().comment('Search for place nodes in country')
467 t.c.geometry.ST_Distance(wkt).label('distance'))\
468 .where(t.c.osm_type == 'N')\
469 .where(t.c.rank_search > 4)\
470 .where(t.c.rank_search <= self.max_rank)\
471 .where(t.c.rank_address.between(5, 25))\
472 .where(t.c.name != None)\
473 .where(t.c.indexed_status == 0)\
474 .where(t.c.linked_place_id == None)\
475 .where(t.c.type != 'postcode')\
476 .where(t.c.country_code.in_(ccodes))\
478 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
480 .order_by(sa.desc(t.c.rank_search))\
484 sql = _select_from_placex(inner)\
485 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
486 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
489 sql = self._add_geometry_columns(sql, inner.c.geometry)
491 address_row = (await self.conn.execute(sql)).one_or_none()
492 log().var_dump('Result (addressable place node)', address_row)
496 if address_row is None:
497 # Still nothing, then return a country with the appropriate country code.
498 sql = _select_from_placex(t, wkt)\
499 .where(t.c.country_code.in_(ccodes))\
500 .where(t.c.rank_address == 4)\
501 .where(t.c.rank_search == 4)\
502 .where(t.c.linked_place_id == None)\
503 .order_by('distance')
505 sql = self._add_geometry_columns(sql, t.c.geometry)
507 address_row = (await self.conn.execute(sql)).one_or_none()
512 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
513 """ Look up a single coordinate. Returns the place information,
514 if a place was found near the coordinates or None otherwise.
516 log().function('reverse_lookup',
517 coord=coord, max_rank=self.max_rank,
518 layer=self.layer, details=self.details)
521 wkt = WKTElement(f'POINT({coord[0]} {coord[1]})', srid=4326)
523 row: Optional[SaRow] = None
524 row_func: RowFunc = nres.create_from_placex_row
526 if self.max_rank >= 26:
527 row, tmp_row_func = await self.lookup_street_poi(wkt)
529 row_func = tmp_row_func
530 if row is None and self.max_rank > 4:
531 row = await self.lookup_area(wkt)
532 if row is None and self.layer_enabled(DataLayer.ADDRESS):
533 row = await self.lookup_country(wkt)
535 result = row_func(row, nres.ReverseResult)
536 if result is not None:
537 assert row is not None
538 result.distance = row.distance
539 if hasattr(row, 'bbox'):
540 result.bbox = Bbox.from_wkb(row.bbox.data)
541 await nres.add_result_details(self.conn, result, self.details)