]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/api/v1/helpers.py
Merge pull request #3413 from osm-search/mtmail-patch-1
[nominatim.git] / nominatim / api / v1 / helpers.py
index c92592dec8b25364cb361e4673cebc234e51260c..ffd06a6a35e3439d4b47ae92a4bcd4d321506521 100644 (file)
@@ -8,8 +8,12 @@
 Helper function for parsing parameters and and outputting data
 specifically for the v1 version of the API.
 """
 Helper function for parsing parameters and and outputting data
 specifically for the v1 version of the API.
 """
+from typing import Tuple, Optional, Any, Dict, Iterable
+from itertools import chain
+import re
 
 from nominatim.api.results import SearchResult, SearchResults, SourceTable
 
 from nominatim.api.results import SearchResult, SearchResults, SourceTable
+from nominatim.api.types import SearchDetails, GeometryFormat
 
 REVERSE_MAX_RANKS = [2, 2, 2,   # 0-2   Continent/Sea
                      4, 4,      # 3-4   Country
 
 REVERSE_MAX_RANKS = [2, 2, 2,   # 0-2   Continent/Sea
                      4, 4,      # 3-4   Country
@@ -33,6 +37,63 @@ def zoom_to_rank(zoom: int) -> int:
     return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
 
 
     return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
 
 
+FEATURE_TYPE_TO_RANK: Dict[Optional[str], Tuple[int, int]] = {
+    'country': (4, 4),
+    'state': (8, 8),
+    'city': (14, 16),
+    'settlement': (8, 20)
+}
+
+
+def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
+    """ Convert a feature type parameter to a tuple of
+        feature type name, minimum rank and maximum rank.
+    """
+    return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
+
+
+#pylint: disable=too-many-arguments,too-many-branches
+def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
+                       feature_type: Optional[str],
+                       namedetails: bool, extratags: bool,
+                       excluded: Iterable[str]) -> None:
+    """ Add parameters from details dictionary to the query parts
+        dictionary which is suitable as URL parameter dictionary.
+    """
+    parsed = SearchDetails.from_kwargs(details)
+    if parsed.geometry_output != GeometryFormat.NONE:
+        if GeometryFormat.GEOJSON in parsed.geometry_output:
+            queryparts['polygon_geojson'] = '1'
+        if GeometryFormat.KML in parsed.geometry_output:
+            queryparts['polygon_kml'] = '1'
+        if GeometryFormat.SVG in parsed.geometry_output:
+            queryparts['polygon_svg'] = '1'
+        if GeometryFormat.TEXT in parsed.geometry_output:
+            queryparts['polygon_text'] = '1'
+    if parsed.address_details:
+        queryparts['addressdetails'] = '1'
+    if namedetails:
+        queryparts['namedetails'] = '1'
+    if extratags:
+        queryparts['extratags'] = '1'
+    if parsed.geometry_simplification > 0.0:
+        queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
+    if parsed.max_results != 10:
+        queryparts['limit'] = str(parsed.max_results)
+    if parsed.countries:
+        queryparts['countrycodes'] = ','.join(parsed.countries)
+    queryparts['exclude_place_ids'] = \
+        ','.join(chain(excluded, map(str, (e for e in parsed.excluded if e > 0))))
+    if parsed.viewbox:
+        queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
+    if parsed.bounded_viewbox:
+        queryparts['bounded'] = '1'
+    if not details['dedupe']:
+        queryparts['dedupe'] = '0'
+    if feature_type in FEATURE_TYPE_TO_RANK:
+        queryparts['featureType'] = feature_type
+
+
 def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
     """ Remove results that look like duplicates.
 
 def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
     """ Remove results that look like duplicates.
 
@@ -47,15 +108,18 @@ def deduplicate_results(results: SearchResults, max_results: int) -> SearchResul
             assert result.names and 'ref' in result.names
             if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
                 continue
             assert result.names and 'ref' in result.names
             if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
                 continue
-        classification = (result.osm_object[0] if result.osm_object else None,
-                          result.category,
-                          result.display_name,
-                          result.rank_address)
-        if result.osm_object not in osm_ids_done \
-           and classification not in classification_done:
+        if result.source_table == SourceTable.PLACEX:
+            classification = (result.osm_object[0] if result.osm_object else None,
+                              result.category,
+                              result.display_name,
+                              result.rank_address)
+            if result.osm_object not in osm_ids_done \
+               and classification not in classification_done:
+                deduped.append(result)
+            osm_ids_done.add(result.osm_object)
+            classification_done.add(classification)
+        else:
             deduped.append(result)
             deduped.append(result)
-        osm_ids_done.add(result.osm_object)
-        classification_done.add(classification)
         if len(deduped) >= max_results:
             break
 
         if len(deduped) >= max_results:
             break
 
@@ -69,3 +133,69 @@ def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
            and result.category == ('boundary', 'postal_code') \
            and result.names is not None \
            and result.names.get('ref') == postcode
            and result.category == ('boundary', 'postal_code') \
            and result.names is not None \
            and result.names.get('ref') == postcode
+
+
+def _deg(axis:str) -> str:
+    return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
+
+def _deg_min(axis: str) -> str:
+    return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)[′']*"
+
+def _deg_min_sec(axis: str) -> str:
+    return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)[\"″]*"
+
+COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
+    r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
+    _deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
+    r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
+    _deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
+    r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
+    _deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
+    r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
+)]
+
+def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
+    """ Look for something that is formatted like a coordinate at the
+        beginning or end of the query. If found, extract the coordinate and
+        return the remaining query (or the empty string if the query
+        consisted of nothing but a coordinate).
+
+        Only the first match will be returned.
+    """
+    for regex in COORD_REGEX:
+        match = regex.fullmatch(query)
+        if match is None:
+            continue
+        groups = match.groupdict()
+        if not groups['pre'] or not groups['post']:
+            x = float(groups['lon_deg']) \
+                + float(groups.get('lon_min', 0.0)) / 60.0 \
+                + float(groups.get('lon_sec', 0.0)) / 3600.0
+            if groups.get('ew') == 'W':
+                x = -x
+            y = float(groups['lat_deg']) \
+                + float(groups.get('lat_min', 0.0)) / 60.0 \
+                + float(groups.get('lat_sec', 0.0)) / 3600.0
+            if groups.get('ns') == 'S':
+                y = -y
+            return groups['pre'] or groups['post'] or '', x, y
+
+    return query, None, None
+
+
+CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
+
+def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
+    """ Extract a hidden category specification of the form '[key=value]' from
+        the query. If found, extract key and value  and
+        return the remaining query (or the empty string if the query
+        consisted of nothing but a category).
+
+        Only the first match will be returned.
+    """
+    match = CATEGORY_REGEX.search(query)
+    if match is not None:
+        return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
+               match.group('cls'), match.group('typ')
+
+    return query, None, None