]> git.openstreetmap.org Git - nominatim.git/blobdiff - src/nominatim_api/v1/server_glue.py
remove v1-specific functions from ASGIAdaptor
[nominatim.git] / src / nominatim_api / v1 / server_glue.py
index c00b580bde0d9cb722144856ec10fb5d6b50b4b7..0e954901d73a6aa2768ac1ae737222499b98b881 100644 (file)
@@ -86,44 +86,6 @@ class ASGIAdaptor(abc.ABC):
         """
 
 
-    def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
-        """ Create a response from the given output. Wraps a JSONP function
-            around the response, if necessary.
-        """
-        if self.content_type == CONTENT_JSON and status == 200:
-            jsonp = self.get('json_callback')
-            if jsonp is not None:
-                if any(not part.isidentifier() for part in jsonp.split('.')):
-                    self.raise_error('Invalid json_callback value')
-                output = f"{jsonp}({output})"
-                self.content_type = 'application/javascript; charset=utf-8'
-
-        return self.create_response(status, output, num_results)
-
-
-    def raise_error(self, msg: str, status: int = 400) -> NoReturn:
-        """ Raise an exception resulting in the given HTTP status and
-            message. The message will be formatted according to the
-            output format chosen by the request.
-        """
-        if self.content_type == CONTENT_XML:
-            msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
-                      <error>
-                        <code>{status}</code>
-                        <message>{msg}</message>
-                      </error>
-                   """
-        elif self.content_type == CONTENT_JSON:
-            msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
-        elif self.content_type == CONTENT_HTML:
-            loglib.log().section('Execution error')
-            loglib.log().var_dump('Status', status)
-            loglib.log().var_dump('Message', msg)
-            msg = loglib.get_and_disable()
-
-        raise self.error(msg, status)
-
-
     def get_int(self, name: str, default: Optional[int] = None) -> int:
         """ Return an input parameter as an int. Raises an exception if
             the parameter is given but not in an integer format.
@@ -194,81 +156,120 @@ class ASGIAdaptor(abc.ABC):
         return value != '0'
 
 
-    def get_accepted_languages(self) -> str:
-        """ Return the accepted languages.
+    def raise_error(self, msg: str, status: int = 400) -> NoReturn:
+        """ Raise an exception resulting in the given HTTP status and
+            message. The message will be formatted according to the
+            output format chosen by the request.
         """
-        return self.get('accept-language')\
-               or self.get_header('accept-language')\
-               or self.config().DEFAULT_LANGUAGE
+        if self.content_type == CONTENT_XML:
+            msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
+                      <error>
+                        <code>{status}</code>
+                        <message>{msg}</message>
+                      </error>
+                   """
+        elif self.content_type == CONTENT_JSON:
+            msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
+        elif self.content_type == CONTENT_HTML:
+            loglib.log().section('Execution error')
+            loglib.log().var_dump('Status', status)
+            loglib.log().var_dump('Message', msg)
+            msg = loglib.get_and_disable()
 
+        raise self.error(msg, status)
 
-    def setup_debugging(self) -> bool:
-        """ Set up collection of debug information if requested.
 
-            Return True when debugging was requested.
-        """
-        if self.get_bool('debug', False):
-            loglib.set_log_output('html')
-            self.content_type = CONTENT_HTML
-            return True
+def build_response(adaptor: ASGIAdaptor, output: str, status: int = 200,
+                   num_results: int = 0) -> Any:
+    """ Create a response from the given output. Wraps a JSONP function
+        around the response, if necessary.
+    """
+    if adaptor.content_type == CONTENT_JSON and status == 200:
+        jsonp = adaptor.get('json_callback')
+        if jsonp is not None:
+            if any(not part.isidentifier() for part in jsonp.split('.')):
+                adaptor.raise_error('Invalid json_callback value')
+            output = f"{jsonp}({output})"
+            adaptor.content_type = 'application/javascript; charset=utf-8'
 
-        return False
+    return adaptor.create_response(status, output, num_results)
 
 
-    def get_layers(self) -> Optional[DataLayer]:
-        """ Return a parsed version of the layer parameter.
-        """
-        param = self.get('layer', None)
-        if param is None:
-            return None
+def get_accepted_languages(adaptor: ASGIAdaptor) -> str:
+    """ Return the accepted languages.
+    """
+    return adaptor.get('accept-language')\
+           or adaptor.get_header('accept-language')\
+           or adaptor.config().DEFAULT_LANGUAGE
 
-        return cast(DataLayer,
-                    reduce(DataLayer.__or__,
-                           (getattr(DataLayer, s.upper()) for s in param.split(','))))
 
+def setup_debugging(adaptor: ASGIAdaptor) -> bool:
+    """ Set up collection of debug information if requested.
 
-    def parse_format(self, result_type: Type[Any], default: str) -> str:
-        """ Get and check the 'format' parameter and prepare the formatter.
-            `result_type` is the type of result to be returned by the function
-            and `default` the format value to assume when no parameter is present.
-        """
-        fmt = self.get('format', default=default)
-        assert fmt is not None
+        Return True when debugging was requested.
+    """
+    if adaptor.get_bool('debug', False):
+        loglib.set_log_output('html')
+        adaptor.content_type = CONTENT_HTML
+        return True
 
-        if not formatting.supports_format(result_type, fmt):
-            self.raise_error("Parameter 'format' must be one of: " +
-                              ', '.join(formatting.list_formats(result_type)))
+    return False
 
-        self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
-        return fmt
 
+def get_layers(adaptor: ASGIAdaptor) -> Optional[DataLayer]:
+    """ Return a parsed version of the layer parameter.
+    """
+    param = adaptor.get('layer', None)
+    if param is None:
+        return None
 
-    def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
-        """ Create details structure from the supplied geometry parameters.
-        """
-        numgeoms = 0
-        output = GeometryFormat.NONE
-        if self.get_bool('polygon_geojson', False):
-            output |= GeometryFormat.GEOJSON
+    return cast(DataLayer,
+                reduce(DataLayer.__or__,
+                       (getattr(DataLayer, s.upper()) for s in param.split(','))))
+
+
+def parse_format(adaptor: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
+    """ Get and check the 'format' parameter and prepare the formatter.
+        `result_type` is the type of result to be returned by the function
+        and `default` the format value to assume when no parameter is present.
+    """
+    fmt = adaptor.get('format', default=default)
+    assert fmt is not None
+
+    if not formatting.supports_format(result_type, fmt):
+        adaptor.raise_error("Parameter 'format' must be one of: " +
+                          ', '.join(formatting.list_formats(result_type)))
+
+    adaptor.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
+    return fmt
+
+
+def parse_geometry_details(adaptor: ASGIAdaptor, fmt: str) -> Dict[str, Any]:
+    """ Create details structure from the supplied geometry parameters.
+    """
+    numgeoms = 0
+    output = GeometryFormat.NONE
+    if adaptor.get_bool('polygon_geojson', False):
+        output |= GeometryFormat.GEOJSON
+        numgeoms += 1
+    if fmt not in ('geojson', 'geocodejson'):
+        if adaptor.get_bool('polygon_text', False):
+            output |= GeometryFormat.TEXT
             numgeoms += 1
-        if fmt not in ('geojson', 'geocodejson'):
-            if self.get_bool('polygon_text', False):
-                output |= GeometryFormat.TEXT
-                numgeoms += 1
-            if self.get_bool('polygon_kml', False):
-                output |= GeometryFormat.KML
-                numgeoms += 1
-            if self.get_bool('polygon_svg', False):
-                output |= GeometryFormat.SVG
-                numgeoms += 1
-
-        if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
-            self.raise_error('Too many polygon output options selected.')
-
-        return {'address_details': True,
-                'geometry_simplification': self.get_float('polygon_threshold', 0.0),
-                'geometry_output': output
-               }
+        if adaptor.get_bool('polygon_kml', False):
+            output |= GeometryFormat.KML
+            numgeoms += 1
+        if adaptor.get_bool('polygon_svg', False):
+            output |= GeometryFormat.SVG
+            numgeoms += 1
+
+    if numgeoms > adaptor.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
+        adaptor.raise_error('Too many polygon output options selected.')
+
+    return {'address_details': True,
+            'geometry_simplification': adaptor.get_float('polygon_threshold', 0.0),
+            'geometry_output': output
+           }
 
 
 async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -276,21 +277,21 @@ async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     """
     result = await api.status()
 
-    fmt = params.parse_format(StatusResult, 'text')
+    fmt = parse_format(params, StatusResult, 'text')
 
     if fmt == 'text' and result.status:
         status_code = 500
     else:
         status_code = 200
 
-    return params.build_response(formatting.format_result(result, fmt, {}),
+    return build_response(params, formatting.format_result(result, fmt, {}),
                                  status=status_code)
 
 
 async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     """ Server glue for /details endpoint. See API docs for details.
     """
-    fmt = params.parse_format(DetailedResult, 'json')
+    fmt = parse_format(params, DetailedResult, 'json')
     place_id = params.get_int('place_id', 0)
     place: PlaceRef
     if place_id:
@@ -301,9 +302,9 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
             params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
         place = OsmID(osmtype, params.get_int('osmid'), params.get('class'))
 
-    debug = params.setup_debugging()
+    debug = setup_debugging(params)
 
-    locales = Locales.from_accept_languages(params.get_accepted_languages())
+    locales = Locales.from_accept_languages(get_accepted_languages(params))
 
     result = await api.details(place,
                                address_details=params.get_bool('addressdetails', False),
@@ -317,7 +318,7 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
                               )
 
     if debug:
-        return params.build_response(loglib.get_and_disable())
+        return build_response(params, loglib.get_and_disable())
 
     if result is None:
         params.raise_error('No place with that OSM ID found.', status=404)
@@ -327,25 +328,25 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
                   'group_hierarchy': params.get_bool('group_hierarchy', False),
                   'icon_base_url': params.config().MAPICON_URL})
 
-    return params.build_response(output, num_results=1)
+    return build_response(params, output, num_results=1)
 
 
 async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     """ Server glue for /reverse endpoint. See API docs for details.
     """
-    fmt = params.parse_format(ReverseResults, 'xml')
-    debug = params.setup_debugging()
+    fmt = parse_format(params, ReverseResults, 'xml')
+    debug = setup_debugging(params)
     coord = Point(params.get_float('lon'), params.get_float('lat'))
 
-    details = params.parse_geometry_details(fmt)
+    details = parse_geometry_details(params, fmt)
     details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
-    details['layers'] = params.get_layers()
-    details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+    details['layers'] = get_layers(params)
+    details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
 
     result = await api.reverse(coord, **details)
 
     if debug:
-        return params.build_response(loglib.get_and_disable(), num_results=1 if result else 0)
+        return build_response(params, loglib.get_and_disable(), num_results=1 if result else 0)
 
     if fmt == 'xml':
         queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
@@ -364,16 +365,16 @@ async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     output = formatting.format_result(ReverseResults([result] if result else []),
                                       fmt, fmt_options)
 
-    return params.build_response(output, num_results=1 if result else 0)
+    return build_response(params, output, num_results=1 if result else 0)
 
 
 async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     """ Server glue for /lookup endpoint. See API docs for details.
     """
-    fmt = params.parse_format(SearchResults, 'xml')
-    debug = params.setup_debugging()
-    details = params.parse_geometry_details(fmt)
-    details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+    fmt = parse_format(params, SearchResults, 'xml')
+    debug = setup_debugging(params)
+    details = parse_geometry_details(params, fmt)
+    details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
 
     places = []
     for oid in (params.get('osm_ids') or '').split(','):
@@ -390,7 +391,7 @@ async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
         results = SearchResults()
 
     if debug:
-        return params.build_response(loglib.get_and_disable(), num_results=len(results))
+        return build_response(params, loglib.get_and_disable(), num_results=len(results))
 
     fmt_options = {'extratags': params.get_bool('extratags', False),
                    'namedetails': params.get_bool('namedetails', False),
@@ -398,7 +399,7 @@ async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
 
     output = formatting.format_result(results, fmt, fmt_options)
 
-    return params.build_response(output, num_results=len(results))
+    return build_response(params, output, num_results=len(results))
 
 
 async def _unstructured_search(query: str, api: NominatimAPIAsync,
@@ -435,9 +436,9 @@ async def _unstructured_search(query: str, api: NominatimAPIAsync,
 async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     """ Server glue for /search endpoint. See API docs for details.
     """
-    fmt = params.parse_format(SearchResults, 'jsonv2')
-    debug = params.setup_debugging()
-    details = params.parse_geometry_details(fmt)
+    fmt = parse_format(params, SearchResults, 'jsonv2')
+    debug = setup_debugging(params)
+    details = parse_geometry_details(params, fmt)
 
     details['countries']  = params.get('countrycodes', None)
     details['excluded'] = params.get('exclude_place_ids', None)
@@ -454,9 +455,9 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
     if params.get('featureType', None) is not None:
         details['layers'] = DataLayer.ADDRESS
     else:
-        details['layers'] = params.get_layers()
+        details['layers'] = get_layers(params)
 
-    details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+    details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
 
     # unstructured query parameters
     query = params.get('q', None)
@@ -486,7 +487,7 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
         results = helpers.deduplicate_results(results, max_results)
 
     if debug:
-        return params.build_response(loglib.get_and_disable(), num_results=len(results))
+        return build_response(params, loglib.get_and_disable(), num_results=len(results))
 
     if fmt == 'xml':
         helpers.extend_query_parts(queryparts, details,
@@ -509,7 +510,7 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
 
     output = formatting.format_result(results, fmt, fmt_options)
 
-    return params.build_response(output, num_results=len(results))
+    return build_response(params, output, num_results=len(results))
 
 
 async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -518,7 +519,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
         deleted or are broken in the OSM data but are kept in the
         Nominatim database to minimize disruption.
     """
-    fmt = params.parse_format(RawDataList, 'json')
+    fmt = parse_format(params, RawDataList, 'json')
 
     async with api.begin() as conn:
         sql = sa.text(""" SELECT p.place_id, country_code,
@@ -529,7 +530,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
                       """)
         results = RawDataList(r._asdict() for r in await conn.execute(sql))
 
-    return params.build_response(formatting.format_result(results, fmt, {}))
+    return build_response(params, formatting.format_result(results, fmt, {}))
 
 
 async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -538,7 +539,7 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
         their size but are kept in the Nominatim database with their
         old area to minimize disruption.
     """
-    fmt = params.parse_format(RawDataList, 'json')
+    fmt = parse_format(params, RawDataList, 'json')
     sql_params: Dict[str, Any] = {
         'days': params.get_int('days', -1),
         'cls': params.get('class')
@@ -561,7 +562,7 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
 
         results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
 
-    return params.build_response(formatting.format_result(results, fmt, {}))
+    return build_response(params, formatting.format_result(results, fmt, {}))
 
 
 EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]