]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/v1/helpers.py
check-database on frozen db shouldnt recommend indexing
[nominatim.git] / nominatim / api / v1 / helpers.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Helper function for parsing parameters and and outputting data
9 specifically for the v1 version of the API.
10 """
11 from typing import Tuple, Optional, Any, Dict, Iterable
12 from itertools import chain
13 import re
14
15 from nominatim.api.results import SearchResult, SearchResults, SourceTable
16 from nominatim.api.types import SearchDetails, GeometryFormat
17
18 REVERSE_MAX_RANKS = [2, 2, 2,   # 0-2   Continent/Sea
19                      4, 4,      # 3-4   Country
20                      8,         # 5     State
21                      10, 10,    # 6-7   Region
22                      12, 12,    # 8-9   County
23                      16, 17,    # 10-11 City
24                      18,        # 12    Town
25                      19,        # 13    Village/Suburb
26                      22,        # 14    Hamlet/Neighbourhood
27                      25,        # 15    Localities
28                      26,        # 16    Major Streets
29                      27,        # 17    Minor Streets
30                      30         # 18    Building
31                     ]
32
33
34 def zoom_to_rank(zoom: int) -> int:
35     """ Convert a zoom parameter into a rank according to the v1 API spec.
36     """
37     return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
38
39
40 FEATURE_TYPE_TO_RANK: Dict[Optional[str], Any] = {
41     'country': (4, 4),
42     'state': (8, 8),
43     'city': (14, 16),
44     'settlement': (8, 20)
45 }
46
47
48 def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
49     """ Convert a feature type parameter to a tuple of
50         feature type name, minimum rank and maximum rank.
51     """
52     return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
53
54
55 #pylint: disable=too-many-arguments,too-many-branches
56 def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
57                        feature_type: Optional[str],
58                        namedetails: bool, extratags: bool,
59                        excluded: Iterable[str]) -> None:
60     """ Add parameters from details dictionary to the query parts
61         dictionary which is suitable as URL parameter dictionary.
62     """
63     parsed = SearchDetails.from_kwargs(details)
64     if parsed.geometry_output != GeometryFormat.NONE:
65         if parsed.geometry_output & GeometryFormat.GEOJSON:
66             queryparts['polygon_geojson'] = '1'
67         if parsed.geometry_output & GeometryFormat.KML:
68             queryparts['polygon_kml'] = '1'
69         if parsed.geometry_output & GeometryFormat.SVG:
70             queryparts['polygon_svg'] = '1'
71         if parsed.geometry_output & GeometryFormat.TEXT:
72             queryparts['polygon_text'] = '1'
73     if parsed.address_details:
74         queryparts['addressdetails'] = '1'
75     if namedetails:
76         queryparts['namedetails'] = '1'
77     if extratags:
78         queryparts['extratags'] = '1'
79     if parsed.geometry_simplification > 0.0:
80         queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
81     if parsed.max_results != 10:
82         queryparts['limit'] = str(parsed.max_results)
83     if parsed.countries:
84         queryparts['countrycodes'] = ','.join(parsed.countries)
85     queryparts['exclude_place_ids'] = \
86         ','.join(chain(excluded, map(str, parsed.excluded)))
87     if parsed.viewbox:
88         queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
89     if parsed.bounded_viewbox:
90         queryparts['bounded'] = '1'
91     if not details['dedupe']:
92         queryparts['dedupe'] = '0'
93     if feature_type in FEATURE_TYPE_TO_RANK:
94         queryparts['featureType'] = feature_type
95
96
97 def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
98     """ Remove results that look like duplicates.
99
100         Two results are considered the same if they have the same OSM ID
101         or if they have the same category, display name and rank.
102     """
103     osm_ids_done = set()
104     classification_done = set()
105     deduped = SearchResults()
106     for result in results:
107         if result.source_table == SourceTable.POSTCODE:
108             assert result.names and 'ref' in result.names
109             if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
110                 continue
111         classification = (result.osm_object[0] if result.osm_object else None,
112                           result.category,
113                           result.display_name,
114                           result.rank_address)
115         if result.osm_object not in osm_ids_done \
116            and classification not in classification_done:
117             deduped.append(result)
118         osm_ids_done.add(result.osm_object)
119         classification_done.add(classification)
120         if len(deduped) >= max_results:
121             break
122
123     return deduped
124
125
126 def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
127     return result.source_table == SourceTable.PLACEX \
128            and result.osm_object is not None \
129            and result.osm_object[0] == 'R' \
130            and result.category == ('boundary', 'postal_code') \
131            and result.names is not None \
132            and result.names.get('ref') == postcode
133
134
135 def _deg(axis:str) -> str:
136     return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
137
138 def _deg_min(axis: str) -> str:
139     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)?[′']*"
140
141 def _deg_min_sec(axis: str) -> str:
142     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)?[\"″]*"
143
144 COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
145     r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
146     _deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
147     r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
148     _deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
149     r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
150     _deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
151     r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
152 )]
153
154 def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
155     """ Look for something that is formated like a coordinate at the
156         beginning or end of the query. If found, extract the coordinate and
157         return the remaining query (or the empty string if the query
158         consisted of nothing but a coordinate).
159
160         Only the first match will be returned.
161     """
162     for regex in COORD_REGEX:
163         match = regex.fullmatch(query)
164         if match is None:
165             continue
166         groups = match.groupdict()
167         if not groups['pre'] or not groups['post']:
168             x = float(groups['lon_deg']) \
169                 + float(groups.get('lon_min', 0.0)) / 60.0 \
170                 + float(groups.get('lon_sec', 0.0)) / 3600.0
171             if groups.get('ew') == 'W':
172                 x = -x
173             y = float(groups['lat_deg']) \
174                 + float(groups.get('lat_min', 0.0)) / 60.0 \
175                 + float(groups.get('lat_sec', 0.0)) / 3600.0
176             if groups.get('ns') == 'S':
177                 y = -y
178             return groups['pre'] or groups['post'] or '', x, y
179
180     return query, None, None
181
182
183 CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
184
185 def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
186     """ Extract a hidden category specification of the form '[key=value]' from
187         the query. If found, extract key and value  and
188         return the remaining query (or the empty string if the query
189         consisted of nothing but a category).
190
191         Only the first match will be returned.
192     """
193     match = CATEGORY_REGEX.search(query)
194     if match is not None:
195         return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
196                match.group('cls'), match.group('typ')
197
198     return query, None, None