]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/v1/helpers.py
fix various failing BDD tests
[nominatim.git] / nominatim / api / v1 / helpers.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Helper function for parsing parameters and and outputting data
9 specifically for the v1 version of the API.
10 """
11 from typing import Tuple, Optional, Any, Dict, Iterable
12 from itertools import chain
13 import re
14
15 from nominatim.api.results import SearchResult, SearchResults, SourceTable
16 from nominatim.api.types import SearchDetails, GeometryFormat
17
18 REVERSE_MAX_RANKS = [2, 2, 2,   # 0-2   Continent/Sea
19                      4, 4,      # 3-4   Country
20                      8,         # 5     State
21                      10, 10,    # 6-7   Region
22                      12, 12,    # 8-9   County
23                      16, 17,    # 10-11 City
24                      18,        # 12    Town
25                      19,        # 13    Village/Suburb
26                      22,        # 14    Hamlet/Neighbourhood
27                      25,        # 15    Localities
28                      26,        # 16    Major Streets
29                      27,        # 17    Minor Streets
30                      30         # 18    Building
31                     ]
32
33
34 def zoom_to_rank(zoom: int) -> int:
35     """ Convert a zoom parameter into a rank according to the v1 API spec.
36     """
37     return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
38
39
40 FEATURE_TYPE_TO_RANK: Dict[Optional[str], Any] = {
41     'country': (4, 4),
42     'state': (8, 8),
43     'city': (14, 16),
44     'settlement': (8, 20)
45 }
46
47
48 def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
49     """ Convert a feature type parameter to a tuple of
50         feature type name, minimum rank and maximum rank.
51     """
52     return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
53
54
55 #pylint: disable=too-many-arguments
56 def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
57                        feature_type: Optional[str],
58                        namedetails: bool, extratags: bool,
59                        excluded: Iterable[str]) -> None:
60     """ Add parameters from details dictionary to the query parts
61         dictionary which is suitable as URL parameter dictionary.
62     """
63     parsed = SearchDetails.from_kwargs(details)
64     if parsed.geometry_output != GeometryFormat.NONE:
65         for flag in parsed.geometry_output:
66             assert flag.name
67             queryparts[f'polygon_{flag.name.lower()}'] = '1'
68     if parsed.address_details:
69         queryparts['addressdetails'] = '1'
70     if namedetails:
71         queryparts['namedetails'] = '1'
72     if extratags:
73         queryparts['extratags'] = '1'
74     if parsed.geometry_simplification > 0.0:
75         queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
76     if parsed.max_results != 10:
77         queryparts['limit'] = str(parsed.max_results)
78     if parsed.countries:
79         queryparts['countrycodes'] = ','.join(parsed.countries)
80     queryparts['exclude_place_ids'] = \
81         ','.join(chain(excluded, map(str, parsed.excluded)))
82     if parsed.viewbox:
83         queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
84     if parsed.bounded_viewbox:
85         queryparts['bounded'] = '1'
86     if not details['dedupe']:
87         queryparts['dedupe'] = '0'
88     if feature_type in FEATURE_TYPE_TO_RANK:
89         queryparts['featureType'] = feature_type
90
91
92 def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
93     """ Remove results that look like duplicates.
94
95         Two results are considered the same if they have the same OSM ID
96         or if they have the same category, display name and rank.
97     """
98     osm_ids_done = set()
99     classification_done = set()
100     deduped = SearchResults()
101     for result in results:
102         if result.source_table == SourceTable.POSTCODE:
103             assert result.names and 'ref' in result.names
104             if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
105                 continue
106         classification = (result.osm_object[0] if result.osm_object else None,
107                           result.category,
108                           result.display_name,
109                           result.rank_address)
110         if result.osm_object not in osm_ids_done \
111            and classification not in classification_done:
112             deduped.append(result)
113         osm_ids_done.add(result.osm_object)
114         classification_done.add(classification)
115         if len(deduped) >= max_results:
116             break
117
118     return deduped
119
120
121 def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
122     return result.source_table == SourceTable.PLACEX \
123            and result.osm_object is not None \
124            and result.osm_object[0] == 'R' \
125            and result.category == ('boundary', 'postal_code') \
126            and result.names is not None \
127            and result.names.get('ref') == postcode
128
129
130 def _deg(axis:str) -> str:
131     return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
132
133 def _deg_min(axis: str) -> str:
134     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)?[′']*"
135
136 def _deg_min_sec(axis: str) -> str:
137     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)?[\"″]*"
138
139 COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
140     r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
141     _deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
142     r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
143     _deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
144     r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
145     _deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
146     r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
147 )]
148
149 def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
150     """ Look for something that is formated like a coordinate at the
151         beginning or end of the query. If found, extract the coordinate and
152         return the remaining query (or the empty string if the query
153         consisted of nothing but a coordinate).
154
155         Only the first match will be returned.
156     """
157     for regex in COORD_REGEX:
158         match = regex.fullmatch(query)
159         if match is None:
160             continue
161         groups = match.groupdict()
162         if not groups['pre'] or not groups['post']:
163             x = float(groups['lon_deg']) \
164                 + float(groups.get('lon_min', 0.0)) / 60.0 \
165                 + float(groups.get('lon_sec', 0.0)) / 3600.0
166             if groups.get('ew') == 'W':
167                 x = -x
168             y = float(groups['lat_deg']) \
169                 + float(groups.get('lat_min', 0.0)) / 60.0 \
170                 + float(groups.get('lat_sec', 0.0)) / 3600.0
171             if groups.get('ns') == 'S':
172                 y = -y
173             return groups['pre'] or groups['post'] or '', x, y
174
175     return query, None, None
176
177
178 CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
179
180 def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
181     """ Extract a hidden category specification of the form '[key=value]' from
182         the query. If found, extract key and value  and
183         return the remaining query (or the empty string if the query
184         consisted of nothing but a category).
185
186         Only the first match will be returned.
187     """
188     match = CATEGORY_REGEX.search(query)
189     if match is not None:
190         return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
191                match.group('cls'), match.group('typ')
192
193     return query, None, None