"""
log().section("Find in TIGER table")
t = conn.t.tiger
+ parent = conn.t.placex
sql = sa.select(t.c.place_id, t.c.parent_place_id,
+ parent.c.osm_type, parent.c.osm_id,
t.c.startnumber, t.c.endnumber, t.c.step,
t.c.postcode,
t.c.linegeo.ST_Centroid().label('centroid'),
_select_column_geometry(t.c.linegeo, details.geometry_output))
if isinstance(place, ntyp.PlaceID):
- sql = sql.where(t.c.place_id == place.place_id)
+ sql = sql.where(t.c.place_id == place.place_id)\
+ .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
else:
return None
res = class_type(source_table=SourceTable.TIGER,
place_id=row.place_id,
+ osm_object=(row.osm_type, row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
postcode=row.postcode,
country_code='us',
""" Create a new AddressLine from the results of a datbase query.
"""
extratags: Dict[str, str] = getattr(row, 'extratags', {})
- if 'place_type' in row:
- extratags['place_type'] = row.place_type
+ if hasattr(row, 'place_type') and row.place_type:
+ extratags['place'] = row.place_type
names = row.name
if getattr(row, 'housenumber', None) is not None:
"""
Implementation of reverse geocoding.
"""
-from typing import Optional, List
+from typing import Optional, List, Callable, Type, Tuple
import sqlalchemy as sa
from geoalchemy2 import WKTElement
from nominatim.api.connection import SearchConnection
import nominatim.api.results as nres
from nominatim.api.logging import log
-from nominatim.api.types import AnyPoint, DataLayer, LookupDetails, GeometryFormat
+from nominatim.api.types import AnyPoint, DataLayer, LookupDetails, GeometryFormat, Bbox
# In SQLAlchemy expression which compare with NULL need to be expressed with
# the equal sign.
# pylint: disable=singleton-comparison
+RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
+
def _select_from_placex(t: SaFromClause, wkt: Optional[str] = None) -> SaSelect:
""" Create a select statement with the columns relevant for reverse
results.
t.c.housenumber, t.c.postcode, t.c.country_code,
t.c.importance, t.c.wikipedia,
t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
- t.c.centroid,
+ sa.case(
+ (t.c.geometry.ST_GeometryType().in_(('ST_LineString',
+ 'ST_MultiLineString')),
+ t.c.geometry.ST_ClosestPoint(wkt)),
+ else_=t.c.centroid).label('centroid'),
distance.label('distance'),
t.c.geometry.ST_Expand(0).label('bbox'))
sa.Integer).label('housenumber')
+def _interpolated_position(table: SaFromClause) -> SaLabel:
+ fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
+ rounded_pos = sa.func.round(table.c.position / fac) * fac
+ return sa.case(
+ (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
+ else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
+
+
def _is_address_point(table: SaFromClause) -> SaColumn:
return sa.and_(table.c.rank_address == 30,
sa.or_(table.c.housenumber != None,
sql = sa.select(inner.c.place_id, inner.c.osm_id,
inner.c.parent_place_id, inner.c.address,
_interpolated_housenumber(inner),
+ _interpolated_position(inner),
inner.c.postcode, inner.c.country_code,
- inner.c.linegeo.ST_LineInterpolatePoint(inner.c.position).label('centroid'),
inner.c.distance)
if self.details.geometry_output:
async def _find_tiger_number_for_street(self, parent_place_id: int,
+ parent_type: str, parent_id: int,
wkt: WKTElement) -> Optional[SaRow]:
t = self.conn.t.tiger
sql = sa.select(inner.c.place_id,
inner.c.parent_place_id,
+ sa.literal(parent_type).label('osm_type'),
+ sa.literal(parent_id).label('osm_id'),
_interpolated_housenumber(inner),
+ _interpolated_position(inner),
inner.c.postcode,
- inner.c.linegeo.ST_LineInterpolatePoint(inner.c.position).label('centroid'),
inner.c.distance)
if self.details.geometry_output:
return (await self.conn.execute(sql)).one_or_none()
- async def lookup_street_poi(self, wkt: WKTElement) -> Optional[nres.ReverseResult]:
+ async def lookup_street_poi(self,
+ wkt: WKTElement) -> Tuple[Optional[SaRow], RowFunc]:
""" Find a street or POI/address for the given WKT point.
"""
log().section('Reverse lookup on street/address level')
- result = None
distance = 0.006
parent_place_id = None
row = await self._find_closest_street_or_poi(wkt, distance)
+ row_func: RowFunc = nres.create_from_placex_row
log().var_dump('Result (street/building)', row)
# If the closest result was a street, but an address was requested,
if addr_row is not None:
row = addr_row
+ row_func = nres.create_from_placex_row
distance = addr_row.distance
elif row.country_code == 'us' and parent_place_id is not None:
log().comment('Find TIGER housenumber for street')
- addr_row = await self._find_tiger_number_for_street(parent_place_id, wkt)
+ addr_row = await self._find_tiger_number_for_street(parent_place_id,
+ row.osm_type,
+ row.osm_id,
+ wkt)
log().var_dump('Result (street Tiger housenumber)', addr_row)
if addr_row is not None:
- result = nres.create_from_tiger_row(addr_row, nres.ReverseResult)
+ row = addr_row
+ row_func = nres.create_from_tiger_row
else:
distance = row.distance
wkt, distance)
log().var_dump('Result (street interpolation)', addr_row)
if addr_row is not None:
- result = nres.create_from_osmline_row(addr_row, nres.ReverseResult)
+ row = addr_row
+ row_func = nres.create_from_osmline_row
- return result or nres.create_from_placex_row(row, nres.ReverseResult)
+ return row, row_func
async def _lookup_area_address(self, wkt: WKTElement) -> Optional[SaRow]:
return row
- async def lookup_area(self, wkt: WKTElement) -> Optional[nres.ReverseResult]:
+ async def lookup_area(self, wkt: WKTElement) -> Optional[SaRow]:
""" Lookup large areas for the given WKT point.
"""
log().section('Reverse lookup by larger area features')
else:
other_row = None
- return nres.create_from_placex_row(_get_closest(address_row, other_row), nres.ReverseResult)
+ return _get_closest(address_row, other_row)
- async def lookup_country(self, wkt: WKTElement) -> Optional[nres.ReverseResult]:
+ async def lookup_country(self, wkt: WKTElement) -> Optional[SaRow]:
""" Lookup the country for the given WKT point.
"""
log().section('Reverse lookup by country code')
address_row = (await self.conn.execute(sql)).one_or_none()
- return nres.create_from_placex_row(address_row, nres.ReverseResult)
+ return address_row
async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
wkt = WKTElement(f'POINT({coord[0]} {coord[1]})', srid=4326)
- result: Optional[nres.ReverseResult] = None
+ row: Optional[SaRow] = None
+ row_func: RowFunc = nres.create_from_placex_row
if self.max_rank >= 26:
- result = await self.lookup_street_poi(wkt)
- if result is None and self.max_rank > 4:
- result = await self.lookup_area(wkt)
- if result is None and self.layer_enabled(DataLayer.ADDRESS):
- result = await self.lookup_country(wkt)
+ row, tmp_row_func = await self.lookup_street_poi(wkt)
+ if row is not None:
+ row_func = tmp_row_func
+ if row is None and self.max_rank > 4:
+ row = await self.lookup_area(wkt)
+ if row is None and self.layer_enabled(DataLayer.ADDRESS):
+ row = await self.lookup_country(wkt)
+
+ result = row_func(row, nres.ReverseResult)
if result is not None:
+ assert row is not None
+ result.distance = row.distance
+ if hasattr(row, 'bbox'):
+ result.bbox = Bbox.from_wkb(row.bbox.data)
await nres.add_result_details(self.conn, result, self.details)
return result
"""
from typing import Tuple, Optional, Mapping
+import nominatim.api as napi
+
def get_label_tag(category: Tuple[str, str], extratags: Optional[Mapping[str, str]],
rank: int, country: Optional[str]) -> str:
""" Create a label tag for the given place that can be used as an XML name.
"""
- if rank < 26 and extratags and 'place'in extratags:
+ if rank < 26 and extratags and 'place' in extratags:
label = extratags['place']
+ elif rank < 26 and extratags and 'linked_place' in extratags:
+ label = extratags['linked_place']
elif category == ('boundary', 'administrative'):
label = ADMIN_LABELS.get((country or '', int(rank/2)))\
or ADMIN_LABELS.get(('', int(rank/2)))\
return label.lower().replace(' ', '_')
+def bbox_from_result(result: napi.ReverseResult) -> napi.Bbox:
+ """ Compute a bounding box for the result. For ways and relations
+ a given boundingbox is used. For all other object, a box is computed
+ around the centroid according to dimensions dereived from the
+ search rank.
+ """
+ if (result.osm_object and result.osm_object[0] == 'N') or result.bbox is None:
+ extent = NODE_EXTENT.get(result.category, 0.00005)
+ return napi.Bbox.from_point(result.centroid, extent)
+
+ return result.bbox
+
+
+# pylint: disable=line-too-long
+OSM_ATTRIBUTION = 'Data © OpenStreetMap contributors, ODbL 1.0. http://osm.org/copyright'
+
+
+OSM_TYPE_NAME = {
+ 'N': 'node',
+ 'W': 'way',
+ 'R': 'relation'
+}
+
+
ADMIN_LABELS = {
('', 1): 'Continent',
('', 2): 'Country',
('amenity', 'prison'): 'amenity_prison',
('highway', 'bus_stop'): 'transport_bus_stop2'
}
+
+NODE_EXTENT = {
+ ('place', 'continent'): 25,
+ ('place', 'country'): 7,
+ ('place', 'state'): 2.6,
+ ('place', 'province'): 2.6,
+ ('place', 'region'): 1.0,
+ ('place', 'county'): 0.7,
+ ('place', 'city'): 0.16,
+ ('place', 'municipality'): 0.16,
+ ('place', 'island'): 0.32,
+ ('place', 'postcode'): 0.16,
+ ('place', 'town'): 0.04,
+ ('place', 'village'): 0.02,
+ ('place', 'hamlet'): 0.02,
+ ('place', 'district'): 0.02,
+ ('place', 'borough'): 0.02,
+ ('place', 'suburb'): 0.02,
+ ('place', 'locality'): 0.01,
+ ('place', 'neighbourhood'): 0.01,
+ ('place', 'quarter'): 0.01,
+ ('place', 'city_block'): 0.01,
+ ('landuse', 'farm'): 0.01,
+ ('place', 'farm'): 0.01,
+ ('place', 'airport'): 0.015,
+ ('aeroway', 'aerodrome'): 0.015,
+ ('railway', 'station'): 0.005
+}
+++ /dev/null
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-# This file is part of Nominatim. (https://nominatim.org)
-#
-# Copyright (C) 2023 by the Nominatim developer community.
-# For a full list of authors see the git log.
-"""
-Constants shared by all formats.
-"""
-
-import nominatim.api as napi
-
-# pylint: disable=line-too-long
-OSM_ATTRIBUTION = 'Data © OpenStreetMap contributors, ODbL 1.0. http://www.openstreetmap.org/copyright'
-
-OSM_TYPE_NAME = {
- 'N': 'node',
- 'W': 'way',
- 'R': 'relation'
-}
-
-NODE_EXTENT = [25, 25, 25, 25,
- 7,
- 2.6, 2.6, 2.0, 1.0, 1.0,
- 0.7, 0.7, 0.7,
- 0.16, 0.16, 0.16, 0.16,
- 0.04, 0.04,
- 0.02, 0.02,
- 0.01, 0.01, 0.01, 0.01, 0.01,
- 0.015, 0.015, 0.015, 0.015,
- 0.005]
-
-
-def bbox_from_result(result: napi.ReverseResult) -> napi.Bbox:
- """ Compute a bounding box for the result. For ways and relations
- a given boundingbox is used. For all other object, a box is computed
- around the centroid according to dimensions dereived from the
- search rank.
- """
- if (result.osm_object and result.osm_object[0] == 'N') or result.bbox is None:
- return napi.Bbox.from_point(result.centroid, NODE_EXTENT[result.rank_search])
-
- return result.bbox
from typing import Mapping, Any, Optional, Tuple
import nominatim.api as napi
-from nominatim.api.v1.constants import OSM_ATTRIBUTION, OSM_TYPE_NAME, bbox_from_result
-from nominatim.api.v1.classtypes import ICONS, get_label_tag
+import nominatim.api.v1.classtypes as cl
from nominatim.utils.json_writer import JsonWriter
def _write_osm_id(out: JsonWriter, osm_object: Optional[Tuple[str, int]]) -> None:
if osm_object is not None:
- out.keyval_not_none('osm_type', OSM_TYPE_NAME.get(osm_object[0], None))\
+ out.keyval_not_none('osm_type', cl.OSM_TYPE_NAME.get(osm_object[0], None))\
.keyval('osm_id', osm_object[1])
country_code: Optional[str]) -> None:
parts = {}
for line in (address or []):
- if line.isaddress and line.local_name:
- label = get_label_tag(line.category, line.extratags,
- line.rank_address, country_code)
- if label not in parts:
- parts[label] = line.local_name
+ if line.isaddress:
+ if line.local_name:
+ label = cl.get_label_tag(line.category, line.extratags,
+ line.rank_address, country_code)
+ if label not in parts:
+ print(label)
+ parts[label] = line.local_name
+ if line.names and 'ISO3166-2' in line.names and line.admin_level:
+ parts[f"ISO3166-2-lvl{line.admin_level}"] = line.names['ISO3166-2']
for k, v in parts.items():
out.keyval(k, v)
out.start_object()\
.keyval_not_none('place_id', result.place_id)\
- .keyval('licence', OSM_ATTRIBUTION)\
+ .keyval('licence', cl.OSM_ATTRIBUTION)\
_write_osm_id(out, result.osm_object)
.keyval('type', result.category[1])\
.keyval('place_rank', result.rank_search)\
.keyval('importance', result.calculated_importance())\
- .keyval('addresstype', get_label_tag(result.category, result.extratags,
- result.rank_address,
- result.country_code))\
+ .keyval('addresstype', cl.get_label_tag(result.category, result.extratags,
+ result.rank_address,
+ result.country_code))\
.keyval('name', locales.display_name(result.names))\
.keyval('display_name', ', '.join(label_parts))
if options.get('icon_base_url', None):
- icon = ICONS.get(result.category)
+ icon = cl.ICONS.get(result.category)
if icon:
out.keyval('icon', f"{options['icon_base_url']}/{icon}.p.20.png")
if options.get('namedetails', False):
out.keyval('namedetails', result.names)
- bbox = bbox_from_result(result)
+ bbox = cl.bbox_from_result(result)
out.key('boundingbox').start_array()\
- .value(bbox.minlat).next()\
- .value(bbox.maxlat).next()\
- .value(bbox.minlon).next()\
- .value(bbox.maxlon).next()\
+ .value(f"{bbox.minlat:0.7f}").next()\
+ .value(f"{bbox.maxlat:0.7f}").next()\
+ .value(f"{bbox.minlon:0.7f}").next()\
+ .value(f"{bbox.maxlon:0.7f}").next()\
.end_array().next()
if result.geometry:
out.start_object()\
.keyval('type', 'FeatureCollection')\
- .keyval('licence', OSM_ATTRIBUTION)\
+ .keyval('licence', cl.OSM_ATTRIBUTION)\
.key('features').start_array()
for result in results:
.keyval('category', result.category[0])\
.keyval('type', result.category[1])\
.keyval('importance', result.calculated_importance())\
- .keyval('addresstype', get_label_tag(result.category, result.extratags,
- result.rank_address,
- result.country_code))\
+ .keyval('addresstype', cl.get_label_tag(result.category, result.extratags,
+ result.rank_address,
+ result.country_code))\
.keyval('name', locales.display_name(result.names))\
.keyval('display_name', ', '.join(label_parts))
out.end_object().next() # properties
- bbox = bbox_from_result(result)
- out.keyval('bbox', bbox.coords)
+ out.key('bbox').start_array()
+ for coord in cl.bbox_from_result(result).coords:
+ out.float(coord, 7).next()
+ out.end_array().next()
out.key('geometry').raw(result.geometry.get('geojson')
or result.centroid.to_geojson()).next()
.keyval('type', 'FeatureCollection')\
.key('geocoding').start_object()\
.keyval('version', '0.1.0')\
- .keyval('attribution', OSM_ATTRIBUTION)\
+ .keyval('attribution', cl.OSM_ATTRIBUTION)\
.keyval('licence', 'ODbL')\
.keyval_not_none('query', options.get('query'))\
.end_object().next()\
out.keyval('osm_key', result.category[0])\
.keyval('osm_value', result.category[1])\
.keyval('type', GEOCODEJSON_RANKS[max(3, min(28, result.rank_address))])\
- .keyval_not_none('accuracy', result.distance)\
+ .keyval_not_none('accuracy', result.distance, transform=int)\
.keyval('label', ', '.join(label_parts))\
- .keyval_not_none('name', locales.display_name(result.names))\
+ .keyval_not_none('name', result.names, transform=locales.display_name)\
if options.get('addressdetails', False):
_write_geocodejson_address(out, result.address_rows, result.place_id,
import xml.etree.ElementTree as ET
import nominatim.api as napi
-from nominatim.api.v1.constants import OSM_ATTRIBUTION, OSM_TYPE_NAME, bbox_from_result
-from nominatim.api.v1.classtypes import ICONS, get_label_tag
+import nominatim.api.v1.classtypes as cl
def _write_xml_address(root: ET.Element, address: napi.AddressLines,
country_code: Optional[str]) -> None:
parts = {}
for line in address:
- if line.isaddress and line.local_name:
- label = get_label_tag(line.category, line.extratags,
- line.rank_address, country_code)
- if label not in parts:
- parts[label] = line.local_name
+ if line.isaddress:
+ if line.local_name:
+ label = cl.get_label_tag(line.category, line.extratags,
+ line.rank_address, country_code)
+ if label not in parts:
+ parts[label] = line.local_name
+ if line.names and 'ISO3166-2' in line.names and line.admin_level:
+ parts[f"ISO3166-2-lvl{line.admin_level}"] = line.names['ISO3166-2']
for k,v in parts.items():
ET.SubElement(root, k).text = v
if result.place_id is not None:
place.set('place_id', str(result.place_id))
if result.osm_object:
- osm_type = OSM_TYPE_NAME.get(result.osm_object[0], None)
+ osm_type = cl.OSM_TYPE_NAME.get(result.osm_object[0], None)
if osm_type is not None:
place.set('osm_type', osm_type)
place.set('osm_id', str(result.osm_object[1]))
if result.names and 'ref' in result.names:
- place.set('place_id', result.names['ref'])
- place.set('lat', str(result.centroid.lat))
- place.set('lon', str(result.centroid.lon))
+ place.set('ref', result.names['ref'])
+ elif label_parts:
+ # bug reproduced from PHP
+ place.set('ref', label_parts[0])
+ place.set('lat', f"{result.centroid.lat:.7f}")
+ place.set('lon', f"{result.centroid.lon:.7f}")
- bbox = bbox_from_result(result)
- place.set('boundingbox', ','.join(map(str, [bbox.minlat, bbox.maxlat,
- bbox.minlon, bbox.maxlon])))
+ bbox = cl.bbox_from_result(result)
+ place.set('boundingbox',
+ f"{bbox.minlat:.7f},{bbox.maxlat:.7f},{bbox.minlon:.7f},{bbox.maxlon:.7f}")
place.set('place_rank', str(result.rank_search))
place.set('address_rank', str(result.rank_address))
root = ET.Element(xml_root_tag)
root.set('timestamp', dt.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +00:00'))
- root.set('attribution', OSM_ATTRIBUTION)
+ root.set('attribution', cl.OSM_ATTRIBUTION)
for k, v in xml_extra_info.items():
root.set(k, v)
place = _create_base_entry(result, root, simple, locales)
if not simple and options.get('icon_base_url', None):
- icon = ICONS.get(result.category)
+ icon = cl.ICONS.get(result.category)
if icon:
place.set('icon', icon)
Generic part of the server implementation of the v1 API.
Combine with the scaffolding provided for the various Python ASGI frameworks.
"""
-from typing import Optional, Any, Type, Callable, NoReturn, TypeVar
+from typing import Optional, Any, Type, Callable, NoReturn, cast
+from functools import reduce
import abc
+import math
from nominatim.config import Configuration
import nominatim.api as napi
'debug': 'text/html; charset=utf-8'
}
-ConvT = TypeVar('ConvT', int, float)
-
class ASGIAdaptor(abc.ABC):
""" Adapter class for the different ASGI frameworks.
Wraps functionality over concrete requests and responses.
raise self.error(msg, status)
- def _get_typed(self, name: str, dest_type: Type[ConvT], type_name: str,
- default: Optional[ConvT] = None) -> ConvT:
- """ Return an input parameter as the type 'dest_type'. Raises an
- exception if the parameter is given but not in the given format.
+ def get_int(self, name: str, default: Optional[int] = None) -> int:
+ """ Return an input parameter as an int. Raises an exception if
+ the parameter is given but not in an integer format.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
self.raise_error(f"Parameter '{name}' missing.")
try:
- intval = dest_type(value)
+ intval = int(value)
except ValueError:
- self.raise_error(f"Parameter '{name}' must be a {type_name}.")
+ self.raise_error(f"Parameter '{name}' must be a number.")
return intval
- def get_int(self, name: str, default: Optional[int] = None) -> int:
- """ Return an input parameter as an int. Raises an exception if
- the parameter is given but not in an integer format.
+ def get_float(self, name: str, default: Optional[float] = None) -> float:
+ """ Return an input parameter as a flaoting-point number. Raises an
+ exception if the parameter is given but not in an float format.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
- return self._get_typed(name, int, 'number', default)
+ value = self.get(name)
+ if value is None:
+ if default is not None:
+ return default
- def get_float(self, name: str, default: Optional[float] = None) -> int:
- """ Return an input parameter as a flaoting-point number. Raises an
- exception if the parameter is given but not in an float format.
+ self.raise_error(f"Parameter '{name}' missing.")
- If 'default' is given, then it will be returned when the parameter
- is missing completely. When 'default' is None, an error will be
- raised on a missing parameter.
- """
- return self._get_typed(name, float, 'number', default)
+ try:
+ fval = float(value)
+ except ValueError:
+ self.raise_error(f"Parameter '{name}' must be a number.")
+
+ if math.isnan(fval) or math.isinf(fval):
+ self.raise_error(f"Parameter '{name}' must be a number.")
+
+ return fval
def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
return False
- def get_layers(self) -> napi.DataLayer:
+ def get_layers(self) -> Optional[napi.DataLayer]:
""" Return a parsed version of the layer parameter.
"""
param = self.get('layer', None)
if param is None:
return None
- return reduce(napi.DataLayer.__or__,
- (getattr(napi.DataLayer, s.upper()) for s in param.split(',')))
+ return cast(napi.DataLayer,
+ reduce(napi.DataLayer.__or__,
+ (getattr(napi.DataLayer, s.upper()) for s in param.split(','))))
def parse_format(self, result_type: Type[Any], default: str) -> str:
zoom = max(0, min(18, params.get_int('zoom', 18)))
- # Negation makes sure that NaN is handled. Don't change.
- if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
- params.raise_error('Invalid coordinates.')
-
details = napi.LookupDetails(address_details=True,
geometry_simplification=params.get_float('polygon_threshold', 0.0))
numgeoms = 0
numgeoms += 1
if numgeoms > params.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
- params.raise_error(f'Too many polgyon output options selected.')
+ params.raise_error('Too many polgyon output options selected.')
result = await api.reverse(coord, REVERSE_MAX_RANKS[zoom],
params.get_layers() or
from typing import Optional, Mapping, cast, Any
from pathlib import Path
-import falcon
from falcon.asgi import App, Request, Response
from nominatim.api import NominatimAPIAsync
self.content_type = content_type
-async def nominatim_error_handler(req: Request, resp: Response,
+async def nominatim_error_handler(req: Request, resp: Response, #pylint: disable=unused-argument
exception: HTTPNominatimError,
_: Any) -> None:
+ """ Special error handler that passes message and content type as
+ per exception info.
+ """
resp.status = exception.status
resp.text = exception.msg
resp.content_type = exception.content_type
def error(self, msg: str, status: int = 400) -> SanicException:
exception = SanicException(msg, status_code=status)
- exception.headers = {'content-type': self.content_type}
return exception
return self.raw(json.dumps(value, ensure_ascii=False))
+ def float(self, value: float, precision: int) -> 'JsonWriter':
+ """ Write out a float value with the given precision.
+ """
+ return self.raw(f"{value:0.{precision}f}")
+
def next(self) -> 'JsonWriter':
""" Write out a delimiter comma between JSON object or array elements.
"""
| Point | [9.5036065, 47.0660892] |
And results contain in field __geocoding
| version | licence | attribution |
- | 0.1.0 | ODbL | Data © OpenStreetMap contributors, ODbL 1.0. https://osm.org/copyright |
+ | 0.1.0 | ODbL | ^Data © OpenStreetMap contributors, ODbL 1.0. https?://osm.org/copyright$ |
Examples:
| has_address | attributes |
| way | 1 | 30 | place | house | place |
And results contain
| boundingbox |
- | [47.118495392, 47.118595392, 9.57049676, 9.57059676] |
+ | ^\[47.118495\d*, 47.118595\d*, 9.570496\d*, 9.570596\d*\] |
And results contain
| display_name |
| 1019, Grosssteg, Sücka, Triesenberg, Oberland, 9497, Liechtenstein |
| 1 | attributes |
| 0 | not attributes |
- Scenario Outline: Siple OSM result
+ Scenario Outline: Simple OSM result
When sending v1/reverse at 47.066,9.504 with format <format>
Then result has attributes place_id
And results contain
| licence |
- | Data © OpenStreetMap contributors, ODbL 1.0. https://osm.org/copyright |
+ | ^Data © OpenStreetMap contributors, ODbL 1.0. https?://osm.org/copyright$ |
And results contain
| osm_type | osm_id |
| node | 6522627624 |
| way | 1 |
And results contain
| centroid | boundingbox |
- | 9.57054676 47.118545392 | ['47.118495392', '47.118595392', '9.57049676', '9.57059676'] |
+ | 9.57054676 47.118545392 | ^\['47.118495\d*', '47.118595\d*', '9.570496\d*', '9.570596\d*'\] |
And results contain
| display_name |
| 1019, Grosssteg, Sücka, Triesenberg, Oberland, 9497, Liechtenstein |
| way | 396009653 | 30 | 30 |
And results contain
| centroid | boundingbox |
- | -86.4808553258 32.4753580256 | ^32.475308025\d*,32.475408025\d*,-86.480905325\d*,-86.480805325\d* |
+ | -86.4808553 32.4753580 | ^32.4753080\d*,32.4754080\d*,-86.4809053\d*,-86.4808053\d* |
And results contain
| display_name |
| 707, Upper Kingston Road, Upper Kingston, Prattville, Autauga County, 36067, United States |
| way | 1 | 30 | 30 |
And results contain
| centroid | boundingbox |
- | 9.57054676 47.118545392 | 47.118495392,47.118595392,9.57049676,9.57059676 |
+ | 9.57054676 47.118545392 | ^47.118495\d*,47.118595\d*,9.570496\d*,9.570596\d* |
And results contain
| display_name |
| 1019, Grosssteg, Sücka, Triesenberg, Oberland, 9497, Liechtenstein |
""" Generic comparator for fields, which looks at the type of the
value compared.
"""
- def __init__(self, value):
+ def __init__(self, value, **extra_args):
self.value = value
+ self.extra_args = extra_args
def __eq__(self, other):
if isinstance(self.value, float):
- return math.isclose(self.value, float(other))
+ return math.isclose(self.value, float(other), **self.extra_args)
if self.value.startswith('^'):
- return re.fullmatch(self.value, other)
+ return re.fullmatch(self.value, str(other))
if isinstance(other, dict):
return other == eval('{' + self.value + '}')
lon, lat = context.osm.grid_node(int(value))
else:
raise RuntimeError("Context needed when using grid coordinates")
- self.check_row_field(i, 'lat', Field(float(lat)), base=subdict)
- self.check_row_field(i, 'lon', Field(float(lon)), base=subdict)
+ self.check_row_field(i, 'lat', Field(float(lat), abs_tol=1e-07), base=subdict)
+ self.check_row_field(i, 'lon', Field(float(lon), abs_tol=1e-07), base=subdict)
else:
self.check_row_field(i, name, Field(value), base=subdict)
@then(u'a HTTP (?P<status>\d+) is returned')
def check_http_return_status(context, status):
assert context.response.errorcode == int(status), \
- f"Return HTTP status is {context.response.errorcode}."
+ f"Return HTTP status is {context.response.errorcode}."\
+ f" Full response:\n{context.response.page}"
@then(u'the page contents equals "(?P<text>.+)"')
def check_page_content_equals(context, text):
startnumber=1, endnumber=4, step=1,
postcode='34425',
geometry='LINESTRING(23 34, 23 35)')
+ apiobj.add_placex(place_id=12,
+ category=('highway', 'residential'),
+ osm_type='W', osm_id=6601223,
+ geometry='LINESTRING(23 34, 23 35)')
result = apiobj.api.lookup(napi.PlaceID(4924), napi.LookupDetails())
assert result.place_id == 4924
assert result.parent_place_id == 12
assert result.linked_place_id is None
- assert result.osm_object is None
+ assert result.osm_object == ('W', 6601223)
assert result.admin_level == 15
assert result.names is None
@pytest.mark.parametrize('func', (nresults.create_from_osmline_row,
nresults.create_from_tiger_row))
def test_create_row_with_housenumber(func):
- row = FakeRow(place_id = 2345, osm_id = 111, housenumber = 4,
- address = None, postcode = '99900', country_code = 'xd',
- centroid = FakeCentroid(0, 0))
+ row = FakeRow(place_id=2345, osm_type='W', osm_id=111, housenumber=4,
+ address=None, postcode='99900', country_code='xd',
+ centroid=FakeCentroid(0, 0))
res = func(row, DetailedResult)
@pytest.mark.parametrize('func', (nresults.create_from_osmline_row,
nresults.create_from_tiger_row))
def test_create_row_without_housenumber(func):
- row = FakeRow(place_id=2345, osm_id=111,
+ row = FakeRow(place_id=2345, osm_type='W', osm_id=111,
startnumber=1, endnumber=11, step=2,
address=None, postcode='99900', country_code='xd',
centroid=FakeCentroid(0, 0))