]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
9d19b75dde393e1fe5dc4f43544101945ffab213
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17
18 import sqlalchemy as sa
19
20 from nominatim.errors import UsageError
21
22 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
23
24 @dataclasses.dataclass
25 class PlaceID:
26     """ Reference an object by Nominatim's internal ID.
27     """
28     place_id: int
29
30
31 @dataclasses.dataclass
32 class OsmID:
33     """ Reference by the OSM ID and potentially the basic category.
34     """
35     osm_type: str
36     osm_id: int
37     osm_class: Optional[str] = None
38
39     def __post_init__(self) -> None:
40         if self.osm_type not in ('N', 'W', 'R'):
41             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
42
43
44 PlaceRef = Union[PlaceID, OsmID]
45
46
47 class Point(NamedTuple):
48     """ A geographic point in WGS84 projection.
49     """
50     x: float
51     y: float
52
53
54     @property
55     def lat(self) -> float:
56         """ Return the latitude of the point.
57         """
58         return self.y
59
60
61     @property
62     def lon(self) -> float:
63         """ Return the longitude of the point.
64         """
65         return self.x
66
67
68     def to_geojson(self) -> str:
69         """ Return the point in GeoJSON format.
70         """
71         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
72
73
74     @staticmethod
75     def from_wkb(wkb: bytes) -> 'Point':
76         """ Create a point from EWKB as returned from the database.
77         """
78         if len(wkb) != 25:
79             raise ValueError("Point wkb has unexpected length")
80         if wkb[0] == 0:
81             gtype, srid, x, y = unpack('>iidd', wkb[1:])
82         elif wkb[0] == 1:
83             gtype, srid, x, y = unpack('<iidd', wkb[1:])
84         else:
85             raise ValueError("WKB has unknown endian value.")
86
87         if gtype != 0x20000001:
88             raise ValueError("WKB must be a point geometry.")
89         if srid != 4326:
90             raise ValueError("Only WGS84 WKB supported.")
91
92         return Point(x, y)
93
94
95     @staticmethod
96     def from_param(inp: Any) -> 'Point':
97         """ Create a point from an input parameter. The parameter
98             may be given as a point, a string or a sequence of
99             strings or floats. Raises a UsageError if the format is
100             not correct.
101         """
102         if isinstance(inp, Point):
103             return inp
104
105         seq: Sequence[str]
106         if isinstance(inp, str):
107             seq = inp.split(',')
108         elif isinstance(inp, abc.Sequence):
109             seq = inp
110
111         if len(seq) != 2:
112             raise UsageError('Point parameter needs 2 coordinates.')
113         try:
114             x, y = filter(math.isfinite, map(float, seq))
115         except ValueError as exc:
116             raise UsageError('Point parameter needs to be numbers.') from exc
117
118         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
119             raise UsageError('Point coordinates invalid.')
120
121         return Point(x, y)
122
123
124     def sql_value(self) -> str:
125         """ Create an SQL expression for the point.
126         """
127         return f'POINT({self.x} {self.y})'
128
129
130
131 AnyPoint = Union[Point, Tuple[float, float]]
132
133 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
134 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
135
136 class Bbox:
137     """ A bounding box in WSG84 projection.
138
139         The coordinates are available as an array in the 'coord'
140         property in the order (minx, miny, maxx, maxy).
141     """
142     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
143         self.coords = (minx, miny, maxx, maxy)
144
145
146     @property
147     def minlat(self) -> float:
148         """ Southern-most latitude, corresponding to the minimum y coordinate.
149         """
150         return self.coords[1]
151
152
153     @property
154     def maxlat(self) -> float:
155         """ Northern-most latitude, corresponding to the maximum y coordinate.
156         """
157         return self.coords[3]
158
159
160     @property
161     def minlon(self) -> float:
162         """ Western-most longitude, corresponding to the minimum x coordinate.
163         """
164         return self.coords[0]
165
166
167     @property
168     def maxlon(self) -> float:
169         """ Eastern-most longitude, corresponding to the maximum x coordinate.
170         """
171         return self.coords[2]
172
173
174     @property
175     def area(self) -> float:
176         """ Return the area of the box in WGS84.
177         """
178         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
179
180
181     def sql_value(self) -> Any:
182         """ Create an SQL expression for the box.
183         """
184         return sa.func.ST_MakeEnvelope(*self.coords, 4326)
185
186
187     def contains(self, pt: Point) -> bool:
188         """ Check if the point is inside or on the boundary of the box.
189         """
190         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
191                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
192
193
194     @staticmethod
195     def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
196         """ Create a Bbox from a bounding box polygon as returned by
197             the database. Return s None if the input value is None.
198         """
199         if wkb is None:
200             return None
201
202         if len(wkb) != 97:
203             raise ValueError("WKB must be a bounding box polygon")
204         if wkb.startswith(WKB_BBOX_HEADER_LE):
205             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
206         elif wkb.startswith(WKB_BBOX_HEADER_BE):
207             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
208         else:
209             raise ValueError("WKB has wrong header")
210
211         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
212
213
214     @staticmethod
215     def from_point(pt: Point, buffer: float) -> 'Bbox':
216         """ Return a Bbox around the point with the buffer added to all sides.
217         """
218         return Bbox(pt[0] - buffer, pt[1] - buffer,
219                     pt[0] + buffer, pt[1] + buffer)
220
221
222     @staticmethod
223     def from_param(inp: Any) -> 'Bbox':
224         """ Return a Bbox from an input parameter. The box may be
225             given as a Bbox, a string or a list or strings or integer.
226             Raises a UsageError if the format is incorrect.
227         """
228         if isinstance(inp, Bbox):
229             return inp
230
231         seq: Sequence[str]
232         if isinstance(inp, str):
233             seq = inp.split(',')
234         elif isinstance(inp, abc.Sequence):
235             seq = inp
236
237         if len(seq) != 4:
238             raise UsageError('Bounding box parameter needs 4 coordinates.')
239         try:
240             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
241         except ValueError as exc:
242             raise UsageError('Bounding box parameter needs to be numbers.') from exc
243
244         if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
245            or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
246             raise UsageError('Bounding box coordinates invalid.')
247
248         if x1 == x2 or y1 == y2:
249             raise UsageError('Bounding box with invalid parameters.')
250
251         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
252
253
254 class GeometryFormat(enum.Flag):
255     """ Geometry output formats supported by Nominatim.
256     """
257     NONE = 0
258     GEOJSON = enum.auto()
259     KML = enum.auto()
260     SVG = enum.auto()
261     TEXT = enum.auto()
262
263
264 class DataLayer(enum.Flag):
265     """ Layer types that can be selected for reverse and forward search.
266     """
267     POI = enum.auto()
268     ADDRESS = enum.auto()
269     RAILWAY = enum.auto()
270     MANMADE = enum.auto()
271     NATURAL = enum.auto()
272
273
274 def format_country(cc: Any) -> List[str]:
275     """ Extract a list of country codes from the input which may be either
276         a string or list of strings. Filters out all values that are not
277         a two-letter string.
278     """
279     clist: Sequence[str]
280     if isinstance(cc, str):
281         clist = cc.split(',')
282     elif isinstance(cc, abc.Sequence):
283         clist = cc
284     else:
285         raise UsageError("Parameter 'country' needs to be a comma-separated list "
286                          "or a Python list of strings.")
287
288     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
289
290
291 def format_excluded(ids: Any) -> List[int]:
292     """ Extract a list of place ids from the input which may be either
293         a string or a list of strings or ints. Ignores empty value but
294         throws a UserError on anything that cannot be converted to int.
295     """
296     plist: Sequence[str]
297     if isinstance(ids, str):
298         plist = [s.strip() for s in ids.split(',')]
299     elif isinstance(ids, abc.Sequence):
300         plist = ids
301     else:
302         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
303                          "or a Python list of numbers.")
304     if not all(isinstance(i, int) or
305                (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
306         raise UsageError("Parameter 'excluded' only takes place IDs.")
307
308     return [int(id) for id in plist if id] or [0]
309
310
311 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
312     """ Extract a list of categories. Currently a noop.
313     """
314     return categories
315
316 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
317
318 @dataclasses.dataclass
319 class LookupDetails:
320     """ Collection of parameters that define the amount of details
321         returned with a lookup or details result.
322     """
323     geometry_output: GeometryFormat = GeometryFormat.NONE
324     """ Add the full geometry of the place to the result. Multiple
325         formats may be selected. Note that geometries can become quite large.
326     """
327     address_details: bool = False
328     """ Get detailed information on the places that make up the address
329         for the result.
330     """
331     linked_places: bool = False
332     """ Get detailed information on the places that link to the result.
333     """
334     parented_places: bool = False
335     """ Get detailed information on all places that this place is a parent
336         for, i.e. all places for which it provides the address details.
337         Only POI places can have parents.
338     """
339     keywords: bool = False
340     """ Add information about the search terms used for this place.
341     """
342     geometry_simplification: float = 0.0
343     """ Simplification factor for a geometry in degrees WGS. A factor of
344         0.0 means the original geometry is kept. The higher the value, the
345         more the geometry gets simplified.
346     """
347
348     @classmethod
349     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
350         """ Load the data fields of the class from a dictionary.
351             Unknown entries in the dictionary are ignored, missing ones
352             get the default setting.
353
354             The function supports type checking and throws a UsageError
355             when the value does not fit.
356         """
357         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
358             if v is None:
359                 return field.default_factory() \
360                        if field.default_factory != dataclasses.MISSING \
361                        else field.default
362             if field.metadata and 'transform' in field.metadata:
363                 return field.metadata['transform'](v)
364             if not isinstance(v, field.type):
365                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
366             return v
367
368         return cls(**{f.name: _check_field(kwargs[f.name], f)
369                       for f in dataclasses.fields(cls) if f.name in kwargs})
370
371
372 @dataclasses.dataclass
373 class ReverseDetails(LookupDetails):
374     """ Collection of parameters for the reverse call.
375     """
376     max_rank: int = dataclasses.field(default=30,
377                                       metadata={'transform': lambda v: max(0, min(v, 30))}
378                                      )
379     """ Highest address rank to return.
380     """
381     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
382     """ Filter which kind of data to include.
383     """
384
385 @dataclasses.dataclass
386 class SearchDetails(LookupDetails):
387     """ Collection of parameters for the search call.
388     """
389     max_results: int = 10
390     """ Maximum number of results to be returned. The actual number of results
391         may be less.
392     """
393     min_rank: int = dataclasses.field(default=0,
394                                       metadata={'transform': lambda v: max(0, min(v, 30))}
395                                      )
396     """ Lowest address rank to return.
397     """
398     max_rank: int = dataclasses.field(default=30,
399                                       metadata={'transform': lambda v: max(0, min(v, 30))}
400                                      )
401     """ Highest address rank to return.
402     """
403     layers: Optional[DataLayer] = dataclasses.field(default=None,
404                                                     metadata={'transform': lambda r : r})
405     """ Filter which kind of data to include. When 'None' (the default) then
406         filtering by layers is disabled.
407     """
408     countries: List[str] = dataclasses.field(default_factory=list,
409                                              metadata={'transform': format_country})
410     """ Restrict search results to the given countries. An empty list (the
411         default) will disable this filter.
412     """
413     excluded: List[int] = dataclasses.field(default_factory=list,
414                                             metadata={'transform': format_excluded})
415     """ List of OSM objects to exclude from the results. Currenlty only
416         works when the internal place ID is given.
417         An empty list (the default) will disable this filter.
418     """
419     viewbox: Optional[Bbox] = dataclasses.field(default=None,
420                                                 metadata={'transform': Bbox.from_param})
421     """ Focus the search on a given map area.
422     """
423     bounded_viewbox: bool = False
424     """ Use 'viewbox' as a filter and restrict results to places within the
425         given area.
426     """
427     near: Optional[Point] = dataclasses.field(default=None,
428                                               metadata={'transform': Point.from_param})
429     """ Order results by distance to the given point.
430     """
431     near_radius: Optional[float] = dataclasses.field(default=None,
432                                               metadata={'transform': lambda r : r})
433     """ Use near point as a filter and drop results outside the given
434         radius. Radius is given in degrees WSG84.
435     """
436     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
437                                                           metadata={'transform': format_categories})
438     """ Restrict search to places with one of the given class/type categories.
439         An empty list (the default) will disable this filter.
440     """
441
442     def __post_init__(self) -> None:
443         if self.viewbox is not None:
444             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
445             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
446             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
447                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
448
449
450     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
451         """ Change the min_rank and max_rank fields to respect the
452             given boundaries.
453         """
454         assert new_min <= new_max
455         self.min_rank = max(self.min_rank, new_min)
456         self.max_rank = min(self.max_rank, new_max)
457
458
459     def is_impossible(self) -> bool:
460         """ Check if the parameter configuration is contradictionary and
461             cannot yield any results.
462         """
463         return (self.min_rank > self.max_rank
464                 or (self.bounded_viewbox
465                     and self.viewbox is not None and self.near is not None
466                     and self.viewbox.contains(self.near))
467                 or self.layers is not None and not self.layers)
468
469
470     def layer_enabled(self, layer: DataLayer) -> bool:
471         """ Check if the given layer has been choosen. Also returns
472             true when layer restriction has been disabled completely.
473         """
474         return self.layers is None or bool(self.layers & layer)