]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/lookup.py
disable prepared statements
[nominatim.git] / nominatim / api / lookup.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of place lookup by ID.
9 """
10 from typing import Optional
11 import datetime as dt
12
13 import sqlalchemy as sa
14
15 from nominatim.typing import SaColumn, SaLabel, SaRow
16 from nominatim.api.connection import SearchConnection
17 import nominatim.api.types as ntyp
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
20
21 def _select_column_geometry(column: SaColumn,
22                             geometry_output: ntyp.GeometryFormat) -> SaLabel:
23     """ Create the appropriate column expression for selecting a
24         geometry for the details response.
25     """
26     if geometry_output & ntyp.GeometryFormat.GEOJSON:
27         return sa.literal_column(f"""
28                   ST_AsGeoJSON(CASE WHEN ST_NPoints({column.name}) > 5000
29                                THEN ST_SimplifyPreserveTopology({column.name}, 0.0001)
30                                ELSE {column.name} END)
31                   """).label('geometry_geojson')
32
33     return sa.func.ST_GeometryType(column).label('geometry_type')
34
35
36 async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
37                          details: ntyp.LookupDetails) -> Optional[SaRow]:
38     """ Search for the given place in the placex table and return the
39         base information.
40     """
41     log().section("Find in placex table")
42     t = conn.t.placex
43     sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
44                     t.c.class_, t.c.type, t.c.admin_level,
45                     t.c.address, t.c.extratags,
46                     t.c.housenumber, t.c.postcode, t.c.country_code,
47                     t.c.importance, t.c.wikipedia, t.c.indexed_date,
48                     t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
49                     t.c.linked_place_id,
50                     t.c.centroid,
51                     _select_column_geometry(t.c.geometry, details.geometry_output))
52
53     if isinstance(place, ntyp.PlaceID):
54         sql = sql.where(t.c.place_id == place.place_id)
55     elif isinstance(place, ntyp.OsmID):
56         sql = sql.where(t.c.osm_type == place.osm_type)\
57                  .where(t.c.osm_id == place.osm_id)
58         if place.osm_class:
59             sql = sql.where(t.c.class_ == place.osm_class)
60         else:
61             sql = sql.order_by(t.c.class_)
62         sql = sql.limit(1)
63     else:
64         return None
65
66     return (await conn.execute(sql)).one_or_none()
67
68
69 async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
70                           details: ntyp.LookupDetails) -> Optional[SaRow]:
71     """ Search for the given place in the osmline table and return the
72         base information.
73     """
74     log().section("Find in interpolation table")
75     t = conn.t.osmline
76     sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
77                     t.c.indexed_date, t.c.startnumber, t.c.endnumber,
78                     t.c.step, t.c.address, t.c.postcode, t.c.country_code,
79                     t.c.linegeo.ST_Centroid().label('centroid'),
80                     _select_column_geometry(t.c.linegeo, details.geometry_output))
81
82     if isinstance(place, ntyp.PlaceID):
83         sql = sql.where(t.c.place_id == place.place_id)
84     elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
85         # There may be multiple interpolations for a single way.
86         # If 'class' contains a number, return the one that belongs to that number.
87         sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
88         if place.osm_class and place.osm_class.isdigit():
89             sql = sql.order_by(sa.func.greatest(0,
90                                     sa.func.least(int(place.osm_class) - t.c.endnumber),
91                                            t.c.startnumber - int(place.osm_class)))
92     else:
93         return None
94
95     return (await conn.execute(sql)).one_or_none()
96
97
98 async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
99                         details: ntyp.LookupDetails) -> Optional[SaRow]:
100     """ Search for the given place in the table of Tiger addresses and return
101         the base information. Only lookup by place ID is supported.
102     """
103     log().section("Find in TIGER table")
104     t = conn.t.tiger
105     parent = conn.t.placex
106     sql = sa.select(t.c.place_id, t.c.parent_place_id,
107                     parent.c.osm_type, parent.c.osm_id,
108                     t.c.startnumber, t.c.endnumber, t.c.step,
109                     t.c.postcode,
110                     t.c.linegeo.ST_Centroid().label('centroid'),
111                     _select_column_geometry(t.c.linegeo, details.geometry_output))
112
113     if isinstance(place, ntyp.PlaceID):
114         sql = sql.where(t.c.place_id == place.place_id)\
115                  .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
116     else:
117         return None
118
119     return (await conn.execute(sql)).one_or_none()
120
121
122 async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
123                            details: ntyp.LookupDetails) -> Optional[SaRow]:
124     """ Search for the given place in the postcode table and return the
125         base information. Only lookup by place ID is supported.
126     """
127     log().section("Find in postcode table")
128     t = conn.t.postcode
129     sql = sa.select(t.c.place_id, t.c.parent_place_id,
130                     t.c.rank_search, t.c.rank_address,
131                     t.c.indexed_date, t.c.postcode, t.c.country_code,
132                     t.c.geometry.label('centroid'),
133                     _select_column_geometry(t.c.geometry, details.geometry_output))
134
135     if isinstance(place, ntyp.PlaceID):
136         sql = sql.where(t.c.place_id == place.place_id)
137     else:
138         return None
139
140     return (await conn.execute(sql)).one_or_none()
141
142
143 async def get_place_by_id(conn: SearchConnection, place: ntyp.PlaceRef,
144                           details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
145     """ Retrieve a place with additional details from the database.
146     """
147     log().function('get_place_by_id', place=place, details=details)
148
149     if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
150         raise ValueError("lookup only supports geojosn polygon output.")
151
152     row = await find_in_placex(conn, place, details)
153     log().var_dump('Result (placex)', row)
154     if row is not None:
155         result = nres.create_from_placex_row(row, nres.DetailedResult)
156     else:
157         row = await find_in_osmline(conn, place, details)
158         log().var_dump('Result (osmline)', row)
159         if row is not None:
160             result = nres.create_from_osmline_row(row, nres.DetailedResult)
161         else:
162             row = await find_in_postcode(conn, place, details)
163             log().var_dump('Result (postcode)', row)
164             if row is not None:
165                 result = nres.create_from_postcode_row(row, nres.DetailedResult)
166             else:
167                 row = await find_in_tiger(conn, place, details)
168                 log().var_dump('Result (tiger)', row)
169                 if row is not None:
170                     result = nres.create_from_tiger_row(row, nres.DetailedResult)
171                 else:
172                     return None
173
174     # add missing details
175     assert result is not None
176     result.parent_place_id = row.parent_place_id
177     result.linked_place_id = getattr(row, 'linked_place_id', None)
178     result.admin_level = getattr(row, 'admin_level', 15)
179     indexed_date = getattr(row, 'indexed_date', None)
180     if indexed_date is not None:
181         result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
182
183     await nres.add_result_details(conn, result, details)
184
185     return result