]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
implement search builder
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17
18 from nominatim.errors import UsageError
19
20 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
21
22 @dataclasses.dataclass
23 class PlaceID:
24     """ Reference an object by Nominatim's internal ID.
25     """
26     place_id: int
27
28
29 @dataclasses.dataclass
30 class OsmID:
31     """ Reference by the OSM ID and potentially the basic category.
32     """
33     osm_type: str
34     osm_id: int
35     osm_class: Optional[str] = None
36
37     def __post_init__(self) -> None:
38         if self.osm_type not in ('N', 'W', 'R'):
39             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
40
41
42 PlaceRef = Union[PlaceID, OsmID]
43
44
45 class Point(NamedTuple):
46     """ A geographic point in WGS84 projection.
47     """
48     x: float
49     y: float
50
51
52     @property
53     def lat(self) -> float:
54         """ Return the latitude of the point.
55         """
56         return self.y
57
58
59     @property
60     def lon(self) -> float:
61         """ Return the longitude of the point.
62         """
63         return self.x
64
65
66     def to_geojson(self) -> str:
67         """ Return the point in GeoJSON format.
68         """
69         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
70
71
72     @staticmethod
73     def from_wkb(wkb: bytes) -> 'Point':
74         """ Create a point from EWKB as returned from the database.
75         """
76         if len(wkb) != 25:
77             raise ValueError("Point wkb has unexpected length")
78         if wkb[0] == 0:
79             gtype, srid, x, y = unpack('>iidd', wkb[1:])
80         elif wkb[0] == 1:
81             gtype, srid, x, y = unpack('<iidd', wkb[1:])
82         else:
83             raise ValueError("WKB has unknown endian value.")
84
85         if gtype != 0x20000001:
86             raise ValueError("WKB must be a point geometry.")
87         if srid != 4326:
88             raise ValueError("Only WGS84 WKB supported.")
89
90         return Point(x, y)
91
92
93     @staticmethod
94     def from_param(inp: Any) -> 'Point':
95         """ Create a point from an input parameter. The parameter
96             may be given as a point, a string or a sequence of
97             strings or floats. Raises a UsageError if the format is
98             not correct.
99         """
100         if isinstance(inp, Point):
101             return inp
102
103         seq: Sequence[str]
104         if isinstance(inp, str):
105             seq = inp.split(',')
106         elif isinstance(inp, abc.Sequence):
107             seq = inp
108
109         if len(seq) != 2:
110             raise UsageError('Point parameter needs 2 coordinates.')
111         try:
112             x, y = filter(math.isfinite, map(float, seq))
113         except ValueError as exc:
114             raise UsageError('Point parameter needs to be numbers.') from exc
115
116         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
117             raise UsageError('Point coordinates invalid.')
118
119         return Point(x, y)
120
121
122
123 AnyPoint = Union[Point, Tuple[float, float]]
124
125 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
126 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
127
128 class Bbox:
129     """ A bounding box in WSG84 projection.
130
131         The coordinates are available as an array in the 'coord'
132         property in the order (minx, miny, maxx, maxy).
133     """
134     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
135         self.coords = (minx, miny, maxx, maxy)
136
137
138     @property
139     def minlat(self) -> float:
140         """ Southern-most latitude, corresponding to the minimum y coordinate.
141         """
142         return self.coords[1]
143
144
145     @property
146     def maxlat(self) -> float:
147         """ Northern-most latitude, corresponding to the maximum y coordinate.
148         """
149         return self.coords[3]
150
151
152     @property
153     def minlon(self) -> float:
154         """ Western-most longitude, corresponding to the minimum x coordinate.
155         """
156         return self.coords[0]
157
158
159     @property
160     def maxlon(self) -> float:
161         """ Eastern-most longitude, corresponding to the maximum x coordinate.
162         """
163         return self.coords[2]
164
165
166     def contains(self, pt: Point) -> bool:
167         """ Check if the point is inside or on the boundary of the box.
168         """
169         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
170                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
171
172     @staticmethod
173     def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
174         """ Create a Bbox from a bounding box polygon as returned by
175             the database. Return s None if the input value is None.
176         """
177         if wkb is None:
178             return None
179
180         if len(wkb) != 97:
181             raise ValueError("WKB must be a bounding box polygon")
182         if wkb.startswith(WKB_BBOX_HEADER_LE):
183             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
184         elif wkb.startswith(WKB_BBOX_HEADER_BE):
185             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
186         else:
187             raise ValueError("WKB has wrong header")
188
189         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
190
191
192     @staticmethod
193     def from_point(pt: Point, buffer: float) -> 'Bbox':
194         """ Return a Bbox around the point with the buffer added to all sides.
195         """
196         return Bbox(pt[0] - buffer, pt[1] - buffer,
197                     pt[0] + buffer, pt[1] + buffer)
198
199
200     @staticmethod
201     def from_param(inp: Any) -> 'Bbox':
202         """ Return a Bbox from an input parameter. The box may be
203             given as a Bbox, a string or a list or strings or integer.
204             Raises a UsageError if the format is incorrect.
205         """
206         if isinstance(inp, Bbox):
207             return inp
208
209         seq: Sequence[str]
210         if isinstance(inp, str):
211             seq = inp.split(',')
212         elif isinstance(inp, abc.Sequence):
213             seq = inp
214
215         if len(seq) != 4:
216             raise UsageError('Bounding box parameter needs 4 coordinates.')
217         try:
218             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
219         except ValueError as exc:
220             raise UsageError('Bounding box parameter needs to be numbers.') from exc
221
222         if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
223            or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
224             raise UsageError('Bounding box coordinates invalid.')
225
226         if x1 == x2 or y1 == y2:
227             raise UsageError('Bounding box with invalid parameters.')
228
229         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
230
231
232 class GeometryFormat(enum.Flag):
233     """ Geometry output formats supported by Nominatim.
234     """
235     NONE = 0
236     GEOJSON = enum.auto()
237     KML = enum.auto()
238     SVG = enum.auto()
239     TEXT = enum.auto()
240
241
242 class DataLayer(enum.Flag):
243     """ Layer types that can be selected for reverse and forward search.
244     """
245     POI = enum.auto()
246     ADDRESS = enum.auto()
247     RAILWAY = enum.auto()
248     MANMADE = enum.auto()
249     NATURAL = enum.auto()
250
251
252 def format_country(cc: Any) -> List[str]:
253     """ Extract a list of country codes from the input which may be either
254         a string or list of strings. Filters out all values that are not
255         a two-letter string.
256     """
257     clist: Sequence[str]
258     if isinstance(cc, str):
259         clist = cc.split(',')
260     elif isinstance(cc, abc.Sequence):
261         clist = cc
262     else:
263         raise UsageError("Parameter 'country' needs to be a comma-separated list "
264                          "or a Python list of strings.")
265
266     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
267
268
269 def format_excluded(ids: Any) -> List[int]:
270     """ Extract a list of place ids from the input which may be either
271         a string or a list of strings or ints. Ignores empty value but
272         throws a UserError on anything that cannot be converted to int.
273     """
274     plist: Sequence[str]
275     if isinstance(ids, str):
276         plist = ids.split(',')
277     elif isinstance(ids, abc.Sequence):
278         plist = ids
279     else:
280         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
281                          "or a Python list of numbers.")
282     if any(not isinstance(i, int) or (isinstance(i, str) and not i.isdigit()) for i in plist):
283         raise UsageError("Parameter 'excluded' only takes place IDs.")
284
285     return [int(id) for id in plist if id]
286
287
288 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
289     """ Extract a list of categories. Currently a noop.
290     """
291     return categories
292
293 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
294
295 @dataclasses.dataclass
296 class LookupDetails:
297     """ Collection of parameters that define the amount of details
298         returned with a lookup or details result.
299     """
300     geometry_output: GeometryFormat = GeometryFormat.NONE
301     """ Add the full geometry of the place to the result. Multiple
302         formats may be selected. Note that geometries can become quite large.
303     """
304     address_details: bool = False
305     """ Get detailed information on the places that make up the address
306         for the result.
307     """
308     linked_places: bool = False
309     """ Get detailed information on the places that link to the result.
310     """
311     parented_places: bool = False
312     """ Get detailed information on all places that this place is a parent
313         for, i.e. all places for which it provides the address details.
314         Only POI places can have parents.
315     """
316     keywords: bool = False
317     """ Add information about the search terms used for this place.
318     """
319     geometry_simplification: float = 0.0
320     """ Simplification factor for a geometry in degrees WGS. A factor of
321         0.0 means the original geometry is kept. The higher the value, the
322         more the geometry gets simplified.
323     """
324
325     @classmethod
326     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
327         """ Load the data fields of the class from a dictionary.
328             Unknown entries in the dictionary are ignored, missing ones
329             get the default setting.
330
331             The function supports type checking and throws a UsageError
332             when the value does not fit.
333         """
334         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
335             if v is None:
336                 return field.default_factory() \
337                        if field.default_factory != dataclasses.MISSING \
338                        else field.default
339             if field.metadata and 'transform' in field.metadata:
340                 return field.metadata['transform'](v)
341             if not isinstance(v, field.type):
342                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
343             return v
344
345         return cls(**{f.name: _check_field(kwargs[f.name], f)
346                       for f in dataclasses.fields(cls) if f.name in kwargs})
347
348
349 @dataclasses.dataclass
350 class ReverseDetails(LookupDetails):
351     """ Collection of parameters for the reverse call.
352     """
353     max_rank: int = dataclasses.field(default=30,
354                                       metadata={'transform': lambda v: max(0, min(v, 30))}
355                                      )
356     """ Highest address rank to return.
357     """
358     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
359     """ Filter which kind of data to include.
360     """
361
362 @dataclasses.dataclass
363 class SearchDetails(LookupDetails):
364     """ Collection of parameters for the search call.
365     """
366     max_results: int = 10
367     """ Maximum number of results to be returned. The actual number of results
368         may be less.
369     """
370     min_rank: int = dataclasses.field(default=0,
371                                       metadata={'transform': lambda v: max(0, min(v, 30))}
372                                      )
373     """ Lowest address rank to return.
374     """
375     max_rank: int = dataclasses.field(default=30,
376                                       metadata={'transform': lambda v: max(0, min(v, 30))}
377                                      )
378     """ Highest address rank to return.
379     """
380     layers: Optional[DataLayer] = None
381     """ Filter which kind of data to include. When 'None' (the default) then
382         filtering by layers is disabled.
383     """
384     countries: List[str] = dataclasses.field(default_factory=list,
385                                              metadata={'transform': format_country})
386     """ Restrict search results to the given countries. An empty list (the
387         default) will disable this filter.
388     """
389     excluded: List[int] = dataclasses.field(default_factory=list,
390                                             metadata={'transform': format_excluded})
391     """ List of OSM objects to exclude from the results. Currenlty only
392         works when the internal place ID is given.
393         An empty list (the default) will disable this filter.
394     """
395     viewbox: Optional[Bbox] = dataclasses.field(default=None,
396                                                 metadata={'transform': Bbox.from_param})
397     """ Focus the search on a given map area.
398     """
399     bounded_viewbox: bool = False
400     """ Use 'viewbox' as a filter and restrict results to places within the
401         given area.
402     """
403     near: Optional[Point] = dataclasses.field(default=None,
404                                               metadata={'transform': Point.from_param})
405     """ Order results by distance to the given point.
406     """
407     near_radius: Optional[float] = None
408     """ Use near point as a filter and drop results outside the given
409         radius. Radius is given in degrees WSG84.
410     """
411     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
412                                                           metadata={'transform': format_categories})
413     """ Restrict search to places with one of the given class/type categories.
414         An empty list (the default) will disable this filter.
415     """
416
417     def __post_init__(self) -> None:
418         if self.viewbox is not None:
419             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
420             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
421             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.maxlon - yext,
422                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
423
424
425     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
426         """ Change the min_rank and max_rank fields to respect the
427             given boundaries.
428         """
429         assert new_min <= new_max
430         self.min_rank = max(self.min_rank, new_min)
431         self.max_rank = min(self.max_rank, new_max)
432
433
434     def is_impossible(self) -> bool:
435         """ Check if the parameter configuration is contradictionary and
436             cannot yield any results.
437         """
438         return (self.min_rank > self.max_rank
439                 or (self.bounded_viewbox
440                     and self.viewbox is not None and self.near is not None
441                     and self.viewbox.contains(self.near))
442                 or self.layers is not None and not self.layers)
443
444
445     def layer_enabled(self, layer: DataLayer) -> bool:
446         """ Check if the given layer has been choosen. Also returns
447             true when layer restriction has been disabled completely.
448         """
449         return self.layers is None or bool(self.layers & layer)