import sys
import sqlalchemy as sa
+from sqlalchemy.ext.compiler import compiles
from sqlalchemy import types
from nominatim.typing import SaColumn, SaBind
#pylint: disable=all
+SQLITE_FUNCTION_ALIAS = (
+ ('ST_AsEWKB', sa.Text, 'AsEWKB'),
+ ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
+ ('ST_AsKML', sa.Text, 'AsKML'),
+ ('ST_AsSVG', sa.Text, 'AsSVG'),
+)
+
+def _add_function_alias(func: str, ftype: type, alias: str) -> None:
+ _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
+ "type": ftype,
+ "name": func,
+ "identifier": func,
+ "inherit_cache": True})
+
+ func_templ = f"{alias}(%s)"
+
+ def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
+ return func_templ % compiler.process(element.clauses, **kw)
+
+ compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
+
+for alias in SQLITE_FUNCTION_ALIAS:
+ _add_function_alias(*alias)
+
+
class Geometry(types.UserDefinedType): # type: ignore[type-arg]
""" Simplified type decorator for PostGIS geometry. This type
only supports geometries in 4326 projection.
def get_col_spec(self) -> str:
- return f'GEOMETRY'
+ return f'GEOMETRY({self.subtype}, 4326)'
def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
return process
+ def column_expression(self, col: SaColumn) -> SaColumn:
+ return sa.func.ST_AsEWKB(col)
+
+
def bind_expression(self, bindvalue: SaBind) -> SaColumn:
return sa.func.ST_GeomFromText(bindvalue, sa.text('4326'), type_=self)
def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)
+
+
+@compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
+def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
+ return 'GEOMETRY'
import nominatim.api as napi
from nominatim.db.sql_preprocessor import SQLPreprocessor
+from nominatim.tools import convert_sqlite
import nominatim.api.logging as loglib
class APITester:
testapi.async_to_sync(testapi.create_tables())
proc = SQLPreprocessor(temp_db_conn, testapi.api.config)
- proc.run_sql_file(temp_db_conn, 'functions/address_lookup.sql')
proc.run_sql_file(temp_db_conn, 'functions/ranking.sql')
loglib.set_log_output('text')
print(loglib.get_and_disable())
testapi.api.close()
+
+
+@pytest.fixture(params=['postgres_db', 'sqlite_db'])
+def frontend(request, event_loop, tmp_path):
+ if request.param == 'sqlite_db':
+ db = str(tmp_path / 'test_nominatim_python_unittest.sqlite')
+
+ def mkapi(apiobj, options={'reverse'}):
+ event_loop.run_until_complete(convert_sqlite.convert(Path('/invalid'),
+ db, options))
+ return napi.NominatimAPI(Path('/invalid'),
+ {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={db}",
+ 'NOMINATIM_USE_US_TIGER_DATA': 'yes'})
+ elif request.param == 'postgres_db':
+ def mkapi(apiobj, options=None):
+ return apiobj.api
+
+ return mkapi
"""
Tests for lookup API call.
"""
+import json
+
import pytest
import nominatim.api as napi
-def test_lookup_empty_list(apiobj):
- assert apiobj.api.lookup([]) == []
+def test_lookup_empty_list(apiobj, frontend):
+ api = frontend(apiobj, options={'details'})
+ assert api.lookup([]) == []
-def test_lookup_non_existing(apiobj):
- assert apiobj.api.lookup((napi.PlaceID(332), napi.OsmID('W', 4),
- napi.OsmID('W', 4, 'highway'))) == []
+def test_lookup_non_existing(apiobj, frontend):
+ api = frontend(apiobj, options={'details'})
+ assert api.lookup((napi.PlaceID(332), napi.OsmID('W', 4),
+ napi.OsmID('W', 4, 'highway'))) == []
@pytest.mark.parametrize('idobj', (napi.PlaceID(332), napi.OsmID('W', 4),
napi.OsmID('W', 4, 'highway')))
-def test_lookup_single_placex(apiobj, idobj):
+def test_lookup_single_placex(apiobj, frontend, idobj):
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential',
name={'name': 'Road'}, address={'city': 'Barrow'},
centroid=(23, 34),
geometry='LINESTRING(23 34, 23.1 34, 23.1 34.1, 23 34)')
- result = apiobj.api.lookup([idobj])
+ api = frontend(apiobj, options={'details'})
+ result = api.lookup([idobj])
assert len(result) == 1
assert result.geometry == {}
-def test_lookup_multiple_places(apiobj):
+def test_lookup_multiple_places(apiobj, frontend):
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential',
name={'name': 'Road'}, address={'city': 'Barrow'},
geometry='LINESTRING(23 34, 23 35)')
- result = apiobj.api.lookup((napi.OsmID('W', 1),
- napi.OsmID('W', 4),
- napi.OsmID('W', 9928)))
+ api = frontend(apiobj, options={'details'})
+ result = api.lookup((napi.OsmID('W', 1),
+ napi.OsmID('W', 4),
+ napi.OsmID('W', 9928)))
assert len(result) == 2
@pytest.mark.parametrize('gtype', list(napi.GeometryFormat))
-def test_simple_place_with_geometry(apiobj, gtype):
+def test_simple_place_with_geometry(apiobj, frontend, gtype):
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential',
name={'name': 'Road'}, address={'city': 'Barrow'},
centroid=(23, 34),
geometry='POLYGON((23 34, 23.1 34, 23.1 34.1, 23 34))')
- result = apiobj.api.lookup([napi.OsmID('W', 4)],
- geometry_output=gtype)
+ api = frontend(apiobj, options={'details'})
+ result = api.lookup([napi.OsmID('W', 4)], geometry_output=gtype)
assert len(result) == 1
assert result[0].place_id == 332
assert list(result[0].geometry.keys()) == [gtype.name.lower()]
-def test_simple_place_with_geometry_simplified(apiobj):
+def test_simple_place_with_geometry_simplified(apiobj, frontend):
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential',
name={'name': 'Road'}, address={'city': 'Barrow'},
centroid=(23, 34),
geometry='POLYGON((23 34, 22.999 34, 23.1 34, 23.1 34.1, 23 34))')
- result = apiobj.api.lookup([napi.OsmID('W', 4)],
- geometry_output=napi.GeometryFormat.TEXT,
- geometry_simplification=0.1)
+ api = frontend(apiobj, options={'details'})
+ result = api.lookup([napi.OsmID('W', 4)],
+ geometry_output=napi.GeometryFormat.GEOJSON,
+ geometry_simplification=0.1)
assert len(result) == 1
assert result[0].place_id == 332
- assert result[0].geometry == {'text': 'POLYGON((23 34,23.1 34,23.1 34.1,23 34))'}
+ geom = json.loads(result[0].geometry['geojson'])
+
+ assert geom['type'] == 'Polygon'
+ assert geom['coordinates'] == [[[23, 34], [23.1, 34], [23.1, 34.1], [23, 34]]]