]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/lookup.py
python lookup: factor out finding in tables into own function
[nominatim.git] / nominatim / api / lookup.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of place lookup by ID.
9 """
10 from typing import Optional, Callable, Tuple, Type
11 import datetime as dt
12
13 import sqlalchemy as sa
14
15 from nominatim.typing import SaColumn, SaLabel, SaRow
16 from nominatim.api.connection import SearchConnection
17 import nominatim.api.types as ntyp
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
20
21 RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
22
23 def _select_column_geometry(column: SaColumn,
24                             geometry_output: ntyp.GeometryFormat) -> SaLabel:
25     """ Create the appropriate column expression for selecting a
26         geometry for the details response.
27     """
28     if geometry_output & ntyp.GeometryFormat.GEOJSON:
29         return sa.literal_column(f"""
30                   ST_AsGeoJSON(CASE WHEN ST_NPoints({column.name}) > 5000
31                                THEN ST_SimplifyPreserveTopology({column.name}, 0.0001)
32                                ELSE {column.name} END)
33                   """).label('geometry_geojson')
34
35     return sa.func.ST_GeometryType(column).label('geometry_type')
36
37
38 async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
39                          details: ntyp.LookupDetails) -> Optional[SaRow]:
40     """ Search for the given place in the placex table and return the
41         base information.
42     """
43     log().section("Find in placex table")
44     t = conn.t.placex
45     sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
46                     t.c.class_, t.c.type, t.c.admin_level,
47                     t.c.address, t.c.extratags,
48                     t.c.housenumber, t.c.postcode, t.c.country_code,
49                     t.c.importance, t.c.wikipedia, t.c.indexed_date,
50                     t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
51                     t.c.linked_place_id,
52                     t.c.centroid,
53                     _select_column_geometry(t.c.geometry, details.geometry_output))
54
55     if isinstance(place, ntyp.PlaceID):
56         sql = sql.where(t.c.place_id == place.place_id)
57     elif isinstance(place, ntyp.OsmID):
58         sql = sql.where(t.c.osm_type == place.osm_type)\
59                  .where(t.c.osm_id == place.osm_id)
60         if place.osm_class:
61             sql = sql.where(t.c.class_ == place.osm_class)
62         else:
63             sql = sql.order_by(t.c.class_)
64         sql = sql.limit(1)
65     else:
66         return None
67
68     return (await conn.execute(sql)).one_or_none()
69
70
71 async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
72                           details: ntyp.LookupDetails) -> Optional[SaRow]:
73     """ Search for the given place in the osmline table and return the
74         base information.
75     """
76     log().section("Find in interpolation table")
77     t = conn.t.osmline
78     sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
79                     t.c.indexed_date, t.c.startnumber, t.c.endnumber,
80                     t.c.step, t.c.address, t.c.postcode, t.c.country_code,
81                     t.c.linegeo.ST_Centroid().label('centroid'),
82                     _select_column_geometry(t.c.linegeo, details.geometry_output))
83
84     if isinstance(place, ntyp.PlaceID):
85         sql = sql.where(t.c.place_id == place.place_id)
86     elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
87         # There may be multiple interpolations for a single way.
88         # If 'class' contains a number, return the one that belongs to that number.
89         sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
90         if place.osm_class and place.osm_class.isdigit():
91             sql = sql.order_by(sa.func.greatest(0,
92                                     sa.func.least(int(place.osm_class) - t.c.endnumber),
93                                            t.c.startnumber - int(place.osm_class)))
94     else:
95         return None
96
97     return (await conn.execute(sql)).one_or_none()
98
99
100 async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
101                         details: ntyp.LookupDetails) -> Optional[SaRow]:
102     """ Search for the given place in the table of Tiger addresses and return
103         the base information. Only lookup by place ID is supported.
104     """
105     log().section("Find in TIGER table")
106     t = conn.t.tiger
107     parent = conn.t.placex
108     sql = sa.select(t.c.place_id, t.c.parent_place_id,
109                     parent.c.osm_type, parent.c.osm_id,
110                     t.c.startnumber, t.c.endnumber, t.c.step,
111                     t.c.postcode,
112                     t.c.linegeo.ST_Centroid().label('centroid'),
113                     _select_column_geometry(t.c.linegeo, details.geometry_output))
114
115     if isinstance(place, ntyp.PlaceID):
116         sql = sql.where(t.c.place_id == place.place_id)\
117                  .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
118     else:
119         return None
120
121     return (await conn.execute(sql)).one_or_none()
122
123
124 async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
125                            details: ntyp.LookupDetails) -> Optional[SaRow]:
126     """ Search for the given place in the postcode table and return the
127         base information. Only lookup by place ID is supported.
128     """
129     log().section("Find in postcode table")
130     t = conn.t.postcode
131     sql = sa.select(t.c.place_id, t.c.parent_place_id,
132                     t.c.rank_search, t.c.rank_address,
133                     t.c.indexed_date, t.c.postcode, t.c.country_code,
134                     t.c.geometry.label('centroid'),
135                     _select_column_geometry(t.c.geometry, details.geometry_output))
136
137     if isinstance(place, ntyp.PlaceID):
138         sql = sql.where(t.c.place_id == place.place_id)
139     else:
140         return None
141
142     return (await conn.execute(sql)).one_or_none()
143
144
145 async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
146                              details: ntyp.LookupDetails
147                             ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
148     """ Search for the given place in all data tables
149         and return the base information.
150     """
151     row = await find_in_placex(conn, place, details)
152     log().var_dump('Result (placex)', row)
153     if row is not None:
154         return row, nres.create_from_placex_row
155
156     row = await find_in_osmline(conn, place, details)
157     log().var_dump('Result (osmline)', row)
158     if row is not None:
159         return row, nres.create_from_osmline_row
160
161     row = await find_in_postcode(conn, place, details)
162     log().var_dump('Result (postcode)', row)
163     if row is not None:
164         return row, nres.create_from_postcode_row
165
166     row = await find_in_tiger(conn, place, details)
167     log().var_dump('Result (tiger)', row)
168     return row, nres.create_from_tiger_row
169
170
171 async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
172                              details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
173     """ Retrieve a place with additional details from the database.
174     """
175     log().function('get_place_by_id', place=place, details=details)
176
177     if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
178         raise ValueError("lookup only supports geojosn polygon output.")
179
180     row_func: RowFunc[nres.DetailedResult]
181     row, row_func = await find_in_all_tables(conn, place, details)
182
183     if row is None:
184         return None
185
186     result = row_func(row, nres.DetailedResult)
187     assert result is not None
188
189     # add missing details
190     assert result is not None
191     result.parent_place_id = row.parent_place_id
192     result.linked_place_id = getattr(row, 'linked_place_id', None)
193     result.admin_level = getattr(row, 'admin_level', 15)
194     indexed_date = getattr(row, 'indexed_date', None)
195     if indexed_date is not None:
196         result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
197
198     await nres.add_result_details(conn, result, details)
199
200     return result