]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/results.py
Adds sanitizer for preventing certain tags to enter search index based on parameters
[nominatim.git] / nominatim / api / results.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Dataclasses for search results and helper functions to fill them.
9
10 Data classes are part of the public API while the functions are for
11 internal use only. That's why they are implemented as free-standing functions
12 instead of member functions.
13 """
14 from typing import Optional, Tuple, Dict, Sequence
15 import enum
16 import dataclasses
17 import datetime as dt
18
19 import sqlalchemy as sa
20
21 from nominatim.typing import SaSelect, SaRow
22 from nominatim.api.types import Point, LookupDetails
23 from nominatim.api.connection import SearchConnection
24 from nominatim.api.logging import log
25
26 # This file defines complex result data classes.
27 # pylint: disable=too-many-instance-attributes
28
class SourceTable(enum.Enum):
    """ Enumeration of kinds of results.

        The value is stored in SearchResult.source_table and tells which
        of the create_from_*_row() helpers produced the result.
    """
    # Set by create_from_placex_row() for rows of the placex table.
    PLACEX = 1
    # Set by create_from_osmline_row() for rows of the osmline table.
    OSMLINE = 2
    # Set by create_from_tiger_row() for rows of the Tiger table.
    TIGER = 3
    # Set by create_from_postcode_row() for rows of the postcode
    # centroid table.
    POSTCODE = 4
    # NOTE(review): presumably results taken from a country table; none of
    # the create_from_*_row() helpers next to this class sets it - confirm.
    COUNTRY = 5
37
38
@dataclasses.dataclass
class AddressLine:
    """ Detailed information about a related place.

        Instances are created by _result_row_to_address_row() from a
        single database row. The order of the fields defines the
        positional order of the generated constructor.
    """
    # Database id of the related place, if it has one.
    place_id: Optional[int]
    # Tuple of (osm_type, osm_id) or None when the row has no osm_type.
    osm_object: Optional[Tuple[str, int]]
    # Tuple of the row's (class, type) columns.
    category: Tuple[str, str]
    # Names of the place. A 'housenumber' entry is added when the source
    # row carries a house number.
    names: Dict[str, str]
    # Additional tags. A 'place_type' entry may be added from the source row.
    extratags: Optional[Dict[str, str]]

    admin_level: Optional[int]
    # NOTE(review): for placex rows this is computed as "geometry is a
    # polygon or multipolygon"; other sources deliver it as-is.
    fromarea: bool
    # Falls back to True when the source row has no 'isaddress' column.
    isaddress: bool
    rank_address: int
    # Taken unchanged from the row's 'distance' column.
    distance: float


# Type used for the lists of related places attached to a SearchResult.
AddressLines = Sequence[AddressLine]
57
58
@dataclasses.dataclass
class WordInfo:
    """ Detailed information about a search term.

        complete_keywords() builds instances positionally from rows
        selecting (word_id, word_token, word), so the field order must
        stay in sync with that column order.
    """
    # Value of the 'word_id' column of the word table.
    word_id: int
    # Value of the 'word_token' column of the word table.
    word_token: str
    # Value of the 'word' column of the word table, may be missing.
    word: Optional[str] = None


# Type used for the keyword lists attached to a SearchResult.
WordInfos = Sequence[WordInfo]
69
70
@dataclasses.dataclass
class SearchResult:
    """ Data class collecting all available information about a search result.

        Only the source table, the category and the centroid are
        mandatory. Everything else is optional and either filled from
        the originating database row or completed later on request
        (address_rows, linked_rows, parented_rows and the keyword lists).
    """
    source_table: SourceTable
    category: Tuple[str, str]
    centroid: Point

    place_id: Optional[int] = None
    parent_place_id: Optional[int] = None
    linked_place_id: Optional[int] = None
    osm_object: Optional[Tuple[str, int]] = None
    admin_level: int = 15

    names: Optional[Dict[str, str]] = None
    address: Optional[Dict[str, str]] = None
    extratags: Optional[Dict[str, str]] = None

    housenumber: Optional[str] = None
    postcode: Optional[str] = None
    wikipedia: Optional[str] = None

    rank_address: int = 30
    rank_search: int = 30
    importance: Optional[float] = None

    country_code: Optional[str] = None

    indexed_date: Optional[dt.datetime] = None

    address_rows: Optional[AddressLines] = None
    linked_rows: Optional[AddressLines] = None
    parented_rows: Optional[AddressLines] = None
    name_keywords: Optional[WordInfos] = None
    address_keywords: Optional[WordInfos] = None

    geometry: Dict[str, str] = dataclasses.field(default_factory=dict)

    def __post_init__(self) -> None:
        # A naive indexing timestamp is tagged as UTC. Timestamps that
        # already carry a timezone are left alone.
        stamp = self.indexed_date
        if stamp is None or stamp.tzinfo is not None:
            return
        self.indexed_date = stamp.replace(tzinfo=dt.timezone.utc)

    @property
    def lat(self) -> float:
        """ Latitude (the y coordinate) of the centroid of the place.
        """
        return self.centroid[1]

    @property
    def lon(self) -> float:
        """ Longitude (the x coordinate) of the centroid of the place.
        """
        return self.centroid[0]

    def calculated_importance(self) -> float:
        """ Return an importance value that is always usable.

            The stored importance is returned when it is set and
            non-zero. Otherwise an artificial value is derived from
            the search rank of the place.
        """
        if self.importance:
            return self.importance

        return 0.7500001 - (self.rank_search/40.0)
133
134
135 def _filter_geometries(row: SaRow) -> Dict[str, str]:
136     return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212
137             if k.startswith('geometry_')}
138
139
def create_from_placex_row(row: SaRow) -> SearchResult:
    """ Construct a new SearchResult and add the data from the result row
        from the placex table.

        The row must provide the placex columns read below, with the
        'class' column available under the name 'class_' and the
        centroid as a WKB element. The column 'indexed_date' is
        optional: when the query did not select it, the result carries
        no indexing date. Columns named 'geometry_*' end up in the
        geometry dictionary of the result.
    """
    return SearchResult(source_table=SourceTable.PLACEX,
                        place_id=row.place_id,
                        parent_place_id=row.parent_place_id,
                        linked_place_id=row.linked_place_id,
                        osm_object=(row.osm_type, row.osm_id),
                        category=(row.class_, row.type),
                        admin_level=row.admin_level,
                        names=row.name,
                        address=row.address,
                        extratags=row.extratags,
                        housenumber=row.housenumber,
                        postcode=row.postcode,
                        wikipedia=row.wikipedia,
                        rank_address=row.rank_address,
                        rank_search=row.rank_search,
                        importance=row.importance,
                        country_code=row.country_code,
                        # Without an explicit default, getattr() raises for
                        # queries that leave out the column.
                        indexed_date=getattr(row, 'indexed_date', None),
                        centroid=Point.from_wkb(row.centroid.data),
                        geometry=_filter_geometries(row))
164
165
def create_from_osmline_row(row: SaRow) -> SearchResult:
    """ Construct a new SearchResult and add the data from the result row
        from the osmline table.

        The result always gets the category ('place', 'houses') and an
        OSM object of type 'W'. The start number, end number and step of
        the row are saved as strings in the extratags. The column
        'indexed_date' is optional: when the query did not select it,
        the result carries no indexing date.
    """
    return SearchResult(source_table=SourceTable.OSMLINE,
                        place_id=row.place_id,
                        parent_place_id=row.parent_place_id,
                        osm_object=('W', row.osm_id),
                        category=('place', 'houses'),
                        address=row.address,
                        postcode=row.postcode,
                        extratags={'startnumber': str(row.startnumber),
                                   'endnumber': str(row.endnumber),
                                   'step': str(row.step)},
                        country_code=row.country_code,
                        # Without an explicit default, getattr() raises for
                        # queries that leave out the column.
                        indexed_date=getattr(row, 'indexed_date', None),
                        centroid=Point.from_wkb(row.centroid.data),
                        geometry=_filter_geometries(row))
184
185
def create_from_tiger_row(row: SaRow) -> SearchResult:
    """ Build a SearchResult from a result row of the Tiger table.

        The result always gets the category ('place', 'houses') and the
        country code 'us'. The start number, end number and step of the
        row are saved as strings in the extratags.
    """
    place_id = row.place_id
    parent_place_id = row.parent_place_id
    postcode = row.postcode
    interpolation = {'startnumber': str(row.startnumber),
                     'endnumber': str(row.endnumber),
                     'step': str(row.step)}
    centroid = Point.from_wkb(row.centroid.data)
    geometries = _filter_geometries(row)

    return SearchResult(source_table=SourceTable.TIGER,
                        category=('place', 'houses'),
                        centroid=centroid,
                        place_id=place_id,
                        parent_place_id=parent_place_id,
                        postcode=postcode,
                        extratags=interpolation,
                        country_code='us',
                        geometry=geometries)
201
202
def create_from_postcode_row(row: SaRow) -> SearchResult:
    """ Build a SearchResult from a result row of the postcode centroid
        table.

        The result always gets the category ('place', 'postcode'). The
        postcode itself is saved as the 'ref' name of the result.
    """
    place_id = row.place_id
    parent_place_id = row.parent_place_id
    names = {'ref': row.postcode}
    rank_search = row.rank_search
    rank_address = row.rank_address
    country_code = row.country_code
    centroid = Point.from_wkb(row.centroid.data)
    indexed_date = row.indexed_date
    geometries = _filter_geometries(row)

    return SearchResult(source_table=SourceTable.POSTCODE,
                        category=('place', 'postcode'),
                        centroid=centroid,
                        place_id=place_id,
                        parent_place_id=parent_place_id,
                        names=names,
                        rank_address=rank_address,
                        rank_search=rank_search,
                        country_code=country_code,
                        indexed_date=indexed_date,
                        geometry=geometries)
218
219
async def add_result_details(conn: SearchConnection, result: SearchResult,
                             details: LookupDetails) -> None:
    """ Complete the result with additional information from the database.

        'details' selects which kinds of information are looked up.
        The lookups run one after another in a fixed order: address
        details, linked places, parented places, keywords.
    """
    # Name of the flag in 'details', log message, function doing the lookup.
    steps = (
        ('address_details', 'Query address details', complete_address_details),
        ('linked_places', 'Query linked places', complete_linked_places),
        ('parented_places', 'Query parent places', complete_parented_places),
        ('keywords', 'Query keywords', complete_keywords),
    )

    log().section('Query details for result')
    for flag, message, complete in steps:
        # Read each flag only when its turn has come.
        if getattr(details, flag):
            log().comment(message)
            await complete(conn, result)
238
239
def _result_row_to_address_row(row: SaRow) -> AddressLine:
    """ Create a new AddressLine from the results of a database query.

        The row must provide the columns place_id, osm_type, osm_id,
        class, type, name, admin_level, fromarea, rank_address and
        distance. The columns extratags, place_type, housenumber and
        isaddress are optional. A non-empty place_type is saved in the
        extratags and a house number is saved with the names. The
        dictionaries of the row itself are never modified.
    """
    extratags: Optional[Dict[str, str]] = getattr(row, 'extratags', {})
    # Look the column up by attribute. The 'in' operator on a result row
    # compares against the values of the row, not against its column names.
    place_type = getattr(row, 'place_type', None)
    if place_type:
        # The copy also covers an extratags column that is NULL.
        extratags = dict(extratags or {})
        extratags['place_type'] = place_type

    names = row.name
    housenumber = getattr(row, 'housenumber', None)
    if housenumber is not None:
        names = dict(names or {})
        names['housenumber'] = housenumber

    return AddressLine(place_id=row.place_id,
                       osm_object=None if row.osm_type is None else (row.osm_type, row.osm_id),
                       category=(getattr(row, 'class'), row.type),
                       names=names,
                       extratags=extratags,
                       admin_level=row.admin_level,
                       fromarea=row.fromarea,
                       isaddress=getattr(row, 'isaddress', True),
                       rank_address=row.rank_address,
                       distance=row.distance)
263
264
async def complete_address_details(conn: SearchConnection, result: SearchResult) -> None:
    """ Look up the places that make up the address of the result and
        save them in result.address_rows.

        The rows come from the database function get_addressdata() and
        are sorted by descending address rank, then by descending
        isaddress flag.
    """
    is_interpolation = result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE)
    if is_interpolation and result.housenumber is not None:
        housenumber = int(result.housenumber)
    elif is_interpolation and result.extratags is not None \
         and 'startnumber' in result.extratags:
        # details requests do not come with a specific house number
        housenumber = int(result.extratags['startnumber'])
    else:
        # Nothing to derive a house number from.
        housenumber = -1

    columns = (sa.column('place_id', type_=sa.Integer),
               'osm_type',
               sa.column('osm_id', type_=sa.BigInteger),
               sa.column('name', type_=conn.t.types.Composite),
               'class', 'type', 'place_type',
               sa.column('admin_level', type_=sa.Integer),
               sa.column('fromarea', type_=sa.Boolean),
               sa.column('isaddress', type_=sa.Boolean),
               sa.column('rank_address', type_=sa.SmallInteger),
               sa.column('distance', type_=sa.Float))
    address_data = sa.func.get_addressdata(result.place_id, housenumber)\
                     .table_valued(*columns) # type: ignore[no-untyped-call]
    query = sa.select(address_data).order_by(sa.column('rank_address').desc(),
                                             sa.column('isaddress').desc())

    result.address_rows = []
    rows = await conn.execute(query)
    result.address_rows.extend(_result_row_to_address_row(row) for row in rows)
294
295 # pylint: disable=consider-using-f-string
def _placex_select_address_row(conn: SearchConnection,
                               centroid: Point) -> SaSelect:
    """ Create the basic SELECT over the placex table which returns
        the columns that _result_row_to_address_row() reads.

        The statement has no WHERE clause, callers add their own
        restrictions. 'centroid' is the point against which the
        'distance' column of each row is computed.
    """
    t = conn.t.placex
    return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
                     t.c.class_.label('class'), t.c.type,
                     t.c.admin_level, t.c.housenumber,
                     # 'fromarea' is true for polygon and multipolygon geometries.
                     sa.literal_column("""ST_GeometryType(geometry) in
                                        ('ST_Polygon','ST_MultiPolygon')""").label('fromarea'),
                     t.c.rank_address,
                     # The centroid is formatted directly into the SQL text.
                     # The two %f placeholders need exactly two values, so
                     # 'centroid' has to be a two-element tuple.
                     # NOTE(review): presumably in (x, y) order - confirm
                     # against the definition of Point. %f writes six
                     # decimal places.
                     sa.literal_column(
                         """ST_DistanceSpheroid(geometry, 'SRID=4326;POINT(%f %f)'::geometry,
                              'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]')
                         """ % centroid).label('distance'))
309
310
async def complete_linked_places(conn: SearchConnection, result: SearchResult) -> None:
    """ Look up the places that link to the result and save them in
        result.linked_rows.

        Only results from the placex table are looked up. For all other
        results the list stays empty.
    """
    result.linked_rows = []
    if result.source_table == SourceTable.PLACEX:
        base_query = _placex_select_address_row(conn, result.centroid)
        query = base_query.where(conn.t.placex.c.linked_place_id == result.place_id)

        rows = await conn.execute(query)
        result.linked_rows.extend(_result_row_to_address_row(row) for row in rows)
323
324
async def complete_keywords(conn: SearchConnection, result: SearchResult) -> None:
    """ Look up the search terms saved for the place and save them in
        result.name_keywords and result.address_keywords.

        The token ids are read from the name_vector and
        nameaddress_vector columns of the search_name table and then
        resolved against the word table.
    """
    search_name = conn.t.search_name
    vector_query = sa.select(search_name.c.name_vector,
                             search_name.c.nameaddress_vector)\
                     .where(search_name.c.place_id == result.place_id)

    result.name_keywords = []
    result.address_keywords = []
    for name_tokens, address_tokens in await conn.execute(vector_query):
        word = conn.t.word
        word_query = sa.select(word.c.word_id, word.c.word_token, word.c.word)

        # Name tokens are resolved first, address tokens second.
        for tokens, keywords in ((name_tokens, result.name_keywords),
                                 (address_tokens, result.address_keywords)):
            matches = await conn.execute(word_query.where(word.c.word_id == sa.any_(tokens)))
            for row in matches:
                keywords.append(WordInfo(*row))
343
344
async def complete_parented_places(conn: SearchConnection, result: SearchResult) -> None:
    """ Look up the places for which the result provides the address
        and save them in result.parented_rows.

        Only results from the placex table are looked up. For all other
        results the list stays empty. Only children with a search rank
        of 30 are taken into account.
    """
    result.parented_rows = []
    if result.source_table == SourceTable.PLACEX:
        base_query = _placex_select_address_row(conn, result.centroid)
        query = base_query.where(conn.t.placex.c.parent_place_id == result.place_id)\
                          .where(conn.t.placex.c.rank_search == 30)

        rows = await conn.execute(query)
        result.parented_rows.extend(_result_row_to_address_row(row) for row in rows)
356
357     for row in await conn.execute(sql):
358         result.parented_rows.append(_result_row_to_address_row(row))