]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/api/v1/helpers.py
split search SQL in windowed search_name lookup and constraint search
[nominatim.git] / nominatim / api / v1 / helpers.py
index e16a22a3cd44bf8a201bb0fdbb2f6a67fec42679..ffd06a6a35e3439d4b47ae92a4bcd4d321506521 100644 (file)
@@ -8,6 +8,12 @@
 Helper function for parsing parameters and and outputting data
 specifically for the v1 version of the API.
 """
+from typing import Tuple, Optional, Any, Dict, Iterable
+from itertools import chain
+import re
+
+from nominatim.api.results import SearchResult, SearchResults, SourceTable
+from nominatim.api.types import SearchDetails, GeometryFormat
 
 REVERSE_MAX_RANKS = [2, 2, 2,   # 0-2   Continent/Sea
                      4, 4,      # 3-4   Country
@@ -29,3 +35,167 @@ def zoom_to_rank(zoom: int) -> int:
     """ Convert a zoom parameter into a rank according to the v1 API spec.
     """
     return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
+
+
+FEATURE_TYPE_TO_RANK: Dict[Optional[str], Tuple[int, int]] = {
+    'country': (4, 4),
+    'state': (8, 8),
+    'city': (14, 16),
+    'settlement': (8, 20)
+}
+
+
+def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
+    """ Convert a feature type parameter to a tuple of
+        feature type name, minimum rank and maximum rank.
+    """
+    return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
+
+
+#pylint: disable=too-many-arguments,too-many-branches
+def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
+                       feature_type: Optional[str],
+                       namedetails: bool, extratags: bool,
+                       excluded: Iterable[str]) -> None:
+    """ Add parameters from details dictionary to the query parts
+        dictionary which is suitable as URL parameter dictionary.
+    """
+    parsed = SearchDetails.from_kwargs(details)
+    if parsed.geometry_output != GeometryFormat.NONE:
+        if GeometryFormat.GEOJSON in parsed.geometry_output:
+            queryparts['polygon_geojson'] = '1'
+        if GeometryFormat.KML in parsed.geometry_output:
+            queryparts['polygon_kml'] = '1'
+        if GeometryFormat.SVG in parsed.geometry_output:
+            queryparts['polygon_svg'] = '1'
+        if GeometryFormat.TEXT in parsed.geometry_output:
+            queryparts['polygon_text'] = '1'
+    if parsed.address_details:
+        queryparts['addressdetails'] = '1'
+    if namedetails:
+        queryparts['namedetails'] = '1'
+    if extratags:
+        queryparts['extratags'] = '1'
+    if parsed.geometry_simplification > 0.0:
+        queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
+    if parsed.max_results != 10:
+        queryparts['limit'] = str(parsed.max_results)
+    if parsed.countries:
+        queryparts['countrycodes'] = ','.join(parsed.countries)
+    queryparts['exclude_place_ids'] = \
+        ','.join(chain(excluded, map(str, (e for e in parsed.excluded if e > 0))))
+    if parsed.viewbox:
+        queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
+    if parsed.bounded_viewbox:
+        queryparts['bounded'] = '1'
+    if not details['dedupe']:
+        queryparts['dedupe'] = '0'
+    if feature_type in FEATURE_TYPE_TO_RANK:
+        queryparts['featureType'] = feature_type
+
+
+def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
+    """ Remove results that look like duplicates.
+
+        Two results are considered the same if they have the same OSM ID
+        or if they have the same category, display name and rank.
+    """
+    osm_ids_done = set()
+    classification_done = set()
+    deduped = SearchResults()
+    for result in results:
+        if result.source_table == SourceTable.POSTCODE:
+            assert result.names and 'ref' in result.names
+            if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
+                continue
+        if result.source_table == SourceTable.PLACEX:
+            classification = (result.osm_object[0] if result.osm_object else None,
+                              result.category,
+                              result.display_name,
+                              result.rank_address)
+            if result.osm_object not in osm_ids_done \
+               and classification not in classification_done:
+                deduped.append(result)
+            osm_ids_done.add(result.osm_object)
+            classification_done.add(classification)
+        else:
+            deduped.append(result)
+        if len(deduped) >= max_results:
+            break
+
+    return deduped
+
+
+def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
+    return result.source_table == SourceTable.PLACEX \
+           and result.osm_object is not None \
+           and result.osm_object[0] == 'R' \
+           and result.category == ('boundary', 'postal_code') \
+           and result.names is not None \
+           and result.names.get('ref') == postcode
+
+
+def _deg(axis:str) -> str:
+    return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
+
+def _deg_min(axis: str) -> str:
+    return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)[′']*"
+
+def _deg_min_sec(axis: str) -> str:
+    return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)[\"″]*"
+
+COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
+    r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
+    _deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
+    r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
+    _deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
+    r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
+    _deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
+    r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
+)]
+
+def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
+    """ Look for something that is formatted like a coordinate at the
+        beginning or end of the query. If found, extract the coordinate and
+        return the remaining query (or the empty string if the query
+        consisted of nothing but a coordinate).
+
+        Only the first match will be returned.
+    """
+    for regex in COORD_REGEX:
+        match = regex.fullmatch(query)
+        if match is None:
+            continue
+        groups = match.groupdict()
+        if not groups['pre'] or not groups['post']:
+            x = float(groups['lon_deg']) \
+                + float(groups.get('lon_min', 0.0)) / 60.0 \
+                + float(groups.get('lon_sec', 0.0)) / 3600.0
+            if groups.get('ew') == 'W':
+                x = -x
+            y = float(groups['lat_deg']) \
+                + float(groups.get('lat_min', 0.0)) / 60.0 \
+                + float(groups.get('lat_sec', 0.0)) / 3600.0
+            if groups.get('ns') == 'S':
+                y = -y
+            return groups['pre'] or groups['post'] or '', x, y
+
+    return query, None, None
+
+
+CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
+
+def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
+    """ Extract a hidden category specification of the form '[key=value]' from
+        the query. If found, extract key and value  and
+        return the remaining query (or the empty string if the query
+        consisted of nothing but a category).
+
+        Only the first match will be returned.
+    """
+    match = CATEGORY_REGEX.search(query)
+    if match is not None:
+        return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
+               match.group('cls'), match.group('typ')
+
+    return query, None, None