# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of place lookup by ID.
"""
from typing import Optional, Callable, Tuple, Type
import datetime as dt

import sqlalchemy as sa

from nominatim.typing import SaColumn, SaRow, SaSelect
from nominatim.api.connection import SearchConnection
import nominatim.api.types as ntyp
import nominatim.api.results as nres
from nominatim.api.logging import log
# Converter that turns a database row (or None) into a result object of the
# requested result class. find_in_all_tables() hands one of these back
# together with the row it found.
RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]

# Hook that receives a select statement and the geometry column of the table
# being queried and returns the statement to execute, usually with extra
# geometry output columns added.
GeomFunc = Callable[[SaSelect, SaColumn], SaSelect]
async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
                         add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the placex table and return the
        base information.

        The place may be referenced by place ID or by OSM ID. For an OSM ID
        the lookup is restricted to the given class when one is set.
        Otherwise the first row in class order is used.

        'add_geometries' is called with the statement and the geometry
        column of the table and must return the statement to execute.

        Returns the row found or None when there is no match or the kind
        of place reference is not supported.
    """
    log().section("Find in placex table")
    t = conn.t.placex
    sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
                    t.c.class_, t.c.type, t.c.admin_level,
                    t.c.address, t.c.extratags,
                    t.c.housenumber, t.c.postcode, t.c.country_code,
                    t.c.importance, t.c.wikipedia, t.c.indexed_date,
                    t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
                    # NOTE(review): the three columns below were restored
                    # from the way callers read the row ('linked_place_id',
                    # 'bbox') and from the 'centroid' column the other
                    # lookup functions provide - confirm against the schema.
                    t.c.linked_place_id,
                    t.c.geometry.ST_Expand(0).label('bbox'),
                    t.c.centroid)

    if isinstance(place, ntyp.PlaceID):
        sql = sql.where(t.c.place_id == place.place_id)
    elif isinstance(place, ntyp.OsmID):
        sql = sql.where(t.c.osm_type == place.osm_type)\
                 .where(t.c.osm_id == place.osm_id)
        if place.osm_class:
            sql = sql.where(t.c.class_ == place.osm_class)
        else:
            sql = sql.order_by(t.c.class_)
        # An OSM object may have more than one row. one_or_none() raises
        # when the query yields multiple rows, so cut off after the first.
        sql = sql.limit(1)
    else:
        return None

    return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
                          add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the osmline table and return the
        base information.

        The place may be referenced by place ID or by the OSM ID of a way.
        When the class of the OSM ID is a number, the interpolation whose
        number range is closest to that number is preferred.

        'add_geometries' is called with the statement and the geometry
        column of the table and must return the statement to execute.

        Returns the row found or None when there is no match or the kind
        of place reference is not supported.
    """
    log().section("Find in interpolation table")
    t = conn.t.osmline
    sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
                    t.c.indexed_date, t.c.startnumber, t.c.endnumber,
                    t.c.step, t.c.address, t.c.postcode, t.c.country_code,
                    t.c.linegeo.ST_Centroid().label('centroid'))

    if isinstance(place, ntyp.PlaceID):
        sql = sql.where(t.c.place_id == place.place_id)
    elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
        # There may be multiple interpolations for a single way.
        # If 'class' contains a number, return the one that belongs to that number.
        sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
        # isdecimal() instead of isdigit(): the latter also accepts
        # characters like superscript digits which int() cannot parse.
        if place.osm_class and place.osm_class.isdecimal():
            number = int(place.osm_class)
            # Distance of the number from the range startnumber..endnumber,
            # which is 0 when the number lies inside the range.
            sql = sql.order_by(sa.func.greatest(0,
                                                number - t.c.endnumber,
                                                t.c.startnumber - number))
    else:
        return None

    return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
                        add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the table of Tiger addresses and return
        the base information. Only lookup by place ID is supported.

        The OSM type and ID in the result are taken from the parent row in
        placex and are NULL when there is no such row.

        'add_geometries' is called with the statement and the geometry
        column of the table and must return the statement to execute.

        Returns the row found or None when there is no match or the place
        is not referenced by place ID.
    """
    if not isinstance(place, ntyp.PlaceID):
        return None

    log().section("Find in TIGER table")
    t = conn.t.tiger
    parent = conn.t.placex
    sql = sa.select(t.c.place_id, t.c.parent_place_id,
                    parent.c.osm_type, parent.c.osm_id,
                    t.c.startnumber, t.c.endnumber, t.c.step,
                    # NOTE(review): postcode column restored from a line
                    # missing in the listing - confirm against the schema.
                    t.c.postcode,
                    t.c.linegeo.ST_Centroid().label('centroid'))\
            .where(t.c.place_id == place.place_id)\
            .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)

    return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
                           add_geometries: GeomFunc) -> Optional[SaRow]:
    """ Search for the given place in the postcode table and return the
        base information. Only lookup by place ID is supported.

        'add_geometries' is called with the statement and the geometry
        column of the table and must return the statement to execute.

        Returns the row found or None when there is no match or the place
        is not referenced by place ID.
    """
    if not isinstance(place, ntyp.PlaceID):
        return None

    log().section("Find in postcode table")
    t = conn.t.postcode
    sql = sa.select(t.c.place_id, t.c.parent_place_id,
                    t.c.rank_search, t.c.rank_address,
                    t.c.indexed_date, t.c.postcode, t.c.country_code,
                    t.c.geometry.label('centroid')) \
            .where(t.c.place_id == place.place_id)

    return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
                             add_geometries: GeomFunc
                            ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
    """ Search for the given place in all data tables
        and return the base information.

        The tables are tried in the order placex, osmline, postcode, tiger
        and the search stops at the first table that yields a row.

        Returns a tuple of the row and the function that converts a row of
        that table into a result object. When the place is found nowhere,
        the row is None.
    """
    row = await find_in_placex(conn, place, add_geometries)
    log().var_dump('Result (placex)', row)
    if row is not None:
        return row, nres.create_from_placex_row

    row = await find_in_osmline(conn, place, add_geometries)
    log().var_dump('Result (osmline)', row)
    if row is not None:
        return row, nres.create_from_osmline_row

    row = await find_in_postcode(conn, place, add_geometries)
    log().var_dump('Result (postcode)', row)
    if row is not None:
        return row, nres.create_from_postcode_row

    # Last table: hand back whatever was found, even if that is nothing.
    row = await find_in_tiger(conn, place, add_geometries)
    log().var_dump('Result (tiger)', row)
    return row, nres.create_from_tiger_row
async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
                             details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
    """ Retrieve a place with additional details from the database.

        Returns None when the place cannot be found in any table.

        Raises a ValueError when a geometry output format other than
        GeoJSON is requested.
    """
    log().function('get_detailed_place', place=place, details=details)

    if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
        raise ValueError("lookup only supports geojson polygon output.")

    if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
        def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
            # Geometries with more than 5000 points are simplified before
            # being converted. The column name is the name of one of the
            # table columns handed in by the find_in_* functions.
            return sql.add_columns(sa.literal_column(f"""
                      ST_AsGeoJSON(CASE WHEN ST_NPoints({column.name}) > 5000
                                   THEN ST_SimplifyPreserveTopology({column.name}, 0.0001)
                                   ELSE {column.name} END)
                       """).label('geometry_geojson'))
    else:
        def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
            # No geometry requested: only report the type of the geometry.
            return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type'))

    row_func: RowFunc[nres.DetailedResult]
    row, row_func = await find_in_all_tables(conn, place, _add_geometry)

    if row is None:
        return None

    result = row_func(row, nres.DetailedResult)
    assert result is not None

    # add missing details
    result.parent_place_id = row.parent_place_id
    # Not every table provides these columns, so fall back to defaults.
    result.linked_place_id = getattr(row, 'linked_place_id', None)
    result.admin_level = getattr(row, 'admin_level', 15)
    indexed_date = getattr(row, 'indexed_date', None)
    if indexed_date is not None:
        # Mark the timestamp as UTC.
        result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)

    await nres.add_result_details(conn, [result], details)

    return result
async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
                           details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
    """ Retrieve a place as a simple search result from the database.

        The geometry of the place is added in every output format requested
        in 'details', optionally simplified beforehand.

        Returns None when the place cannot be found in any table.
    """
    log().function('get_simple_place', place=place, details=details)

    def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect:
        if not details.geometry_output:
            return sql

        out = []

        if details.geometry_simplification > 0.0:
            col = col.ST_SimplifyPreserveTopology(details.geometry_simplification)

        # geometry_output is a set of flags, one output column per format.
        if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
            out.append(col.ST_AsGeoJSON().label('geometry_geojson'))
        if details.geometry_output & ntyp.GeometryFormat.TEXT:
            out.append(col.ST_AsText().label('geometry_text'))
        if details.geometry_output & ntyp.GeometryFormat.KML:
            out.append(col.ST_AsKML().label('geometry_kml'))
        if details.geometry_output & ntyp.GeometryFormat.SVG:
            out.append(col.ST_AsSVG().label('geometry_svg'))

        return sql.add_columns(*out)

    row_func: RowFunc[nres.SearchResult]
    row, row_func = await find_in_all_tables(conn, place, _add_geometry)

    if row is None:
        return None

    result = row_func(row, nres.SearchResult)
    assert result is not None

    # add missing details
    # Not every table provides a bounding box column.
    result.bbox = getattr(row, 'bbox', None)

    await nres.add_result_details(conn, [result], details)

    return result