Generic part of the server implementation of the v1 API.
Combine with the scaffolding provided for the various Python ASGI frameworks.
"""
-from typing import Optional, Any, Type, Callable, NoReturn, Dict, cast
+from typing import Optional, Any, Type, Dict, cast, Sequence, Tuple
from functools import reduce
-import abc
import dataclasses
-import math
from urllib.parse import urlencode
import sqlalchemy as sa
from ..errors import UsageError
-from ..config import Configuration
from .. import logging as loglib
from ..core import NominatimAPIAsync
-from .format import dispatch as formatting
from .format import RawDataList
from ..types import DataLayer, GeometryFormat, PlaceRef, PlaceID, OsmID, Point
from ..status import StatusResult
from ..results import DetailedResult, ReverseResults, SearchResult, SearchResults
from ..localization import Locales
from . import helpers
+from ..server import content_types as ct
+from ..server.asgi_adaptor import ASGIAdaptor, EndpointFunc
+from ..sql.async_core_library import PGCORE_ERROR
-CONTENT_TEXT = 'text/plain; charset=utf-8'
-CONTENT_XML = 'text/xml; charset=utf-8'
-CONTENT_HTML = 'text/html; charset=utf-8'
-CONTENT_JSON = 'application/json; charset=utf-8'
-CONTENT_TYPE = {'text': CONTENT_TEXT, 'xml': CONTENT_XML, 'debug': CONTENT_HTML}
-
-class ASGIAdaptor(abc.ABC):
- """ Adapter class for the different ASGI frameworks.
- Wraps functionality over concrete requests and responses.
+def build_response(adaptor: ASGIAdaptor, output: str, status: int = 200,
+ num_results: int = 0) -> Any:
+ """ Create a response from the given output. Wraps a JSONP function
+ around the response, if necessary.
"""
- content_type: str = CONTENT_TEXT
-
- @abc.abstractmethod
- def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
- """ Return an input parameter as a string. If the parameter was
- not provided, return the 'default' value.
- """
-
- @abc.abstractmethod
- def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
- """ Return a HTTP header parameter as a string. If the parameter was
- not provided, return the 'default' value.
- """
-
-
- @abc.abstractmethod
- def error(self, msg: str, status: int = 400) -> Exception:
- """ Construct an appropriate exception from the given error message.
- The exception must result in a HTTP error with the given status.
- """
-
-
- @abc.abstractmethod
- def create_response(self, status: int, output: str, num_results: int) -> Any:
- """ Create a response from the given parameters. The result will
- be returned by the endpoint functions. The adaptor may also
- return None when the response is created internally with some
- different means.
-
- The response must return the HTTP given status code 'status', set
- the HTTP content-type headers to the string provided and the
- body of the response to 'output'.
- """
-
- @abc.abstractmethod
- def base_uri(self) -> str:
- """ Return the URI of the original request.
- """
-
-
- @abc.abstractmethod
- def config(self) -> Configuration:
- """ Return the current configuration object.
- """
-
-
- def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
- """ Create a response from the given output. Wraps a JSONP function
- around the response, if necessary.
- """
- if self.content_type == CONTENT_JSON and status == 200:
- jsonp = self.get('json_callback')
- if jsonp is not None:
- if any(not part.isidentifier() for part in jsonp.split('.')):
- self.raise_error('Invalid json_callback value')
- output = f"{jsonp}({output})"
- self.content_type = 'application/javascript; charset=utf-8'
-
- return self.create_response(status, output, num_results)
-
-
- def raise_error(self, msg: str, status: int = 400) -> NoReturn:
- """ Raise an exception resulting in the given HTTP status and
- message. The message will be formatted according to the
- output format chosen by the request.
- """
- if self.content_type == CONTENT_XML:
- msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
- <error>
- <code>{status}</code>
- <message>{msg}</message>
- </error>
- """
- elif self.content_type == CONTENT_JSON:
- msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
- elif self.content_type == CONTENT_HTML:
- loglib.log().section('Execution error')
- loglib.log().var_dump('Status', status)
- loglib.log().var_dump('Message', msg)
- msg = loglib.get_and_disable()
-
- raise self.error(msg, status)
-
-
- def get_int(self, name: str, default: Optional[int] = None) -> int:
- """ Return an input parameter as an int. Raises an exception if
- the parameter is given but not in an integer format.
-
- If 'default' is given, then it will be returned when the parameter
- is missing completely. When 'default' is None, an error will be
- raised on a missing parameter.
- """
- value = self.get(name)
-
- if value is None:
- if default is not None:
- return default
-
- self.raise_error(f"Parameter '{name}' missing.")
-
- try:
- intval = int(value)
- except ValueError:
- self.raise_error(f"Parameter '{name}' must be a number.")
-
- return intval
-
-
- def get_float(self, name: str, default: Optional[float] = None) -> float:
- """ Return an input parameter as a flaoting-point number. Raises an
- exception if the parameter is given but not in an float format.
-
- If 'default' is given, then it will be returned when the parameter
- is missing completely. When 'default' is None, an error will be
- raised on a missing parameter.
- """
- value = self.get(name)
-
- if value is None:
- if default is not None:
- return default
-
- self.raise_error(f"Parameter '{name}' missing.")
+ if adaptor.content_type == ct.CONTENT_JSON and status == 200:
+ jsonp = adaptor.get('json_callback')
+ if jsonp is not None:
+ if any(not part.isidentifier() for part in jsonp.split('.')):
+ adaptor.raise_error('Invalid json_callback value')
+ output = f"{jsonp}({output})"
+ adaptor.content_type = 'application/javascript; charset=utf-8'
- try:
- fval = float(value)
- except ValueError:
- self.raise_error(f"Parameter '{name}' must be a number.")
+ return adaptor.create_response(status, output, num_results)
- if math.isnan(fval) or math.isinf(fval):
- self.raise_error(f"Parameter '{name}' must be a number.")
- return fval
-
-
- def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
- """ Return an input parameter as bool. Only '0' is accepted as
- an input for 'false' all other inputs will be interpreted as 'true'.
-
- If 'default' is given, then it will be returned when the parameter
- is missing completely. When 'default' is None, an error will be
- raised on a missing parameter.
- """
- value = self.get(name)
-
- if value is None:
- if default is not None:
- return default
-
- self.raise_error(f"Parameter '{name}' missing.")
+def get_accepted_languages(adaptor: ASGIAdaptor) -> str:
+ """ Return the accepted languages.
+ """
+ return adaptor.get('accept-language')\
+ or adaptor.get_header('accept-language')\
+ or adaptor.config().DEFAULT_LANGUAGE
- return value != '0'
+def setup_debugging(adaptor: ASGIAdaptor) -> bool:
+ """ Set up collection of debug information if requested.
- def get_accepted_languages(self) -> str:
- """ Return the accepted languages.
- """
- return self.get('accept-language')\
- or self.get_header('accept-language')\
- or self.config().DEFAULT_LANGUAGE
+ Return True when debugging was requested.
+ """
+ if adaptor.get_bool('debug', False):
+ loglib.set_log_output('html')
+ adaptor.content_type = ct.CONTENT_HTML
+ return True
+ return False
- def setup_debugging(self) -> bool:
- """ Set up collection of debug information if requested.
- Return True when debugging was requested.
- """
- if self.get_bool('debug', False):
- loglib.set_log_output('html')
- self.content_type = CONTENT_HTML
- return True
+def get_layers(adaptor: ASGIAdaptor) -> Optional[DataLayer]:
+ """ Return a parsed version of the layer parameter.
+ """
+ param = adaptor.get('layer', None)
+ if param is None:
+ return None
- return False
+ return cast(DataLayer,
+ reduce(DataLayer.__or__,
+ (getattr(DataLayer, s.upper()) for s in param.split(','))))
- def get_layers(self) -> Optional[DataLayer]:
- """ Return a parsed version of the layer parameter.
- """
- param = self.get('layer', None)
- if param is None:
- return None
+def parse_format(adaptor: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
+ """ Get and check the 'format' parameter and prepare the formatter.
+ `result_type` is the type of result to be returned by the function
+ and `default` the format value to assume when no parameter is present.
+ """
+ fmt = adaptor.get('format', default=default)
+ assert fmt is not None
- return cast(DataLayer,
- reduce(DataLayer.__or__,
- (getattr(DataLayer, s.upper()) for s in param.split(','))))
+ formatting = adaptor.formatting()
+ if not formatting.supports_format(result_type, fmt):
+ adaptor.raise_error("Parameter 'format' must be one of: " +
+ ', '.join(formatting.list_formats(result_type)))
- def parse_format(self, result_type: Type[Any], default: str) -> str:
- """ Get and check the 'format' parameter and prepare the formatter.
- `result_type` is the type of result to be returned by the function
- and `default` the format value to assume when no parameter is present.
- """
- fmt = self.get('format', default=default)
- assert fmt is not None
+ adaptor.content_type = formatting.get_content_type(fmt)
+ return fmt
- if not formatting.supports_format(result_type, fmt):
- self.raise_error("Parameter 'format' must be one of: " +
- ', '.join(formatting.list_formats(result_type)))
- self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
- return fmt
+def parse_geometry_details(adaptor: ASGIAdaptor, fmt: str) -> Dict[str, Any]:
+ """ Create details structure from the supplied geometry parameters.
+ """
+ numgeoms = 0
+ output = GeometryFormat.NONE
+ if adaptor.get_bool('polygon_geojson', False):
+ output |= GeometryFormat.GEOJSON
+ numgeoms += 1
+ if fmt not in ('geojson', 'geocodejson'):
+ if adaptor.get_bool('polygon_text', False):
+ output |= GeometryFormat.TEXT
+ numgeoms += 1
+ if adaptor.get_bool('polygon_kml', False):
+ output |= GeometryFormat.KML
+ numgeoms += 1
+ if adaptor.get_bool('polygon_svg', False):
+ output |= GeometryFormat.SVG
+ numgeoms += 1
+ if numgeoms > adaptor.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
+ adaptor.raise_error('Too many polygon output options selected.')
- def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
- """ Create details structure from the supplied geometry parameters.
- """
- numgeoms = 0
- output = GeometryFormat.NONE
- if self.get_bool('polygon_geojson', False):
- output |= GeometryFormat.GEOJSON
- numgeoms += 1
- if fmt not in ('geojson', 'geocodejson'):
- if self.get_bool('polygon_text', False):
- output |= GeometryFormat.TEXT
- numgeoms += 1
- if self.get_bool('polygon_kml', False):
- output |= GeometryFormat.KML
- numgeoms += 1
- if self.get_bool('polygon_svg', False):
- output |= GeometryFormat.SVG
- numgeoms += 1
-
- if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
- self.raise_error('Too many polygon output options selected.')
-
- return {'address_details': True,
- 'geometry_simplification': self.get_float('polygon_threshold', 0.0),
- 'geometry_output': output
- }
+ return {'address_details': True,
+ 'geometry_simplification': adaptor.get_float('polygon_threshold', 0.0),
+ 'geometry_output': output
+ }
async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
"""
result = await api.status()
- fmt = params.parse_format(StatusResult, 'text')
+ fmt = parse_format(params, StatusResult, 'text')
if fmt == 'text' and result.status:
status_code = 500
else:
status_code = 200
- return params.build_response(formatting.format_result(result, fmt, {}),
- status=status_code)
+ return build_response(params, params.formatting().format_result(result, fmt, {}),
+ status=status_code)
async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /details endpoint. See API docs for details.
"""
- fmt = params.parse_format(DetailedResult, 'json')
+ fmt = parse_format(params, DetailedResult, 'json')
place_id = params.get_int('place_id', 0)
place: PlaceRef
if place_id:
params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
place = OsmID(osmtype, params.get_int('osmid'), params.get('class'))
- debug = params.setup_debugging()
+ debug = setup_debugging(params)
- locales = Locales.from_accept_languages(params.get_accepted_languages())
+ locales = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.details(place,
address_details=params.get_bool('addressdetails', False),
linked_places=params.get_bool('linkedplaces', True),
parented_places=params.get_bool('hierarchy', False),
keywords=params.get_bool('keywords', False),
- geometry_output = GeometryFormat.GEOJSON
- if params.get_bool('polygon_geojson', False)
- else GeometryFormat.NONE,
+ geometry_output=(GeometryFormat.GEOJSON
+ if params.get_bool('polygon_geojson', False)
+ else GeometryFormat.NONE),
locales=locales
- )
+ )
if debug:
- return params.build_response(loglib.get_and_disable())
+ return build_response(params, loglib.get_and_disable())
if result is None:
params.raise_error('No place with that OSM ID found.', status=404)
- output = formatting.format_result(result, fmt,
- {'locales': locales,
- 'group_hierarchy': params.get_bool('group_hierarchy', False),
- 'icon_base_url': params.config().MAPICON_URL})
+ output = params.formatting().format_result(
+ result, fmt,
+ {'locales': locales,
+ 'group_hierarchy': params.get_bool('group_hierarchy', False),
+ 'icon_base_url': params.config().MAPICON_URL})
- return params.build_response(output, num_results=1)
+ return build_response(params, output, num_results=1)
async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /reverse endpoint. See API docs for details.
"""
- fmt = params.parse_format(ReverseResults, 'xml')
- debug = params.setup_debugging()
+ fmt = parse_format(params, ReverseResults, 'xml')
+ debug = setup_debugging(params)
coord = Point(params.get_float('lon'), params.get_float('lat'))
- details = params.parse_geometry_details(fmt)
+ details = parse_geometry_details(params, fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
- details['layers'] = params.get_layers()
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ details['layers'] = get_layers(params)
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.reverse(coord, **details)
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=1 if result else 0)
+ return build_response(params, loglib.get_and_disable(), num_results=1 if result else 0)
if fmt == 'xml':
queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
- output = formatting.format_result(ReverseResults([result] if result else []),
- fmt, fmt_options)
+ output = params.formatting().format_result(ReverseResults([result] if result else []),
+ fmt, fmt_options)
- return params.build_response(output, num_results=1 if result else 0)
+ return build_response(params, output, num_results=1 if result else 0)
async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /lookup endpoint. See API docs for details.
"""
- fmt = params.parse_format(SearchResults, 'xml')
- debug = params.setup_debugging()
- details = params.parse_geometry_details(fmt)
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ fmt = parse_format(params, SearchResults, 'xml')
+ debug = setup_debugging(params)
+ details = parse_geometry_details(params, fmt)
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
places = []
for oid in (params.get('osm_ids') or '').split(','):
results = SearchResults()
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=len(results))
+ return build_response(params, loglib.get_and_disable(), num_results=len(results))
fmt_options = {'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
- output = formatting.format_result(results, fmt, fmt_options)
+ output = params.formatting().format_result(results, fmt, fmt_options)
- return params.build_response(output, num_results=len(results))
+ return build_response(params, output, num_results=len(results))
async def _unstructured_search(query: str, api: NominatimAPIAsync,
- details: Dict[str, Any]) -> SearchResults:
+ details: Dict[str, Any]) -> SearchResults:
if not query:
return SearchResults()
async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /search endpoint. See API docs for details.
"""
- fmt = params.parse_format(SearchResults, 'jsonv2')
- debug = params.setup_debugging()
- details = params.parse_geometry_details(fmt)
+ fmt = parse_format(params, SearchResults, 'jsonv2')
+ debug = setup_debugging(params)
+ details = parse_geometry_details(params, fmt)
- details['countries'] = params.get('countrycodes', None)
+ details['countries'] = params.get('countrycodes', None)
details['excluded'] = params.get('exclude_place_ids', None)
details['viewbox'] = params.get('viewbox', None) or params.get('viewboxlbrt', None)
details['bounded_viewbox'] = params.get_bool('bounded', False)
details['dedupe'] = params.get_bool('dedupe', True)
max_results = max(1, min(50, params.get_int('limit', 10)))
- details['max_results'] = max_results + min(10, max_results) \
- if details['dedupe'] else max_results
+ details['max_results'] = (max_results + min(10, max_results)
+ if details['dedupe'] else max_results)
details['min_rank'], details['max_rank'] = \
helpers.feature_type_to_rank(params.get('featureType', ''))
if params.get('featureType', None) is not None:
details['layers'] = DataLayer.ADDRESS
else:
- details['layers'] = params.get_layers()
+ details['layers'] = get_layers(params)
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
# unstructured query parameters
query = params.get('q', None)
results = helpers.deduplicate_results(results, max_results)
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=len(results))
+ return build_response(params, loglib.get_and_disable(), num_results=len(results))
if fmt == 'xml':
helpers.extend_query_parts(queryparts, details,
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', False)}
- output = formatting.format_result(results, fmt, fmt_options)
+ output = params.formatting().format_result(results, fmt, fmt_options)
- return params.build_response(output, num_results=len(results))
+ return build_response(params, output, num_results=len(results))
async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
deleted or are broken in the OSM data but are kept in the
Nominatim database to minimize disruption.
"""
- fmt = params.parse_format(RawDataList, 'json')
+ fmt = parse_format(params, RawDataList, 'json')
async with api.begin() as conn:
sql = sa.text(""" SELECT p.place_id, country_code,
""")
results = RawDataList(r._asdict() for r in await conn.execute(sql))
- return params.build_response(formatting.format_result(results, fmt, {}))
+ return build_response(params, params.formatting().format_result(results, fmt, {}))
async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
their size but are kept in the Nominatim database with their
old area to minimize disruption.
"""
- fmt = params.parse_format(RawDataList, 'json')
+ fmt = parse_format(params, RawDataList, 'json')
sql_params: Dict[str, Any] = {
'days': params.get_int('days', -1),
'cls': params.get('class')
results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
- return params.build_response(formatting.format_result(results, fmt, {}))
+ return build_response(params, params.formatting().format_result(results, fmt, {}))
+
+async def get_routes(api: NominatimAPIAsync) -> Sequence[Tuple[str, EndpointFunc]]:
+ routes = [
+ ('status', status_endpoint),
+ ('details', details_endpoint),
+ ('reverse', reverse_endpoint),
+ ('lookup', lookup_endpoint),
+ ('deletable', deletable_endpoint),
+ ('polygons', polygons_endpoint),
+ ]
-EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]
+ def has_search_name(conn: sa.engine.Connection) -> bool:
+ insp = sa.inspect(conn)
+ return insp.has_table('search_name')
+
+ try:
+ async with api.begin() as conn:
+ if await conn.connection.run_sync(has_search_name):
+ routes.append(('search', search_endpoint))
+ except (PGCORE_ERROR, sa.exc.OperationalError):
+ pass # ignored
-ROUTES = [
- ('status', status_endpoint),
- ('details', details_endpoint),
- ('reverse', reverse_endpoint),
- ('lookup', lookup_endpoint),
- ('search', search_endpoint),
- ('deletable', deletable_endpoint),
- ('polygons', polygons_endpoint),
-]
+ return routes