]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/reverse.py
switch reverse CLI command to Python implementation
[nominatim.git] / nominatim / api / reverse.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of reverse geocoding.
9 """
10 from typing import Optional, List, Callable, Type, Tuple
11
12 import sqlalchemy as sa
13 from geoalchemy2 import WKTElement
14
15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow
16 from nominatim.api.connection import SearchConnection
17 import nominatim.api.results as nres
18 from nominatim.api.logging import log
19 from nominatim.api.types import AnyPoint, DataLayer, LookupDetails, GeometryFormat, Bbox
20
21 # In SQLAlchemy expression which compare with NULL need to be expressed with
22 # the equal sign.
23 # pylint: disable=singleton-comparison
24
25 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
26
27 def _select_from_placex(t: SaFromClause, wkt: Optional[str] = None) -> SaSelect:
28     """ Create a select statement with the columns relevant for reverse
29         results.
30     """
31     if wkt is None:
32         distance = t.c.distance
33         centroid = t.c.centroid
34     else:
35         distance = t.c.geometry.ST_Distance(wkt)
36         centroid = sa.case(
37                        (t.c.geometry.ST_GeometryType().in_(('ST_LineString',
38                                                            'ST_MultiLineString')),
39                         t.c.geometry.ST_ClosestPoint(wkt)),
40                        else_=t.c.centroid).label('centroid')
41
42
43     return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
44                      t.c.class_, t.c.type,
45                      t.c.address, t.c.extratags,
46                      t.c.housenumber, t.c.postcode, t.c.country_code,
47                      t.c.importance, t.c.wikipedia,
48                      t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
49                      centroid,
50                      distance.label('distance'),
51                      t.c.geometry.ST_Expand(0).label('bbox'))
52
53
54 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
55     return sa.cast(table.c.startnumber
56                     + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
57                                     / table.c.step) * table.c.step,
58                    sa.Integer).label('housenumber')
59
60
61 def _interpolated_position(table: SaFromClause) -> SaLabel:
62     fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
63     rounded_pos = sa.func.round(table.c.position / fac) * fac
64     return sa.case(
65              (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
66               else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
67
68
69 def _is_address_point(table: SaFromClause) -> SaColumn:
70     return sa.and_(table.c.rank_address == 30,
71                    sa.or_(table.c.housenumber != None,
72                           table.c.name.has_key('housename')))
73
74 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
75     return min(rows, key=lambda row: 1000 if row is None else row.distance)
76
77 class ReverseGeocoder:
78     """ Class implementing the logic for looking up a place from a
79         coordinate.
80     """
81
82     def __init__(self, conn: SearchConnection, max_rank: int, layer: DataLayer,
83                  details: LookupDetails) -> None:
84         self.conn = conn
85         self.max_rank = max_rank
86         self.layer = layer
87         self.details = details
88
89     def layer_enabled(self, *layer: DataLayer) -> bool:
90         """ Return true when any of the given layer types are requested.
91         """
92         return any(self.layer & l for l in layer)
93
94
95     def layer_disabled(self, *layer: DataLayer) -> bool:
96         """ Return true when none of the given layer types is requested.
97         """
98         return not any(self.layer & l for l in layer)
99
100
101     def has_feature_layers(self) -> bool:
102         """ Return true if any layer other than ADDRESS or POI is requested.
103         """
104         return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
105
106     def _add_geometry_columns(self, sql: SaSelect, col: SaColumn) -> SaSelect:
107         if not self.details.geometry_output:
108             return sql
109
110         out = []
111
112         if self.details.geometry_simplification > 0.0:
113             col = col.ST_SimplifyPreserveTopology(self.details.geometry_simplification)
114
115         if self.details.geometry_output & GeometryFormat.GEOJSON:
116             out.append(col.ST_AsGeoJSON().label('geometry_geojson'))
117         if self.details.geometry_output & GeometryFormat.TEXT:
118             out.append(col.ST_AsText().label('geometry_text'))
119         if self.details.geometry_output & GeometryFormat.KML:
120             out.append(col.ST_AsKML().label('geometry_kml'))
121         if self.details.geometry_output & GeometryFormat.SVG:
122             out.append(col.ST_AsSVG().label('geometry_svg'))
123
124         return sql.add_columns(*out)
125
126
127     def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
128         if self.layer_enabled(DataLayer.MANMADE):
129             exclude = []
130             if self.layer_disabled(DataLayer.RAILWAY):
131                 exclude.append('railway')
132             if self.layer_disabled(DataLayer.NATURAL):
133                 exclude.extend(('natural', 'water', 'waterway'))
134             return table.c.class_.not_in(tuple(exclude))
135
136         include = []
137         if self.layer_enabled(DataLayer.RAILWAY):
138             include.append('railway')
139         if self.layer_enabled(DataLayer.NATURAL):
140             include.extend(('natural', 'water', 'waterway'))
141         return table.c.class_.in_(tuple(include))
142
143
144     async def _find_closest_street_or_poi(self, wkt: WKTElement,
145                                           distance: float) -> Optional[SaRow]:
146         """ Look up the closest rank 26+ place in the database, which
147             is closer than the given distance.
148         """
149         t = self.conn.t.placex
150
151         sql = _select_from_placex(t, wkt)\
152                 .where(t.c.geometry.ST_DWithin(wkt, distance))\
153                 .where(t.c.indexed_status == 0)\
154                 .where(t.c.linked_place_id == None)\
155                 .where(sa.or_(t.c.geometry.ST_GeometryType()
156                                           .not_in(('ST_Polygon', 'ST_MultiPolygon')),
157                               t.c.centroid.ST_Distance(wkt) < distance))\
158                 .order_by('distance')\
159                 .limit(1)
160
161         sql = self._add_geometry_columns(sql, t.c.geometry)
162
163         restrict: List[SaColumn] = []
164
165         if self.layer_enabled(DataLayer.ADDRESS):
166             restrict.append(sa.and_(t.c.rank_address >= 26,
167                                     t.c.rank_address <= min(29, self.max_rank)))
168             if self.max_rank == 30:
169                 restrict.append(_is_address_point(t))
170         if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
171             restrict.append(sa.and_(t.c.rank_search == 30,
172                                     t.c.class_.not_in(('place', 'building')),
173                                     t.c.geometry.ST_GeometryType() != 'ST_LineString'))
174         if self.has_feature_layers():
175             restrict.append(sa.and_(t.c.rank_search.between(26, self.max_rank),
176                                     t.c.rank_address == 0,
177                                     self._filter_by_layer(t)))
178
179         if not restrict:
180             return None
181
182         return (await self.conn.execute(sql.where(sa.or_(*restrict)))).one_or_none()
183
184
185     async def _find_housenumber_for_street(self, parent_place_id: int,
186                                            wkt: WKTElement) -> Optional[SaRow]:
187         t = self.conn.t.placex
188
189         sql = _select_from_placex(t, wkt)\
190                 .where(t.c.geometry.ST_DWithin(wkt, 0.001))\
191                 .where(t.c.parent_place_id == parent_place_id)\
192                 .where(_is_address_point(t))\
193                 .where(t.c.indexed_status == 0)\
194                 .where(t.c.linked_place_id == None)\
195                 .order_by('distance')\
196                 .limit(1)
197
198         sql = self._add_geometry_columns(sql, t.c.geometry)
199
200         return (await self.conn.execute(sql)).one_or_none()
201
202
203     async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
204                                              wkt: WKTElement,
205                                              distance: float) -> Optional[SaRow]:
206         t = self.conn.t.osmline
207
208         sql = sa.select(t,
209                         t.c.linegeo.ST_Distance(wkt).label('distance'),
210                         t.c.linegeo.ST_LineLocatePoint(wkt).label('position'))\
211                 .where(t.c.linegeo.ST_DWithin(wkt, distance))\
212                 .order_by('distance')\
213                 .limit(1)
214
215         if parent_place_id is not None:
216             sql = sql.where(t.c.parent_place_id == parent_place_id)
217
218         inner = sql.subquery()
219
220         sql = sa.select(inner.c.place_id, inner.c.osm_id,
221                         inner.c.parent_place_id, inner.c.address,
222                         _interpolated_housenumber(inner),
223                         _interpolated_position(inner),
224                         inner.c.postcode, inner.c.country_code,
225                         inner.c.distance)
226
227         if self.details.geometry_output:
228             sub = sql.subquery()
229             sql = self._add_geometry_columns(sql, sub.c.centroid)
230
231         return (await self.conn.execute(sql)).one_or_none()
232
233
234     async def _find_tiger_number_for_street(self, parent_place_id: int,
235                                             parent_type: str, parent_id: int,
236                                             wkt: WKTElement) -> Optional[SaRow]:
237         t = self.conn.t.tiger
238
239         inner = sa.select(t,
240                           t.c.linegeo.ST_Distance(wkt).label('distance'),
241                           sa.func.ST_LineLocatePoint(t.c.linegeo, wkt).label('position'))\
242                   .where(t.c.linegeo.ST_DWithin(wkt, 0.001))\
243                   .where(t.c.parent_place_id == parent_place_id)\
244                   .order_by('distance')\
245                   .limit(1)\
246                   .subquery()
247
248         sql = sa.select(inner.c.place_id,
249                         inner.c.parent_place_id,
250                         sa.literal(parent_type).label('osm_type'),
251                         sa.literal(parent_id).label('osm_id'),
252                         _interpolated_housenumber(inner),
253                         _interpolated_position(inner),
254                         inner.c.postcode,
255                         inner.c.distance)
256
257         if self.details.geometry_output:
258             sub = sql.subquery()
259             sql = self._add_geometry_columns(sql, sub.c.centroid)
260
261         return (await self.conn.execute(sql)).one_or_none()
262
263
264     async def lookup_street_poi(self,
265                                 wkt: WKTElement) -> Tuple[Optional[SaRow], RowFunc]:
266         """ Find a street or POI/address for the given WKT point.
267         """
268         log().section('Reverse lookup on street/address level')
269         distance = 0.006
270         parent_place_id = None
271
272         row = await self._find_closest_street_or_poi(wkt, distance)
273         row_func: RowFunc = nres.create_from_placex_row
274         log().var_dump('Result (street/building)', row)
275
276         # If the closest result was a street, but an address was requested,
277         # check for a housenumber nearby which is part of the street.
278         if row is not None:
279             if self.max_rank > 27 \
280                and self.layer_enabled(DataLayer.ADDRESS) \
281                and row.rank_address <= 27:
282                 distance = 0.001
283                 parent_place_id = row.place_id
284                 log().comment('Find housenumber for street')
285                 addr_row = await self._find_housenumber_for_street(parent_place_id, wkt)
286                 log().var_dump('Result (street housenumber)', addr_row)
287
288                 if addr_row is not None:
289                     row = addr_row
290                     row_func = nres.create_from_placex_row
291                     distance = addr_row.distance
292                 elif row.country_code == 'us' and parent_place_id is not None:
293                     log().comment('Find TIGER housenumber for street')
294                     addr_row = await self._find_tiger_number_for_street(parent_place_id,
295                                                                         row.osm_type,
296                                                                         row.osm_id,
297                                                                         wkt)
298                     log().var_dump('Result (street Tiger housenumber)', addr_row)
299
300                     if addr_row is not None:
301                         row = addr_row
302                         row_func = nres.create_from_tiger_row
303             else:
304                 distance = row.distance
305
306         # Check for an interpolation that is either closer than our result
307         # or belongs to a close street found.
308         if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
309             log().comment('Find interpolation for street')
310             addr_row = await self._find_interpolation_for_street(parent_place_id,
311                                                                  wkt, distance)
312             log().var_dump('Result (street interpolation)', addr_row)
313             if addr_row is not None:
314                 row = addr_row
315                 row_func = nres.create_from_osmline_row
316
317         return row, row_func
318
319
320     async def _lookup_area_address(self, wkt: WKTElement) -> Optional[SaRow]:
321         """ Lookup large addressable areas for the given WKT point.
322         """
323         log().comment('Reverse lookup by larger address area features')
324         t = self.conn.t.placex
325
326         # The inner SQL brings results in the right order, so that
327         # later only a minimum of results needs to be checked with ST_Contains.
328         inner = sa.select(t, sa.literal(0.0).label('distance'))\
329                   .where(t.c.rank_search.between(5, self.max_rank))\
330                   .where(t.c.rank_address.between(5, 25))\
331                   .where(t.c.geometry.ST_GeometryType().in_(('ST_Polygon', 'ST_MultiPolygon')))\
332                   .where(t.c.geometry.intersects(wkt))\
333                   .where(t.c.name != None)\
334                   .where(t.c.indexed_status == 0)\
335                   .where(t.c.linked_place_id == None)\
336                   .where(t.c.type != 'postcode')\
337                   .order_by(sa.desc(t.c.rank_search))\
338                   .limit(50)\
339                   .subquery()
340
341         sql = _select_from_placex(inner)\
342                   .where(inner.c.geometry.ST_Contains(wkt))\
343                   .order_by(sa.desc(inner.c.rank_search))\
344                   .limit(1)
345
346         sql = self._add_geometry_columns(sql, inner.c.geometry)
347
348         address_row = (await self.conn.execute(sql)).one_or_none()
349         log().var_dump('Result (area)', address_row)
350
351         if address_row is not None and address_row.rank_search < self.max_rank:
352             log().comment('Search for better matching place nodes inside the area')
353             inner = sa.select(t,
354                               t.c.geometry.ST_Distance(wkt).label('distance'))\
355                       .where(t.c.osm_type == 'N')\
356                       .where(t.c.rank_search > address_row.rank_search)\
357                       .where(t.c.rank_search <= self.max_rank)\
358                       .where(t.c.rank_address.between(5, 25))\
359                       .where(t.c.name != None)\
360                       .where(t.c.indexed_status == 0)\
361                       .where(t.c.linked_place_id == None)\
362                       .where(t.c.type != 'postcode')\
363                       .where(t.c.geometry
364                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
365                                 .intersects(wkt))\
366                       .order_by(sa.desc(t.c.rank_search))\
367                       .limit(50)\
368                       .subquery()
369
370             touter = self.conn.t.placex.alias('outer')
371             sql = _select_from_placex(inner)\
372                   .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
373                   .where(touter.c.place_id == address_row.place_id)\
374                   .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
375                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
376                   .limit(1)
377
378             sql = self._add_geometry_columns(sql, inner.c.geometry)
379
380             place_address_row = (await self.conn.execute(sql)).one_or_none()
381             log().var_dump('Result (place node)', place_address_row)
382
383             if place_address_row is not None:
384                 return place_address_row
385
386         return address_row
387
388
389     async def _lookup_area_others(self, wkt: WKTElement) -> Optional[SaRow]:
390         t = self.conn.t.placex
391
392         inner = sa.select(t, t.c.geometry.ST_Distance(wkt).label('distance'))\
393                   .where(t.c.rank_address == 0)\
394                   .where(t.c.rank_search.between(5, self.max_rank))\
395                   .where(t.c.name != None)\
396                   .where(t.c.indexed_status == 0)\
397                   .where(t.c.linked_place_id == None)\
398                   .where(self._filter_by_layer(t))\
399                   .where(t.c.geometry
400                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
401                                 .intersects(wkt))\
402                   .order_by(sa.desc(t.c.rank_search))\
403                   .limit(50)\
404                   .subquery()
405
406         sql = _select_from_placex(inner)\
407                   .where(sa.or_(inner.c.geometry.ST_GeometryType()
408                                                 .not_in(('ST_Polygon', 'ST_MultiPolygon')),
409                                 inner.c.geometry.ST_Contains(wkt)))\
410                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
411                   .limit(1)
412
413         sql = self._add_geometry_columns(sql, inner.c.geometry)
414
415         row = (await self.conn.execute(sql)).one_or_none()
416         log().var_dump('Result (non-address feature)', row)
417
418         return row
419
420
421     async def lookup_area(self, wkt: WKTElement) -> Optional[SaRow]:
422         """ Lookup large areas for the given WKT point.
423         """
424         log().section('Reverse lookup by larger area features')
425
426         if self.layer_enabled(DataLayer.ADDRESS):
427             address_row = await self._lookup_area_address(wkt)
428         else:
429             address_row = None
430
431         if self.has_feature_layers():
432             other_row = await self._lookup_area_others(wkt)
433         else:
434             other_row = None
435
436         return _get_closest(address_row, other_row)
437
438
439     async def lookup_country(self, wkt: WKTElement) -> Optional[SaRow]:
440         """ Lookup the country for the given WKT point.
441         """
442         log().section('Reverse lookup by country code')
443         t = self.conn.t.country_grid
444         sql = sa.select(t.c.country_code).distinct()\
445                 .where(t.c.geometry.ST_Contains(wkt))
446
447         ccodes = tuple((r[0] for r in await self.conn.execute(sql)))
448         log().var_dump('Country codes', ccodes)
449
450         if not ccodes:
451             return None
452
453         t = self.conn.t.placex
454         if self.max_rank > 4:
455             log().comment('Search for place nodes in country')
456
457             inner = sa.select(t,
458                               t.c.geometry.ST_Distance(wkt).label('distance'))\
459                       .where(t.c.osm_type == 'N')\
460                       .where(t.c.rank_search > 4)\
461                       .where(t.c.rank_search <= self.max_rank)\
462                       .where(t.c.rank_address.between(5, 25))\
463                       .where(t.c.name != None)\
464                       .where(t.c.indexed_status == 0)\
465                       .where(t.c.linked_place_id == None)\
466                       .where(t.c.type != 'postcode')\
467                       .where(t.c.country_code.in_(ccodes))\
468                       .where(t.c.geometry
469                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
470                                 .intersects(wkt))\
471                       .order_by(sa.desc(t.c.rank_search))\
472                       .limit(50)\
473                       .subquery()
474
475             sql = _select_from_placex(inner)\
476                   .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
477                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
478                   .limit(1)
479
480             sql = self._add_geometry_columns(sql, inner.c.geometry)
481
482             address_row = (await self.conn.execute(sql)).one_or_none()
483             log().var_dump('Result (addressable place node)', address_row)
484         else:
485             address_row = None
486
487         if address_row is None:
488             # Still nothing, then return a country with the appropriate country code.
489             sql = _select_from_placex(t, wkt)\
490                       .where(t.c.country_code.in_(ccodes))\
491                       .where(t.c.rank_address == 4)\
492                       .where(t.c.rank_search == 4)\
493                       .where(t.c.linked_place_id == None)\
494                       .order_by('distance')
495
496             sql = self._add_geometry_columns(sql, t.c.geometry)
497
498             address_row = (await self.conn.execute(sql)).one_or_none()
499
500         return address_row
501
502
503     async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
504         """ Look up a single coordinate. Returns the place information,
505             if a place was found near the coordinates or None otherwise.
506         """
507         log().function('reverse_lookup',
508                        coord=coord, max_rank=self.max_rank,
509                        layer=self.layer, details=self.details)
510
511
512         wkt = WKTElement(f'POINT({coord[0]} {coord[1]})', srid=4326)
513
514         row: Optional[SaRow] = None
515         row_func: RowFunc = nres.create_from_placex_row
516
517         if self.max_rank >= 26:
518             row, tmp_row_func = await self.lookup_street_poi(wkt)
519             if row is not None:
520                 row_func = tmp_row_func
521         if row is None and self.max_rank > 4:
522             row = await self.lookup_area(wkt)
523         if row is None and self.layer_enabled(DataLayer.ADDRESS):
524             row = await self.lookup_country(wkt)
525
526         result = row_func(row, nres.ReverseResult)
527         if result is not None:
528             assert row is not None
529             result.distance = row.distance
530             if hasattr(row, 'bbox'):
531                 result.bbox = Bbox.from_wkb(row.bbox.data)
532             await nres.add_result_details(self.conn, result, self.details)
533
534         return result