X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/d2c56f9f96bce00d9d9309ddf230f825916eccae..1b50381852850807a688270312952d74a309f09c:/nominatim/api/v1/helpers.py diff --git a/nominatim/api/v1/helpers.py b/nominatim/api/v1/helpers.py index e16a22a3..62e5e943 100644 --- a/nominatim/api/v1/helpers.py +++ b/nominatim/api/v1/helpers.py @@ -8,6 +8,12 @@ Helper function for parsing parameters and and outputting data specifically for the v1 version of the API. """ +from typing import Tuple, Optional, Any, Dict, Iterable +from itertools import chain +import re + +from nominatim.api.results import SearchResult, SearchResults, SourceTable +from nominatim.api.types import SearchDetails, GeometryFormat REVERSE_MAX_RANKS = [2, 2, 2, # 0-2 Continent/Sea 4, 4, # 3-4 Country @@ -29,3 +35,164 @@ def zoom_to_rank(zoom: int) -> int: """ Convert a zoom parameter into a rank according to the v1 API spec. """ return REVERSE_MAX_RANKS[max(0, min(18, zoom))] + + +FEATURE_TYPE_TO_RANK: Dict[Optional[str], Any] = { + 'country': (4, 4), + 'state': (8, 8), + 'city': (14, 16), + 'settlement': (8, 20) +} + + +def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]: + """ Convert a feature type parameter to a tuple of + feature type name, minimum rank and maximum rank. + """ + return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30)) + + +#pylint: disable=too-many-arguments,too-many-branches +def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any], + feature_type: Optional[str], + namedetails: bool, extratags: bool, + excluded: Iterable[str]) -> None: + """ Add parameters from details dictionary to the query parts + dictionary which is suitable as URL parameter dictionary. + """ + parsed = SearchDetails.from_kwargs(details) + if parsed.geometry_output != GeometryFormat.NONE: + if parsed.geometry_output & GeometryFormat.GEOJSON: + queryparts['polygon_geojson'] = '1' + if parsed.geometry_output & GeometryFormat.KML: + queryparts['polygon_kml'] = '1' + if parsed.geometry_output & GeometryFormat.SVG: + queryparts['polygon_svg'] = '1' + if parsed.geometry_output & GeometryFormat.TEXT: + queryparts['polygon_text'] = '1' + if parsed.address_details: + queryparts['addressdetails'] = '1' + if namedetails: + queryparts['namedetails'] = '1' + if extratags: + queryparts['extratags'] = '1' + if parsed.geometry_simplification > 0.0: + queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}" + if parsed.max_results != 10: + queryparts['limit'] = str(parsed.max_results) + if parsed.countries: + queryparts['countrycodes'] = ','.join(parsed.countries) + queryparts['exclude_place_ids'] = \ + ','.join(chain(excluded, map(str, parsed.excluded))) + if parsed.viewbox: + queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords) + if parsed.bounded_viewbox: + queryparts['bounded'] = '1' + if not details['dedupe']: + queryparts['dedupe'] = '0' + if feature_type in FEATURE_TYPE_TO_RANK: + queryparts['featureType'] = feature_type + + +def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults: + """ Remove results that look like duplicates. + + Two results are considered the same if they have the same OSM ID + or if they have the same category, display name and rank. + """ + osm_ids_done = set() + classification_done = set() + deduped = SearchResults() + for result in results: + if result.source_table == SourceTable.POSTCODE: + assert result.names and 'ref' in result.names + if any(_is_postcode_relation_for(r, result.names['ref']) for r in results): + continue + classification = (result.osm_object[0] if result.osm_object else None, + result.category, + result.display_name, + result.rank_address) + if result.osm_object not in osm_ids_done \ + and classification not in classification_done: + deduped.append(result) + osm_ids_done.add(result.osm_object) + classification_done.add(classification) + if len(deduped) >= max_results: + break + + return deduped + + +def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool: + return result.source_table == SourceTable.PLACEX \ + and result.osm_object is not None \ + and result.osm_object[0] == 'R' \ + and result.category == ('boundary', 'postal_code') \ + and result.names is not None \ + and result.names.get('ref') == postcode + + +def _deg(axis:str) -> str: + return f"(?P<{axis}_deg>\\d+\\.\\d+)°?" + +def _deg_min(axis: str) -> str: + return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)?[′']*" + +def _deg_min_sec(axis: str) -> str: + return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)?[\"″]*" + +COORD_REGEX = [re.compile(r'(?:(?P
.*?)\s+)??' + r + r'(?:\s+(?P.*))?') for r in (
+    r"(?P[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P[EW])\s*" + _deg('lon'),
+    _deg('lat') + r"\s*(?P[NS])[\s,]+" + _deg('lon') + r"\s*(?P[EW])",
+    r"(?P[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P[EW])\s*" + _deg_min('lon'),
+    _deg_min('lat') + r"\s*(?P[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P[EW])",
+    r"(?P[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P[EW])\s*" + _deg_min_sec('lon'),
+    _deg_min_sec('lat') + r"\s*(?P[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P[EW])",
+    r"\[?(?P[+-]?\d+\.\d+)[\s,]+(?P[+-]?\d+\.\d+)\]?"
+)]
+
+def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
+    """ Look for something that is formated like a coordinate at the
+        beginning or end of the query. If found, extract the coordinate and
+        return the remaining query (or the empty string if the query
+        consisted of nothing but a coordinate).
+
+        Only the first match will be returned.
+    """
+    for regex in COORD_REGEX:
+        match = regex.fullmatch(query)
+        if match is None:
+            continue
+        groups = match.groupdict()
+        if not groups['pre'] or not groups['post']:
+            x = float(groups['lon_deg']) \
+                + float(groups.get('lon_min', 0.0)) / 60.0 \
+                + float(groups.get('lon_sec', 0.0)) / 3600.0
+            if groups.get('ew') == 'W':
+                x = -x
+            y = float(groups['lat_deg']) \
+                + float(groups.get('lat_min', 0.0)) / 60.0 \
+                + float(groups.get('lat_sec', 0.0)) / 3600.0
+            if groups.get('ns') == 'S':
+                y = -y
+            return groups['pre'] or groups['post'] or '', x, y
+
+    return query, None, None
+
+
+CATEGORY_REGEX = re.compile(r'(?P
.*?)\[(?P[a-zA-Z_]+)=(?P[a-zA-Z_]+)\](?P.*)')
+
+def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
+    """ Extract a hidden category specification of the form '[key=value]' from
+        the query. If found, extract key and value  and
+        return the remaining query (or the empty string if the query
+        consisted of nothing but a category).
+
+        Only the first match will be returned.
+    """
+    match = CATEGORY_REGEX.search(query)
+    if match is not None:
+        return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
+               match.group('cls'), match.group('typ')
+
+    return query, None, None