]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/v1/helpers.py
move server route creation into async function
[nominatim.git] / src / nominatim_api / v1 / helpers.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Helper function for parsing parameters and and outputting data
9 specifically for the v1 version of the API.
10 """
11 from typing import Tuple, Optional, Any, Dict, Iterable
12 from itertools import chain
13 import re
14
15 from ..results import SearchResult, SearchResults, SourceTable
16 from ..types import SearchDetails, GeometryFormat
17
18
19 REVERSE_MAX_RANKS = [2, 2, 2,   # 0-2   Continent/Sea
20                      4, 4,      # 3-4   Country
21                      8,         # 5     State
22                      10, 10,    # 6-7   Region
23                      12, 12,    # 8-9   County
24                      16, 17,    # 10-11 City
25                      18,        # 12    Town
26                      19,        # 13    Village/Suburb
27                      22,        # 14    Hamlet/Neighbourhood
28                      25,        # 15    Localities
29                      26,        # 16    Major Streets
30                      27,        # 17    Minor Streets
31                      30         # 18    Building
32                      ]
33
34
35 def zoom_to_rank(zoom: int) -> int:
36     """ Convert a zoom parameter into a rank according to the v1 API spec.
37     """
38     return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
39
40
41 FEATURE_TYPE_TO_RANK: Dict[Optional[str], Tuple[int, int]] = {
42     'country': (4, 4),
43     'state': (8, 8),
44     'city': (14, 16),
45     'settlement': (8, 20)
46 }
47
48
49 def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
50     """ Convert a feature type parameter to a tuple of
51         feature type name, minimum rank and maximum rank.
52     """
53     return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
54
55
56 def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
57                        feature_type: Optional[str],
58                        namedetails: bool, extratags: bool,
59                        excluded: Iterable[str]) -> None:
60     """ Add parameters from details dictionary to the query parts
61         dictionary which is suitable as URL parameter dictionary.
62     """
63     parsed = SearchDetails.from_kwargs(details)
64     if parsed.geometry_output != GeometryFormat.NONE:
65         if GeometryFormat.GEOJSON & parsed.geometry_output:
66             queryparts['polygon_geojson'] = '1'
67         if GeometryFormat.KML & parsed.geometry_output:
68             queryparts['polygon_kml'] = '1'
69         if GeometryFormat.SVG & parsed.geometry_output:
70             queryparts['polygon_svg'] = '1'
71         if GeometryFormat.TEXT & parsed.geometry_output:
72             queryparts['polygon_text'] = '1'
73     if parsed.address_details:
74         queryparts['addressdetails'] = '1'
75     if namedetails:
76         queryparts['namedetails'] = '1'
77     if extratags:
78         queryparts['extratags'] = '1'
79     if parsed.geometry_simplification > 0.0:
80         queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
81     if parsed.max_results != 10:
82         queryparts['limit'] = str(parsed.max_results)
83     if parsed.countries:
84         queryparts['countrycodes'] = ','.join(parsed.countries)
85     queryparts['exclude_place_ids'] = \
86         ','.join(chain(excluded, map(str, (e for e in parsed.excluded if e > 0))))
87     if parsed.viewbox:
88         queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
89     if parsed.bounded_viewbox:
90         queryparts['bounded'] = '1'
91     if not details['dedupe']:
92         queryparts['dedupe'] = '0'
93     if feature_type in FEATURE_TYPE_TO_RANK:
94         queryparts['featureType'] = feature_type
95
96
97 def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
98     """ Remove results that look like duplicates.
99
100         Two results are considered the same if they have the same OSM ID
101         or if they have the same category, display name and rank.
102     """
103     osm_ids_done = set()
104     classification_done = set()
105     deduped = SearchResults()
106     for result in results:
107         if result.source_table == SourceTable.POSTCODE:
108             assert result.names and 'ref' in result.names
109             if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
110                 continue
111         if result.source_table == SourceTable.PLACEX:
112             classification = (result.osm_object[0] if result.osm_object else None,
113                               result.category,
114                               result.display_name,
115                               result.rank_address)
116             if result.osm_object not in osm_ids_done \
117                and classification not in classification_done:
118                 deduped.append(result)
119             osm_ids_done.add(result.osm_object)
120             classification_done.add(classification)
121         else:
122             deduped.append(result)
123         if len(deduped) >= max_results:
124             break
125
126     return deduped
127
128
129 def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
130     return result.source_table == SourceTable.PLACEX \
131            and result.osm_object is not None \
132            and result.osm_object[0] == 'R' \
133            and result.category == ('boundary', 'postal_code') \
134            and result.names is not None \
135            and result.names.get('ref') == postcode
136
137
138 def _deg(axis: str) -> str:
139     return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
140
141
142 def _deg_min(axis: str) -> str:
143     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)[′']*"
144
145
146 def _deg_min_sec(axis: str) -> str:
147     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)[\"″]*"
148
149
150 COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
151     r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
152     _deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
153     r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
154     _deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
155     r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
156     _deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
157     r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
158 )]
159
160
161 def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
162     """ Look for something that is formatted like a coordinate at the
163         beginning or end of the query. If found, extract the coordinate and
164         return the remaining query (or the empty string if the query
165         consisted of nothing but a coordinate).
166
167         Only the first match will be returned.
168     """
169     for regex in COORD_REGEX:
170         match = regex.fullmatch(query)
171         if match is None:
172             continue
173         groups = match.groupdict()
174         if not groups['pre'] or not groups['post']:
175             x = float(groups['lon_deg']) \
176                 + float(groups.get('lon_min', 0.0)) / 60.0 \
177                 + float(groups.get('lon_sec', 0.0)) / 3600.0
178             if groups.get('ew') == 'W':
179                 x = -x
180             y = float(groups['lat_deg']) \
181                 + float(groups.get('lat_min', 0.0)) / 60.0 \
182                 + float(groups.get('lat_sec', 0.0)) / 3600.0
183             if groups.get('ns') == 'S':
184                 y = -y
185             return groups['pre'] or groups['post'] or '', x, y
186
187     return query, None, None
188
189
190 CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
191
192
193 def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
194     """ Extract a hidden category specification of the form '[key=value]' from
195         the query. If found, extract key and value  and
196         return the remaining query (or the empty string if the query
197         consisted of nothing but a category).
198
199         Only the first match will be returned.
200     """
201     match = CATEGORY_REGEX.search(query)
202     if match is not None:
203         return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
204                match.group('cls'), match.group('typ')
205
206     return query, None, None