1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of place lookup by ID.
10 from typing import Optional, Callable, Tuple, Type
13 import sqlalchemy as sa
15 from nominatim.typing import SaColumn, SaRow, SaSelect
16 from nominatim.api.connection import SearchConnection
17 import nominatim.api.types as ntyp
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
21 RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
23 GeomFunc = Callable[[SaSelect, SaColumn], SaSelect]
27 async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
28 add_geometries: GeomFunc) -> Optional[SaRow]:
29 """ Search for the given place in the placex table and return the
32 log().section("Find in placex table")
34 sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
35 t.c.class_, t.c.type, t.c.admin_level,
36 t.c.address, t.c.extratags,
37 t.c.housenumber, t.c.postcode, t.c.country_code,
38 t.c.importance, t.c.wikipedia, t.c.indexed_date,
39 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
41 t.c.geometry.ST_Expand(0).label('bbox'),
44 if isinstance(place, ntyp.PlaceID):
45 sql = sql.where(t.c.place_id == place.place_id)
46 elif isinstance(place, ntyp.OsmID):
47 sql = sql.where(t.c.osm_type == place.osm_type)\
48 .where(t.c.osm_id == place.osm_id)
50 sql = sql.where(t.c.class_ == place.osm_class)
52 sql = sql.order_by(t.c.class_)
57 return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
60 async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
61 add_geometries: GeomFunc) -> Optional[SaRow]:
62 """ Search for the given place in the osmline table and return the
65 log().section("Find in interpolation table")
67 sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
68 t.c.indexed_date, t.c.startnumber, t.c.endnumber,
69 t.c.step, t.c.address, t.c.postcode, t.c.country_code,
70 t.c.linegeo.ST_Centroid().label('centroid'))
72 if isinstance(place, ntyp.PlaceID):
73 sql = sql.where(t.c.place_id == place.place_id)
74 elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
75 # There may be multiple interpolations for a single way.
76 # If 'class' contains a number, return the one that belongs to that number.
77 sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
78 if place.osm_class and place.osm_class.isdigit():
79 sql = sql.order_by(sa.func.greatest(0,
80 sa.func.least(int(place.osm_class) - t.c.endnumber),
81 t.c.startnumber - int(place.osm_class)))
85 return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
88 async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
89 add_geometries: GeomFunc) -> Optional[SaRow]:
90 """ Search for the given place in the table of Tiger addresses and return
91 the base information. Only lookup by place ID is supported.
93 if not isinstance(place, ntyp.PlaceID):
96 log().section("Find in TIGER table")
98 parent = conn.t.placex
99 sql = sa.select(t.c.place_id, t.c.parent_place_id,
100 parent.c.osm_type, parent.c.osm_id,
101 t.c.startnumber, t.c.endnumber, t.c.step,
103 t.c.linegeo.ST_Centroid().label('centroid'))\
104 .where(t.c.place_id == place.place_id)\
105 .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
107 return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
110 async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
111 add_geometries: GeomFunc) -> Optional[SaRow]:
112 """ Search for the given place in the postcode table and return the
113 base information. Only lookup by place ID is supported.
115 if not isinstance(place, ntyp.PlaceID):
118 log().section("Find in postcode table")
120 sql = sa.select(t.c.place_id, t.c.parent_place_id,
121 t.c.rank_search, t.c.rank_address,
122 t.c.indexed_date, t.c.postcode, t.c.country_code,
123 t.c.geometry.label('centroid')) \
124 .where(t.c.place_id == place.place_id)
126 return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
129 async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
130 add_geometries: GeomFunc
131 ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
132 """ Search for the given place in all data tables
133 and return the base information.
135 row = await find_in_placex(conn, place, add_geometries)
136 log().var_dump('Result (placex)', row)
138 return row, nres.create_from_placex_row
140 row = await find_in_osmline(conn, place, add_geometries)
141 log().var_dump('Result (osmline)', row)
143 return row, nres.create_from_osmline_row
145 row = await find_in_postcode(conn, place, add_geometries)
146 log().var_dump('Result (postcode)', row)
148 return row, nres.create_from_postcode_row
150 row = await find_in_tiger(conn, place, add_geometries)
151 log().var_dump('Result (tiger)', row)
152 return row, nres.create_from_tiger_row
155 async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
156 details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
157 """ Retrieve a place with additional details from the database.
159 log().function('get_detailed_place', place=place, details=details)
161 if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
162 raise ValueError("lookup only supports geojosn polygon output.")
164 if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
165 def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
166 return sql.add_columns(sa.literal_column(f"""
167 ST_AsGeoJSON(CASE WHEN ST_NPoints({column.name}) > 5000
168 THEN ST_SimplifyPreserveTopology({column.name}, 0.0001)
169 ELSE {column.name} END)
170 """).label('geometry_geojson'))
172 def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
173 return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type'))
175 row_func: RowFunc[nres.DetailedResult]
176 row, row_func = await find_in_all_tables(conn, place, _add_geometry)
181 result = row_func(row, nres.DetailedResult)
182 assert result is not None
184 # add missing details
185 assert result is not None
186 indexed_date = getattr(row, 'indexed_date', None)
187 if indexed_date is not None:
188 result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
190 await nres.add_result_details(conn, [result], details)
195 async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
196 details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
197 """ Retrieve a place as a simple search result from the database.
199 log().function('get_simple_place', place=place, details=details)
201 def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect:
202 if not details.geometry_output:
207 if details.geometry_simplification > 0.0:
208 col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
210 if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
211 out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
212 if details.geometry_output & ntyp.GeometryFormat.TEXT:
213 out.append(sa.func.ST_AsText(col).label('geometry_text'))
214 if details.geometry_output & ntyp.GeometryFormat.KML:
215 out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
216 if details.geometry_output & ntyp.GeometryFormat.SVG:
217 out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
219 return sql.add_columns(*out)
222 row_func: RowFunc[nres.SearchResult]
223 row, row_func = await find_in_all_tables(conn, place, _add_geometry)
228 result = row_func(row, nres.SearchResult)
229 assert result is not None
231 # add missing details
232 assert result is not None
233 if hasattr(row, 'bbox'):
234 result.bbox = ntyp.Bbox.from_wkb(row.bbox)
236 await nres.add_result_details(conn, [result], details)