]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
implement actual database searches
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17
18 from geoalchemy2 import WKTElement
19 import geoalchemy2.functions
20
21 from nominatim.errors import UsageError
22
23 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
24
25 @dataclasses.dataclass
26 class PlaceID:
27     """ Reference an object by Nominatim's internal ID.
28     """
29     place_id: int
30
31
32 @dataclasses.dataclass
33 class OsmID:
34     """ Reference by the OSM ID and potentially the basic category.
35     """
36     osm_type: str
37     osm_id: int
38     osm_class: Optional[str] = None
39
40     def __post_init__(self) -> None:
41         if self.osm_type not in ('N', 'W', 'R'):
42             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
43
44
45 PlaceRef = Union[PlaceID, OsmID]
46
47
48 class Point(NamedTuple):
49     """ A geographic point in WGS84 projection.
50     """
51     x: float
52     y: float
53
54
55     @property
56     def lat(self) -> float:
57         """ Return the latitude of the point.
58         """
59         return self.y
60
61
62     @property
63     def lon(self) -> float:
64         """ Return the longitude of the point.
65         """
66         return self.x
67
68
69     def to_geojson(self) -> str:
70         """ Return the point in GeoJSON format.
71         """
72         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
73
74
75     @staticmethod
76     def from_wkb(wkb: bytes) -> 'Point':
77         """ Create a point from EWKB as returned from the database.
78         """
79         if len(wkb) != 25:
80             raise ValueError("Point wkb has unexpected length")
81         if wkb[0] == 0:
82             gtype, srid, x, y = unpack('>iidd', wkb[1:])
83         elif wkb[0] == 1:
84             gtype, srid, x, y = unpack('<iidd', wkb[1:])
85         else:
86             raise ValueError("WKB has unknown endian value.")
87
88         if gtype != 0x20000001:
89             raise ValueError("WKB must be a point geometry.")
90         if srid != 4326:
91             raise ValueError("Only WGS84 WKB supported.")
92
93         return Point(x, y)
94
95
96     @staticmethod
97     def from_param(inp: Any) -> 'Point':
98         """ Create a point from an input parameter. The parameter
99             may be given as a point, a string or a sequence of
100             strings or floats. Raises a UsageError if the format is
101             not correct.
102         """
103         if isinstance(inp, Point):
104             return inp
105
106         seq: Sequence[str]
107         if isinstance(inp, str):
108             seq = inp.split(',')
109         elif isinstance(inp, abc.Sequence):
110             seq = inp
111
112         if len(seq) != 2:
113             raise UsageError('Point parameter needs 2 coordinates.')
114         try:
115             x, y = filter(math.isfinite, map(float, seq))
116         except ValueError as exc:
117             raise UsageError('Point parameter needs to be numbers.') from exc
118
119         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
120             raise UsageError('Point coordinates invalid.')
121
122         return Point(x, y)
123
124
125     def sql_value(self) -> WKTElement:
126         """ Create an SQL expression for the point.
127         """
128         return WKTElement(f'POINT({self.x} {self.y})', srid=4326)
129
130
131
132 AnyPoint = Union[Point, Tuple[float, float]]
133
134 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
135 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
136
137 class Bbox:
138     """ A bounding box in WSG84 projection.
139
140         The coordinates are available as an array in the 'coord'
141         property in the order (minx, miny, maxx, maxy).
142     """
143     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
144         self.coords = (minx, miny, maxx, maxy)
145
146
147     @property
148     def minlat(self) -> float:
149         """ Southern-most latitude, corresponding to the minimum y coordinate.
150         """
151         return self.coords[1]
152
153
154     @property
155     def maxlat(self) -> float:
156         """ Northern-most latitude, corresponding to the maximum y coordinate.
157         """
158         return self.coords[3]
159
160
161     @property
162     def minlon(self) -> float:
163         """ Western-most longitude, corresponding to the minimum x coordinate.
164         """
165         return self.coords[0]
166
167
168     @property
169     def maxlon(self) -> float:
170         """ Eastern-most longitude, corresponding to the maximum x coordinate.
171         """
172         return self.coords[2]
173
174
175     @property
176     def area(self) -> float:
177         """ Return the area of the box in WGS84.
178         """
179         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
180
181
182     def sql_value(self) -> Any:
183         """ Create an SQL expression for the box.
184         """
185         return geoalchemy2.functions.ST_MakeEnvelope(*self.coords, 4326)
186
187
188     def contains(self, pt: Point) -> bool:
189         """ Check if the point is inside or on the boundary of the box.
190         """
191         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
192                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
193
194
195     @staticmethod
196     def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
197         """ Create a Bbox from a bounding box polygon as returned by
198             the database. Return s None if the input value is None.
199         """
200         if wkb is None:
201             return None
202
203         if len(wkb) != 97:
204             raise ValueError("WKB must be a bounding box polygon")
205         if wkb.startswith(WKB_BBOX_HEADER_LE):
206             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
207         elif wkb.startswith(WKB_BBOX_HEADER_BE):
208             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
209         else:
210             raise ValueError("WKB has wrong header")
211
212         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
213
214
215     @staticmethod
216     def from_point(pt: Point, buffer: float) -> 'Bbox':
217         """ Return a Bbox around the point with the buffer added to all sides.
218         """
219         return Bbox(pt[0] - buffer, pt[1] - buffer,
220                     pt[0] + buffer, pt[1] + buffer)
221
222
223     @staticmethod
224     def from_param(inp: Any) -> 'Bbox':
225         """ Return a Bbox from an input parameter. The box may be
226             given as a Bbox, a string or a list or strings or integer.
227             Raises a UsageError if the format is incorrect.
228         """
229         if isinstance(inp, Bbox):
230             return inp
231
232         seq: Sequence[str]
233         if isinstance(inp, str):
234             seq = inp.split(',')
235         elif isinstance(inp, abc.Sequence):
236             seq = inp
237
238         if len(seq) != 4:
239             raise UsageError('Bounding box parameter needs 4 coordinates.')
240         try:
241             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
242         except ValueError as exc:
243             raise UsageError('Bounding box parameter needs to be numbers.') from exc
244
245         if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
246            or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
247             raise UsageError('Bounding box coordinates invalid.')
248
249         if x1 == x2 or y1 == y2:
250             raise UsageError('Bounding box with invalid parameters.')
251
252         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
253
254
255 class GeometryFormat(enum.Flag):
256     """ Geometry output formats supported by Nominatim.
257     """
258     NONE = 0
259     GEOJSON = enum.auto()
260     KML = enum.auto()
261     SVG = enum.auto()
262     TEXT = enum.auto()
263
264
265 class DataLayer(enum.Flag):
266     """ Layer types that can be selected for reverse and forward search.
267     """
268     POI = enum.auto()
269     ADDRESS = enum.auto()
270     RAILWAY = enum.auto()
271     MANMADE = enum.auto()
272     NATURAL = enum.auto()
273
274
275 def format_country(cc: Any) -> List[str]:
276     """ Extract a list of country codes from the input which may be either
277         a string or list of strings. Filters out all values that are not
278         a two-letter string.
279     """
280     clist: Sequence[str]
281     if isinstance(cc, str):
282         clist = cc.split(',')
283     elif isinstance(cc, abc.Sequence):
284         clist = cc
285     else:
286         raise UsageError("Parameter 'country' needs to be a comma-separated list "
287                          "or a Python list of strings.")
288
289     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
290
291
292 def format_excluded(ids: Any) -> List[int]:
293     """ Extract a list of place ids from the input which may be either
294         a string or a list of strings or ints. Ignores empty value but
295         throws a UserError on anything that cannot be converted to int.
296     """
297     plist: Sequence[str]
298     if isinstance(ids, str):
299         plist = ids.split(',')
300     elif isinstance(ids, abc.Sequence):
301         plist = ids
302     else:
303         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
304                          "or a Python list of numbers.")
305     if any(not isinstance(i, int) or (isinstance(i, str) and not i.isdigit()) for i in plist):
306         raise UsageError("Parameter 'excluded' only takes place IDs.")
307
308     return [int(id) for id in plist if id]
309
310
311 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
312     """ Extract a list of categories. Currently a noop.
313     """
314     return categories
315
316 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
317
318 @dataclasses.dataclass
319 class LookupDetails:
320     """ Collection of parameters that define the amount of details
321         returned with a lookup or details result.
322     """
323     geometry_output: GeometryFormat = GeometryFormat.NONE
324     """ Add the full geometry of the place to the result. Multiple
325         formats may be selected. Note that geometries can become quite large.
326     """
327     address_details: bool = False
328     """ Get detailed information on the places that make up the address
329         for the result.
330     """
331     linked_places: bool = False
332     """ Get detailed information on the places that link to the result.
333     """
334     parented_places: bool = False
335     """ Get detailed information on all places that this place is a parent
336         for, i.e. all places for which it provides the address details.
337         Only POI places can have parents.
338     """
339     keywords: bool = False
340     """ Add information about the search terms used for this place.
341     """
342     geometry_simplification: float = 0.0
343     """ Simplification factor for a geometry in degrees WGS. A factor of
344         0.0 means the original geometry is kept. The higher the value, the
345         more the geometry gets simplified.
346     """
347
348     @classmethod
349     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
350         """ Load the data fields of the class from a dictionary.
351             Unknown entries in the dictionary are ignored, missing ones
352             get the default setting.
353
354             The function supports type checking and throws a UsageError
355             when the value does not fit.
356         """
357         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
358             if v is None:
359                 return field.default_factory() \
360                        if field.default_factory != dataclasses.MISSING \
361                        else field.default
362             if field.metadata and 'transform' in field.metadata:
363                 return field.metadata['transform'](v)
364             if not isinstance(v, field.type):
365                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
366             return v
367
368         return cls(**{f.name: _check_field(kwargs[f.name], f)
369                       for f in dataclasses.fields(cls) if f.name in kwargs})
370
371
372 @dataclasses.dataclass
373 class ReverseDetails(LookupDetails):
374     """ Collection of parameters for the reverse call.
375     """
376     max_rank: int = dataclasses.field(default=30,
377                                       metadata={'transform': lambda v: max(0, min(v, 30))}
378                                      )
379     """ Highest address rank to return.
380     """
381     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
382     """ Filter which kind of data to include.
383     """
384
385 @dataclasses.dataclass
386 class SearchDetails(LookupDetails):
387     """ Collection of parameters for the search call.
388     """
389     max_results: int = 10
390     """ Maximum number of results to be returned. The actual number of results
391         may be less.
392     """
393     min_rank: int = dataclasses.field(default=0,
394                                       metadata={'transform': lambda v: max(0, min(v, 30))}
395                                      )
396     """ Lowest address rank to return.
397     """
398     max_rank: int = dataclasses.field(default=30,
399                                       metadata={'transform': lambda v: max(0, min(v, 30))}
400                                      )
401     """ Highest address rank to return.
402     """
403     layers: Optional[DataLayer] = None
404     """ Filter which kind of data to include. When 'None' (the default) then
405         filtering by layers is disabled.
406     """
407     countries: List[str] = dataclasses.field(default_factory=list,
408                                              metadata={'transform': format_country})
409     """ Restrict search results to the given countries. An empty list (the
410         default) will disable this filter.
411     """
412     excluded: List[int] = dataclasses.field(default_factory=list,
413                                             metadata={'transform': format_excluded})
414     """ List of OSM objects to exclude from the results. Currenlty only
415         works when the internal place ID is given.
416         An empty list (the default) will disable this filter.
417     """
418     viewbox: Optional[Bbox] = dataclasses.field(default=None,
419                                                 metadata={'transform': Bbox.from_param})
420     """ Focus the search on a given map area.
421     """
422     bounded_viewbox: bool = False
423     """ Use 'viewbox' as a filter and restrict results to places within the
424         given area.
425     """
426     near: Optional[Point] = dataclasses.field(default=None,
427                                               metadata={'transform': Point.from_param})
428     """ Order results by distance to the given point.
429     """
430     near_radius: Optional[float] = None
431     """ Use near point as a filter and drop results outside the given
432         radius. Radius is given in degrees WSG84.
433     """
434     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
435                                                           metadata={'transform': format_categories})
436     """ Restrict search to places with one of the given class/type categories.
437         An empty list (the default) will disable this filter.
438     """
439
440     def __post_init__(self) -> None:
441         if self.viewbox is not None:
442             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
443             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
444             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
445                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
446
447
448     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
449         """ Change the min_rank and max_rank fields to respect the
450             given boundaries.
451         """
452         assert new_min <= new_max
453         self.min_rank = max(self.min_rank, new_min)
454         self.max_rank = min(self.max_rank, new_max)
455
456
457     def is_impossible(self) -> bool:
458         """ Check if the parameter configuration is contradictionary and
459             cannot yield any results.
460         """
461         return (self.min_rank > self.max_rank
462                 or (self.bounded_viewbox
463                     and self.viewbox is not None and self.near is not None
464                     and self.viewbox.contains(self.near))
465                 or self.layers is not None and not self.layers)
466
467
468     def layer_enabled(self, layer: DataLayer) -> bool:
469         """ Check if the given layer has been choosen. Also returns
470             true when layer restriction has been disabled completely.
471         """
472         return self.layers is None or bool(self.layers & layer)