# SPDX-License-Identifier: GPL-3.0-or-later
# This file is part of Nominatim. (https://nominatim.org)
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of place lookup by ID.
"""
from typing import Optional, Callable, Tuple, Type
import datetime as dt

import sqlalchemy as sa

from nominatim.typing import SaColumn, SaRow, SaSelect
from nominatim.api.connection import SearchConnection
import nominatim.api.types as ntyp
import nominatim.api.results as nres
from nominatim.api.logging import log
# Factory that converts a (possibly missing) database row into a result
# object of the requested result class.
RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]

# Hook that receives a select statement plus the geometry column of the
# queried table and returns the statement extended with geometry output.
GeomFunc = Callable[[SaSelect, SaColumn], SaSelect]
async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
                         add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the placex table and return the
        base information.

        The place may be referenced by place ID or by OSM type and ID.
        For OSM references an optional class narrows the match; without
        a class, the first row when ordered by class is used. Any other
        kind of reference returns None without querying.

        `add_geometries` is called with the finished query and the
        geometry column and must return the query to execute.
    """
    log().section("Find in placex table")
    t = conn.t.placex
    sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
                    t.c.class_, t.c.type, t.c.admin_level,
                    t.c.address, t.c.extratags,
                    t.c.housenumber, t.c.postcode, t.c.country_code,
                    t.c.importance, t.c.wikipedia, t.c.indexed_date,
                    t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
                    t.c.linked_place_id,
                    t.c.geometry.ST_Expand(0).label('bbox'),
                    t.c.centroid)

    if isinstance(place, ntyp.PlaceID):
        sql = sql.where(t.c.place_id == place.place_id)
    elif isinstance(place, ntyp.OsmID):
        sql = sql.where(t.c.osm_type == place.osm_type)\
                 .where(t.c.osm_id == place.osm_id)
        if place.osm_class:
            sql = sql.where(t.c.class_ == place.osm_class)
        else:
            sql = sql.order_by(t.c.class_)
        # One OSM object may appear with several classes; one_or_none()
        # raises when more than one row comes back.
        sql = sql.limit(1)
    else:
        return None

    return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
                          add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the osmline table and return the
        base information.

        Lookup works by place ID or by the OSM ID of a way. Any other
        kind of reference returns None without querying.

        `add_geometries` is called with the finished query and the
        line geometry column and must return the query to execute.
    """
    log().section("Find in interpolation table")
    t = conn.t.osmline
    sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
                    t.c.indexed_date, t.c.startnumber, t.c.endnumber,
                    t.c.step, t.c.address, t.c.postcode, t.c.country_code,
                    t.c.linegeo.ST_Centroid().label('centroid'))

    if isinstance(place, ntyp.PlaceID):
        sql = sql.where(t.c.place_id == place.place_id)
    elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
        # There may be multiple interpolations for a single way.
        # If 'class' contains a number, return the one that belongs to that number.
        sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as superscript digits, which int() rejects.
        if place.osm_class and place.osm_class.isdecimal():
            housenumber = int(place.osm_class)
            # Distance of the number from the [startnumber, endnumber]
            # range: 0 when inside, positive otherwise.
            sql = sql.order_by(sa.func.greatest(0,
                                                housenumber - t.c.endnumber,
                                                t.c.startnumber - housenumber))
    else:
        return None

    return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
                        add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the table of Tiger addresses and return
        the base information. Only lookup by place ID is supported.

        The OSM type and ID in the result come from the placex row of the
        parent place; the outer join leaves them NULL when there is no
        such row.
    """
    if not isinstance(place, ntyp.PlaceID):
        return None

    log().section("Find in TIGER table")
    t = conn.t.tiger
    parent = conn.t.placex
    sql = sa.select(t.c.place_id, t.c.parent_place_id,
                    parent.c.osm_type, parent.c.osm_id,
                    t.c.startnumber, t.c.endnumber, t.c.step,
                    t.c.postcode,
                    t.c.linegeo.ST_Centroid().label('centroid'))\
            .where(t.c.place_id == place.place_id)\
            .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)

    return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
                           add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the postcode table and return the
        base information. Only lookup by place ID is supported.

        The geometry column of the table is returned as 'centroid'.
    """
    if not isinstance(place, ntyp.PlaceID):
        return None

    log().section("Find in postcode table")
    t = conn.t.postcode
    sql = sa.select(t.c.place_id, t.c.parent_place_id,
                    t.c.rank_search, t.c.rank_address,
                    t.c.indexed_date, t.c.postcode, t.c.country_code,
                    t.c.geometry.label('centroid')) \
            .where(t.c.place_id == place.place_id)

    return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
                             add_geometries: GeomFunc
                            ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
    """ Search for the given place in all data tables
        and return the base information.

        The tables are tried in the order placex, osmline, postcode and
        tiger. The first row found is returned together with the function
        that converts rows of that table into results. When nothing is
        found, the row is None and the converter is the one for tiger rows.
    """
    row = await find_in_placex(conn, place, add_geometries)
    log().var_dump('Result (placex)', row)
    if row is not None:
        return row, nres.create_from_placex_row

    row = await find_in_osmline(conn, place, add_geometries)
    log().var_dump('Result (osmline)', row)
    if row is not None:
        return row, nres.create_from_osmline_row

    row = await find_in_postcode(conn, place, add_geometries)
    log().var_dump('Result (postcode)', row)
    if row is not None:
        return row, nres.create_from_postcode_row

    # Last table: hand back whatever was found, including None.
    row = await find_in_tiger(conn, place, add_geometries)
    log().var_dump('Result (tiger)', row)
    return row, nres.create_from_tiger_row
async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
                             details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
    """ Retrieve a place with additional details from the database.

        Returns None when the place cannot be found in any table.
        Raises ValueError when a geometry output format other than
        GeoJSON is requested.
    """
    log().function('get_detailed_place', place=place, details=details)

    if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
        raise ValueError("lookup only supports geojson polygon output.")

    if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
        def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
            # Geometries with more than 5000 points are simplified
            # before being serialised.
            return sql.add_columns(sa.literal_column(f"""
                      ST_AsGeoJSON(CASE WHEN ST_NPoints({column.name}) > 5000
                                   THEN ST_SimplifyPreserveTopology({column.name}, 0.0001)
                                   ELSE {column.name} END)
                       """).label('geometry_geojson'))
    else:
        def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
            # No geometry requested: only report the geometry type.
            return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type'))

    row_func: RowFunc[nres.DetailedResult]
    row, row_func = await find_in_all_tables(conn, place, _add_geometry)

    if row is None:
        return None

    result = row_func(row, nres.DetailedResult)
    # The converter only yields None for a None row, which was handled above.
    assert result is not None

    # add missing details
    result.parent_place_id = row.parent_place_id
    result.linked_place_id = getattr(row, 'linked_place_id', None)
    result.admin_level = getattr(row, 'admin_level', 15)
    indexed_date = getattr(row, 'indexed_date', None)
    if indexed_date is not None:
        # Mark the timestamp explicitly as UTC.
        result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)

    await nres.add_result_details(conn, [result], details)

    return result
async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
                           details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
    """ Retrieve a place as a simple search result from the database.

        Returns None when the place cannot be found in any table.
        Geometry is added in every format requested through
        `details.geometry_output`, simplified first when
        `details.geometry_simplification` is greater than zero.
    """
    log().function('get_simple_place', place=place, details=details)

    def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect:
        if not details.geometry_output:
            return sql

        out = []

        if details.geometry_simplification > 0.0:
            col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)

        # One output column per requested format flag.
        if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
            out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
        if details.geometry_output & ntyp.GeometryFormat.TEXT:
            out.append(sa.func.ST_AsText(col).label('geometry_text'))
        if details.geometry_output & ntyp.GeometryFormat.KML:
            out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
        if details.geometry_output & ntyp.GeometryFormat.SVG:
            out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))

        return sql.add_columns(*out)

    row_func: RowFunc[nres.SearchResult]
    row, row_func = await find_in_all_tables(conn, place, _add_geometry)

    if row is None:
        return None

    result = row_func(row, nres.SearchResult)
    # The converter only yields None for a None row, which was handled above.
    assert result is not None

    # add missing details
    if hasattr(row, 'bbox'):
        result.bbox = ntyp.Bbox.from_wkb(row.bbox)

    await nres.add_result_details(conn, [result], details)

    return result