import math
from urllib.parse import urlencode
+import sqlalchemy as sa
+
from nominatim.errors import UsageError
from nominatim.config import Configuration
import nominatim.api as napi
import nominatim.api.logging as loglib
from nominatim.api.v1.format import dispatch as formatting
+from nominatim.api.v1.format import RawDataList
from nominatim.api.v1 import helpers
-CONTENT_TYPE = {
- 'text': 'text/plain; charset=utf-8',
- 'xml': 'text/xml; charset=utf-8',
- 'debug': 'text/html; charset=utf-8'
-}
+# Content type strings that ASGIAdaptor.content_type is set to and
+# compared against.
+CONTENT_TEXT = 'text/plain; charset=utf-8'
+CONTENT_XML = 'text/xml; charset=utf-8'
+CONTENT_HTML = 'text/html; charset=utf-8'
+CONTENT_JSON = 'application/json; charset=utf-8'
+
+# Content type by output format name. Lookups fall back to CONTENT_JSON
+# for format names that are not listed here.
+CONTENT_TYPE = {'text': CONTENT_TEXT, 'xml': CONTENT_XML, 'debug': CONTENT_HTML}
class ASGIAdaptor(abc.ABC):
""" Adapter class for the different ASGI frameworks.
Wraps functionality over concrete requests and responses.
"""
- content_type: str = 'text/plain; charset=utf-8'
+ content_type: str = CONTENT_TEXT
@abc.abstractmethod
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
@abc.abstractmethod
- def create_response(self, status: int, output: str) -> Any:
+ def create_response(self, status: int, output: str, num_results: int) -> Any:
""" Create a response from the given parameters. The result will
be returned by the endpoint functions. The adaptor may also
return None when the response is created internally with some
body of the response to 'output'.
"""
+ @abc.abstractmethod
+ def base_uri(self) -> str:
+ """ Return the URI of the original request.
+ """
+
@abc.abstractmethod
def config(self) -> Configuration:
"""
- def build_response(self, output: str, status: int = 200) -> Any:
+ def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
""" Create a response from the given output. Wraps a JSONP function
around the response, if necessary.
"""
+ # JSONP wrapping is only attempted for JSON output with status 200
+ # and only when the request carries a 'json_callback' parameter.
- if self.content_type == 'application/json' and status == 200:
+ if self.content_type == CONTENT_JSON and status == 200:
jsonp = self.get('json_callback')
if jsonp is not None:
+ # Every dot-separated part of the callback name must be a valid
+ # identifier; anything else is reported through raise_error().
if any(not part.isidentifier() for part in jsonp.split('.')):
self.raise_error('Invalid json_callback value')
output = f"{jsonp}({output})"
+ # The output is now a function call wrapping the original body,
+ # so the content type is switched to javascript.
- self.content_type = 'application/javascript'
+ self.content_type = 'application/javascript; charset=utf-8'
+ # status and num_results are handed to create_response() unchanged.
- return self.create_response(status, output)
+ return self.create_response(status, output, num_results)
def raise_error(self, msg: str, status: int = 400) -> NoReturn:
message. The message will be formatted according to the
output format chosen by the request.
"""
- if self.content_type == 'text/xml; charset=utf-8':
+ if self.content_type == CONTENT_XML:
msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
<error>
<code>{status}</code>
<message>{msg}</message>
</error>
"""
- elif self.content_type == 'application/json':
+ elif self.content_type == CONTENT_JSON:
msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
- elif self.content_type == 'text/html; charset=utf-8':
+ elif self.content_type == CONTENT_HTML:
loglib.log().section('Execution error')
loglib.log().var_dump('Status', status)
loglib.log().var_dump('Message', msg)
""" Return the accepted languages.
"""
return self.get('accept-language')\
- or self.get_header('http_accept_language')\
+ or self.get_header('accept-language')\
or self.config().DEFAULT_LANGUAGE
"""
if self.get_bool('debug', False):
loglib.set_log_output('html')
- self.content_type = 'text/html; charset=utf-8'
+ self.content_type = CONTENT_HTML
return True
return False
self.raise_error("Parameter 'format' must be one of: " +
', '.join(formatting.list_formats(result_type)))
- self.content_type = CONTENT_TYPE.get(fmt, 'application/json')
+ self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
return fmt
def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
- """ Create details strucutre from the supplied geometry parameters.
+ """ Create details structure from the supplied geometry parameters.
"""
numgeoms = 0
output = napi.GeometryFormat.NONE
numgeoms += 1
if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
- self.raise_error('Too many polgyon output options selected.')
+ self.raise_error('Too many polygon output options selected.')
return {'address_details': True,
'geometry_simplification': self.get_float('polygon_threshold', 0.0),
result = await api.details(place,
address_details=params.get_bool('addressdetails', False),
- linked_places=params.get_bool('linkedplaces', False),
+ linked_places=params.get_bool('linkedplaces', True),
parented_places=params.get_bool('hierarchy', False),
keywords=params.get_bool('keywords', False),
geometry_output = napi.GeometryFormat.GEOJSON
if params.get_bool('polygon_geojson', False)
- else napi.GeometryFormat.NONE
+ else napi.GeometryFormat.NONE,
+ locales=locales
)
if debug:
if result is None:
params.raise_error('No place with that OSM ID found.', status=404)
- result.localize(locales)
-
output = formatting.format_result(result, fmt,
{'locales': locales,
'group_hierarchy': params.get_bool('group_hierarchy', False),
'icon_base_url': params.config().MAPICON_URL})
- return params.build_response(output)
+ return params.build_response(output, num_results=1)
async def reverse_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
details = params.parse_geometry_details(fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
details['layers'] = params.get_layers()
+ details['locales'] = napi.Locales.from_accept_languages(params.get_accepted_languages())
result = await api.reverse(coord, **details)
if debug:
- return params.build_response(loglib.get_and_disable())
+ return params.build_response(loglib.get_and_disable(), num_results=1 if result else 0)
if fmt == 'xml':
queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
- if result:
- result.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
-
output = formatting.format_result(napi.ReverseResults([result] if result else []),
fmt, fmt_options)
- return params.build_response(output)
+ return params.build_response(output, num_results=1 if result else 0)
async def lookup_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
fmt = params.parse_format(napi.SearchResults, 'xml')
debug = params.setup_debugging()
details = params.parse_geometry_details(fmt)
+ details['locales'] = napi.Locales.from_accept_languages(params.get_accepted_languages())
places = []
for oid in (params.get('osm_ids') or '').split(','):
oid = oid.strip()
if len(oid) > 1 and oid[0] in 'RNWrnw' and oid[1:].isdigit():
- places.append(napi.OsmID(oid[0], int(oid[1:])))
+ places.append(napi.OsmID(oid[0].upper(), int(oid[1:])))
+
+ if len(places) > params.config().get_int('LOOKUP_MAX_COUNT'):
+ params.raise_error('Too many object IDs.')
if places:
results = await api.lookup(places, **details)
results = napi.SearchResults()
if debug:
- return params.build_response(loglib.get_and_disable())
+ return params.build_response(loglib.get_and_disable(), num_results=len(results))
fmt_options = {'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
- results.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
-
output = formatting.format_result(results, fmt, fmt_options)
- return params.build_response(output)
+ return params.build_response(output, num_results=len(results))
async def _unstructured_search(query: str, api: napi.NominatimAPIAsync,
details['min_rank'], details['max_rank'] = \
helpers.feature_type_to_rank(params.get('featureType', ''))
+ if params.get('featureType', None) is not None:
+ details['layers'] = napi.DataLayer.ADDRESS
+ else:
+ details['layers'] = params.get_layers()
+
+ details['locales'] = napi.Locales.from_accept_languages(params.get_accepted_languages())
+ # unstructured query parameters
query = params.get('q', None)
+ # structured query parameters
queryparts = {}
+ for key in ('amenity', 'street', 'city', 'county', 'state', 'postalcode', 'country'):
+ details[key] = params.get(key, None)
+ if details[key]:
+ queryparts[key] = details[key]
+
try:
if query is not None:
+ if queryparts:
+ params.raise_error("Structured query parameters"
+ "(amenity, street, city, county, state, postalcode, country)"
+ " cannot be used together with 'q' parameter.")
queryparts['q'] = query
results = await _unstructured_search(query, api, details)
else:
- for key in ('amenity', 'street', 'city', 'county', 'state', 'postalcode', 'country'):
- details[key] = params.get(key, None)
- if details[key]:
- queryparts[key] = details[key]
query = ', '.join(queryparts.values())
results = await api.search_address(**details)
except UsageError as err:
params.raise_error(str(err))
- results.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
-
if details['dedupe'] and len(results) > 1:
results = helpers.deduplicate_results(results, max_results)
if debug:
- return params.build_response(loglib.get_and_disable())
+ return params.build_response(loglib.get_and_disable(), num_results=len(results))
if fmt == 'xml':
helpers.extend_query_parts(queryparts, details,
(str(r.place_id) for r in results if r.place_id))
queryparts['format'] = fmt
- moreurl = urlencode(queryparts)
+ moreurl = params.base_uri() + '/search?' + urlencode(queryparts)
else:
moreurl = ''
output = formatting.format_result(results, fmt, fmt_options)
- return params.build_response(output)
+ return params.build_response(output, num_results=len(results))
+
+
+async def deletable_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+ """ Server glue for /deletable endpoint.
+ This is a special endpoint that shows polygons that have been
+ deleted or are broken in the OSM data but are kept in the
+ Nominatim database to minimize disruption.
+
+ Joins 'placex' with 'import_polygon_delete' on OSM type, OSM id,
+ class and type, and returns the formatted rows through
+ params.build_response(). The output format comes from
+ params.parse_format(), which is called with 'json' as its second
+ argument (presumably the default format - confirm against
+ parse_format()).
+ """
+ fmt = params.parse_format(RawDataList, 'json')
+
+ # The query has no bind parameters and no row limit: every matching
+ # row is returned.
+ async with api.begin() as conn:
+ sql = sa.text(""" SELECT p.place_id, country_code,
+ name->'name' as name, i.*
+ FROM placex p, import_polygon_delete i
+ WHERE p.osm_id = i.osm_id AND p.osm_type = i.osm_type
+ AND p.class = i.class AND p.type = i.type
+ """)
+ # Each result row is converted to a plain dict before formatting.
+ results = RawDataList(r._asdict() for r in await conn.execute(sql))
+
+ # NOTE(review): the details/reverse/lookup/search endpoints pass
+ # num_results to build_response(); here it stays at its default of 0.
+ # Confirm that this is intended.
+ return params.build_response(formatting.format_result(results, fmt, {}))
+
+
+async def polygons_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+ """ Server glue for /polygons endpoint.
+ This is a special endpoint that shows polygons that have changed
+ their size but are kept in the Nominatim database with their
+ old area to minimize disruption.
+
+ Reads rows from 'import_polygon_error', newest 'updated' first and
+ capped at 1000 rows. Optional request parameters narrow the result:
+ 'days' - only applied when greater than 0; keeps rows updated
+ within that many days,
+ 'reduced' - when true, keeps rows whose error message starts with
+ 'Area reduced',
+ 'class' - when given, keeps rows with exactly that class.
+ The formatted rows are returned through params.build_response().
+ """
+ fmt = params.parse_format(RawDataList, 'json')
+ # Values for the ':days' and ':cls' placeholders used in the filters
+ # below; the whole dict is handed to conn.execute().
+ sql_params: Dict[str, Any] = {
+ 'days': params.get_int('days', -1),
+ 'cls': params.get('class')
+ }
+ reduced = params.get_bool('reduced', False)
+
+ async with api.begin() as conn:
+ sql = sa.select(sa.text("""osm_type, osm_id, class, type,
+ name->'name' as name,
+ country_code, errormessage, updated"""))\
+ .select_from(sa.text('import_polygon_error'))
+ # Each filter is appended only when its request parameter asks for it.
+ if sql_params['days'] > 0:
+ sql = sql.where(sa.text("updated > 'now'::timestamp - make_interval(days => :days)"))
+ if reduced:
+ sql = sql.where(sa.text("errormessage like 'Area reduced%'"))
+ if sql_params['cls'] is not None:
+ sql = sql.where(sa.text("class = :cls"))
+
+ # Most recently updated rows first, at most 1000 of them.
+ sql = sql.order_by(sa.literal_column('updated').desc()).limit(1000)
+
+ # Each result row is converted to a plain dict before formatting.
+ results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
+
+ # NOTE(review): the details/reverse/lookup/search endpoints pass
+ # num_results to build_response(); here it stays at its default of 0.
+ # Confirm that this is intended.
+ return params.build_response(formatting.format_result(results, fmt, {}))
EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]
('details', details_endpoint),
('reverse', reverse_endpoint),
('lookup', lookup_endpoint),
- ('search', search_endpoint)
+ ('search', search_endpoint),
+ ('deletable', deletable_endpoint),
+ ('polygons', polygons_endpoint),
]