from nominatim.errors import UsageError
from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
+import nominatim.db.sqlite_functions
from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
@sa.event.listens_for(engine.sync_engine, "connect")
def _on_sqlite_connect(dbapi_con: Any, _: Any) -> None:
dbapi_con.run_async(lambda conn: conn.enable_load_extension(True))
+ nominatim.db.sqlite_functions.install_custom_functions(dbapi_con)
cursor = dbapi_con.cursor()
cursor.execute("SELECT load_extension('mod_spatialite')")
cursor.execute('SELECT SetDecimalPrecision(7)')
inherit_cache = True
def __init__(self, table: SaFromClause, column: str, tokens: List[int]) -> None:
- super().__init__(getattr(table.c, column),
+ super().__init__(table.c.place_id, getattr(table.c, column), column,
sa.type_coerce(tokens, IntArray))
@compiles(LookupAll) # type: ignore[no-untyped-call, misc]
def _default_lookup_all(element: LookupAll,
compiler: 'sa.Compiled', **kw: Any) -> str:
- col, tokens = list(element.clauses)
+ _, col, _, tokens = list(element.clauses)
return "(%s @> %s)" % (compiler.process(col, **kw),
compiler.process(tokens, **kw))
+@compiles(LookupAll, 'sqlite') # type: ignore[no-untyped-call, misc]
+def _sqlite_lookup_all(element: LookupAll,
+ compiler: 'sa.Compiled', **kw: Any) -> str:
+ place, col, colname, tokens = list(element.clauses)
+ return "(%s IN (SELECT CAST(value as bigint) FROM"\
+ " (SELECT array_intersect_fuzzy(places) as p FROM"\
+ " (SELECT places FROM reverse_search_name"\
+ " WHERE word IN (SELECT value FROM json_each('[' || %s || ']'))"\
+ " AND column = %s"\
+ " ORDER BY length(places)) as x) as u,"\
+ " json_each('[' || u.p || ']'))"\
+ " AND array_contains(%s, %s))"\
+ % (compiler.process(place, **kw),
+ compiler.process(tokens, **kw),
+ compiler.process(colname, **kw),
+ compiler.process(col, **kw),
+ compiler.process(tokens, **kw)
+ )
+
+
class LookupAny(LookupType):
""" Find all entries that contain at least one of the given tokens.
inherit_cache = True
def __init__(self, table: SaFromClause, column: str, tokens: List[int]) -> None:
- super().__init__(getattr(table.c, column),
+ super().__init__(table.c.place_id, getattr(table.c, column), column,
sa.type_coerce(tokens, IntArray))
-
@compiles(LookupAny) # type: ignore[no-untyped-call, misc]
def _default_lookup_any(element: LookupAny,
compiler: 'sa.Compiled', **kw: Any) -> str:
- col, tokens = list(element.clauses)
+ _, col, _, tokens = list(element.clauses)
return "(%s && %s)" % (compiler.process(col, **kw),
compiler.process(tokens, **kw))
+@compiles(LookupAny, 'sqlite') # type: ignore[no-untyped-call, misc]
+def _sqlite_lookup_any(element: LookupAny,
+ compiler: 'sa.Compiled', **kw: Any) -> str:
+ place, _, colname, tokens = list(element.clauses)
+ return "%s IN (SELECT CAST(value as bigint) FROM"\
+ " (SELECT array_union(places) as p FROM reverse_search_name"\
+ " WHERE word IN (SELECT value FROM json_each('[' || %s || ']'))"\
+ " AND column = %s) as u,"\
+ " json_each('[' || u.p || ']'))" % (compiler.process(place, **kw),
+ compiler.process(tokens, **kw),
+ compiler.process(colname, **kw))
+
class Restrict(LookupType):
arg1, arg2 = list(element.clauses)
return "(coalesce(null, %s) @> %s)" % (compiler.process(arg1, **kw),
compiler.process(arg2, **kw))
+
+@compiles(Restrict, 'sqlite') # type: ignore[no-untyped-call, misc]
+def _sqlite_restrict(element: Restrict,
+ compiler: 'sa.Compiled', **kw: Any) -> str:
+ return "array_contains(%s)" % compiler.process(element.clauses, **kw)
import abc
import sqlalchemy as sa
-from sqlalchemy.dialects.postgresql import array_agg
from nominatim.typing import SaFromClause, SaScalarSelect, SaColumn, \
SaExpression, SaSelect, SaLambdaSelect, SaRow, SaBind
from nominatim.api.types import SearchDetails, DataLayer, GeometryFormat, Bbox
import nominatim.api.results as nres
from nominatim.api.search.db_search_fields import SearchData, WeightedCategories
-from nominatim.db.sqlalchemy_types import Geometry
+from nominatim.db.sqlalchemy_types import Geometry, IntArray
#pylint: disable=singleton-comparison,not-callable
#pylint: disable=too-many-branches,too-many-arguments,too-many-locals,too-many-statements
def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
numerals: List[int], details: SearchDetails) -> SaScalarSelect:
- all_ids = array_agg(table.c.place_id) # type: ignore[no-untyped-call]
+ all_ids = sa.func.ArrayAgg(table.c.place_id)
sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)
if len(numerals) == 1:
orexpr.append(no_index(table.c.rank_address).between(1, 30))
elif layers & DataLayer.ADDRESS:
orexpr.append(no_index(table.c.rank_address).between(1, 29))
- orexpr.append(sa.and_(no_index(table.c.rank_address) == 30,
- sa.or_(table.c.housenumber != None,
- table.c.address.has_key('addr:housename'))))
+ orexpr.append(sa.func.IsAddressPoint(table))
elif layers & DataLayer.POI:
orexpr.append(sa.and_(no_index(table.c.rank_address) == 30,
table.c.class_.not_in(('place', 'building'))))
yield result
+def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
+ """ Create a subselect that returns the given list of integers
+ as rows in the column 'nr'.
+ """
+ vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON)) # type: ignore[no-untyped-call]
+ return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()
+
+
async def _get_osmline(conn: SearchConnection, place_ids: List[int],
numerals: List[int],
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.osmline
- values = sa.values(sa.Column('nr', sa.Integer()), name='housenumber')\
- .data([(n,) for n in numerals])
+
+ values = _int_list_to_subquery(numerals)
sql = sa.select(t.c.place_id, t.c.osm_id,
t.c.parent_place_id, t.c.address,
values.c.nr.label('housenumber'),
numerals: List[int], osm_id: int,
details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
t = conn.t.tiger
- values = sa.values(sa.Column('nr', sa.Integer()), name='housenumber')\
- .data([(n,) for n in numerals])
+ values = _int_list_to_subquery(numerals)
sql = sa.select(t.c.place_id, t.c.parent_place_id,
sa.literal('W').label('osm_type'),
sa.literal(osm_id).label('osm_id'),
tsearch = conn.t.search_name
sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\
.where((tsearch.c.name_vector + tsearch.c.nameaddress_vector)
- .contains(self.lookups[0].tokens))
+ .contains(sa.type_coerce(self.lookups[0].tokens,
+ IntArray)))
for ranking in self.rankings:
penalty += ranking.sql_penalty(conn.t.search_name)
sql = sql.order_by(sa.text('accuracy'))
if self.housenumbers:
- hnr_regexp = f"\\m({'|'.join(self.housenumbers.values)})\\M"
+ hnr_list = '|'.join(self.housenumbers.values)
sql = sql.where(tsearch.c.address_rank.between(16, 30))\
.where(sa.or_(tsearch.c.address_rank < 30,
- t.c.housenumber.op('~*')(hnr_regexp)))
+ sa.func.RegexpWord(hnr_list, t.c.housenumber)))
# Cross check for housenumbers, need to do that on a rather large
# set. Worst case there are 40.000 main streets in OSM.
# Housenumbers from placex
thnr = conn.t.placex.alias('hnr')
- pid_list = array_agg(thnr.c.place_id) # type: ignore[no-untyped-call]
+ pid_list = sa.func.ArrayAgg(thnr.c.place_id)
place_sql = sa.select(pid_list)\
.where(thnr.c.parent_place_id == inner.c.place_id)\
- .where(thnr.c.housenumber.op('~*')(hnr_regexp))\
+ .where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\
.where(thnr.c.linked_place_id == None)\
.where(thnr.c.indexed_status == 0)
return "json_each(%s)" % compiler.process(element.clauses, **kw)
+
class Greatest(sa.sql.functions.GenericFunction[Any]):
""" Function to compute maximum of all its input parameters.
"""
@compiles(Greatest, 'sqlite') # type: ignore[no-untyped-call, misc]
def sqlite_greatest(element: Greatest, compiler: 'sa.Compiled', **kw: Any) -> str:
return "max(%s)" % compiler.process(element.clauses, **kw)
+
+
+
+class RegexpWord(sa.sql.functions.GenericFunction[Any]):
+ """ Check if a full word is in a given string.
+ """
+ name = 'RegexpWord'
+ inherit_cache = True
+
+
+@compiles(RegexpWord, 'postgresql') # type: ignore[no-untyped-call, misc]
+def postgres_regexp_nocase(element: RegexpWord, compiler: 'sa.Compiled', **kw: Any) -> str:
+ arg1, arg2 = list(element.clauses)
+ return "%s ~* ('\\m(' || %s || ')\\M')::text" % (compiler.process(arg2, **kw), compiler.process(arg1, **kw))
+
+
+@compiles(RegexpWord, 'sqlite') # type: ignore[no-untyped-call, misc]
+def sqlite_regexp_nocase(element: RegexpWord, compiler: 'sa.Compiled', **kw: Any) -> str:
+ arg1, arg2 = list(element.clauses)
+ return "regexp('\\b(' || %s || ')\\b', %s)" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
""" Concate the array with the given array. If one of the
operants is null, the value of the other will be returned.
"""
- return sa.func.array_cat(self, other, type_=IntArray)
+ return ArrayCat(self.expr, other)
def contains(self, other: SaColumn, **kwargs: Any) -> 'sa.ColumnOperators':
""" Return true if the array contains all the value of the argument
array.
"""
- return cast('sa.ColumnOperators', self.op('@>', is_comparison=True)(other))
+ return ArrayContains(self.expr, other)
- def overlaps(self, other: SaColumn) -> 'sa.Operators':
- """ Return true if at least one value of the argument is contained
- in the array.
- """
- return self.op('&&', is_comparison=True)(other)
-
class ArrayAgg(sa.sql.functions.GenericFunction[Any]):
""" Aggregate function to collect elements in an array.
name = 'array_agg'
inherit_cache = True
+
@compiles(ArrayAgg, 'sqlite') # type: ignore[no-untyped-call, misc]
def sqlite_array_agg(element: ArrayAgg, compiler: 'sa.Compiled', **kw: Any) -> str:
return "group_concat(%s, ',')" % compiler.process(element.clauses, **kw)
+
+
+
+class ArrayContains(sa.sql.expression.FunctionElement[Any]):
+ """ Function to check if an array is fully contained in another.
+ """
+ name = 'ArrayContains'
+ inherit_cache = True
+
+
+@compiles(ArrayContains) # type: ignore[no-untyped-call, misc]
+def generic_array_contains(element: ArrayContains, compiler: 'sa.Compiled', **kw: Any) -> str:
+ arg1, arg2 = list(element.clauses)
+ return "(%s @> %s)" % (compiler.process(arg1, **kw),
+ compiler.process(arg2, **kw))
+
+
+@compiles(ArrayContains, 'sqlite') # type: ignore[no-untyped-call, misc]
+def sqlite_array_contains(element: ArrayContains, compiler: 'sa.Compiled', **kw: Any) -> str:
+ return "array_contains(%s)" % compiler.process(element.clauses, **kw)
+
+
+
+class ArrayCat(sa.sql.expression.FunctionElement[Any]):
+ """ Function to check if an array is fully contained in another.
+ """
+ type = IntArray()
+ identifier = 'ArrayCat'
+ inherit_cache = True
+
+
+@compiles(ArrayCat) # type: ignore[no-untyped-call, misc]
+def generic_array_cat(element: ArrayCat, compiler: 'sa.Compiled', **kw: Any) -> str:
+ return "array_cat(%s)" % compiler.process(element.clauses, **kw)
+
+
+@compiles(ArrayCat, 'sqlite') # type: ignore[no-untyped-call, misc]
+def sqlite_array_cat(element: ArrayCat, compiler: 'sa.Compiled', **kw: Any) -> str:
+ arg1, arg2 = list(element.clauses)
+ return "(%s || ',' || %s)" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
+
from typing import Any
import sqlalchemy as sa
+from sqlalchemy.ext.compiler import compiles
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.dialects.sqlite import JSON as sqlite_json
one, overwriting values where necessary. When the argument
is null, nothing happens.
"""
- return self.op('||')(sa.func.coalesce(other,
- sa.type_coerce('', KeyValueStore)))
+ return KeyValueConcat(self.expr, other)
+
+
+class KeyValueConcat(sa.sql.expression.FunctionElement[Any]):
+ """ Return the merged key-value store from the input parameters.
+ """
+ type = KeyValueStore()
+ name = 'JsonConcat'
+ inherit_cache = True
+
+@compiles(KeyValueConcat) # type: ignore[no-untyped-call, misc]
+def default_json_concat(element: KeyValueConcat, compiler: 'sa.Compiled', **kw: Any) -> str:
+ arg1, arg2 = list(element.clauses)
+ return "(%s || coalesce(%s, ''::hstore))" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
+
+@compiles(KeyValueConcat, 'sqlite') # type: ignore[no-untyped-call, misc]
+def sqlite_json_concat(element: KeyValueConcat, compiler: 'sa.Compiled', **kw: Any) -> str:
+ arg1, arg2 = list(element.clauses)
+ return "json_patch(%s, coalesce(%s, '{}'))" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
+
- def has_key(self, key: SaColumn) -> 'sa.Operators':
- """ Return true if the key is cotained in the store.
- """
- return self.op('?', is_comparison=True)(key)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Custom functions for SQLite.
+"""
+from typing import cast, Optional, Set, Any
+import json
+
+# pylint: disable=protected-access
+
+def weigh_search(search_vector: Optional[str], rankings: str, default: float) -> float:
+ """ Custom weight function for search results.
+ """
+ if search_vector is not None:
+ svec = [int(x) for x in search_vector.split(',')]
+ for rank in json.loads(rankings):
+ if all(r in svec for r in rank[1]):
+ return cast(float, rank[0])
+
+ return default
+
+
+class ArrayIntersectFuzzy:
+ """ Compute the array of common elements of all input integer arrays.
+ Very large input paramenters may be ignored to speed up
+ computation. Therefore, the result is a superset of common elements.
+
+ Input and output arrays are given as comma-separated lists.
+ """
+ def __init__(self) -> None:
+ self.first = ''
+ self.values: Optional[Set[int]] = None
+
+ def step(self, value: Optional[str]) -> None:
+ """ Add the next array to the intersection.
+ """
+ if value is not None:
+ if not self.first:
+ self.first = value
+ elif len(value) < 10000000:
+ if self.values is None:
+ self.values = {int(x) for x in self.first.split(',')}
+ self.values.intersection_update((int(x) for x in value.split(',')))
+
+ def finalize(self) -> str:
+ """ Return the final result.
+ """
+ if self.values is not None:
+ return ','.join(map(str, self.values))
+
+ return self.first
+
+
+class ArrayUnion:
+ """ Compute the set of all elements of the input integer arrays.
+
+ Input and output arrays are given as strings of comma-separated lists.
+ """
+ def __init__(self) -> None:
+ self.values: Optional[Set[str]] = None
+
+ def step(self, value: Optional[str]) -> None:
+ """ Add the next array to the union.
+ """
+ if value is not None:
+ if self.values is None:
+ self.values = set(value.split(','))
+ else:
+ self.values.update(value.split(','))
+
+ def finalize(self) -> str:
+ """ Return the final result.
+ """
+ return '' if self.values is None else ','.join(self.values)
+
+
+def array_contains(container: Optional[str], containee: Optional[str]) -> Optional[bool]:
+ """ Is the array 'containee' completely contained in array 'container'.
+ """
+ if container is None or containee is None:
+ return None
+
+ vset = container.split(',')
+ return all(v in vset for v in containee.split(','))
+
+
+def array_pair_contains(container1: Optional[str], container2: Optional[str],
+ containee: Optional[str]) -> Optional[bool]:
+ """ Is the array 'containee' completely contained in the union of
+ array 'container1' and array 'container2'.
+ """
+ if container1 is None or container2 is None or containee is None:
+ return None
+
+ vset = container1.split(',') + container2.split(',')
+ return all(v in vset for v in containee.split(','))
+
+
+def install_custom_functions(conn: Any) -> None:
+ """ Install helper functions for Nominatim into the given SQLite
+ database connection.
+ """
+ conn.create_function('weigh_search', 3, weigh_search, deterministic=True)
+ conn.create_function('array_contains', 2, array_contains, deterministic=True)
+ conn.create_function('array_pair_contains', 3, array_pair_contains, deterministic=True)
+ _create_aggregate(conn, 'array_intersect_fuzzy', 1, ArrayIntersectFuzzy)
+ _create_aggregate(conn, 'array_union', 1, ArrayUnion)
+
+
+async def _make_aggregate(aioconn: Any, *args: Any) -> None:
+ await aioconn._execute(aioconn._conn.create_aggregate, *args)
+
+
+def _create_aggregate(conn: Any, name: str, nargs: int, aggregate: Any) -> None:
+ try:
+ conn.await_(_make_aggregate(conn._connection, name, nargs, aggregate))
+ except Exception as error: # pylint: disable=broad-exception-caught
+ conn._handle_exception(error)
async def create_search_index(self) -> None:
""" Create the tables and indexes needed for word lookup.
"""
+ LOG.warning("Creating reverse search table")
+ rsn = sa.Table('reverse_search_name', self.dest.t.meta,
+ sa.Column('word', sa.Integer()),
+ sa.Column('column', sa.Text()),
+ sa.Column('places', IntArray))
+ await self.dest.connection.run_sync(rsn.create)
+
tsrc = self.src.t.search_name
for column in ('name_vector', 'nameaddress_vector'):
- table_name = f'reverse_search_{column}'
- LOG.warning("Creating reverse search %s", table_name)
- rsn = sa.Table(table_name, self.dest.t.meta,
- sa.Column('word', sa.Integer()),
- sa.Column('places', IntArray))
- await self.dest.connection.run_sync(rsn.create)
-
sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
.group_by('word')
for row in partition:
row.places.sort()
data.append({'word': row.word,
+ 'column': column,
'places': row.places})
await self.dest.execute(rsn.insert(), data)
- await self.dest.connection.run_sync(
- sa.Index(f'idx_reverse_search_{column}_word', rsn.c.word).create)
+ await self.dest.connection.run_sync(
+ sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
def select_from(self, table: str) -> SaSelect:
import nominatim.api as napi
from nominatim.db.sql_preprocessor import SQLPreprocessor
+from nominatim.api.search.query_analyzer_factory import make_query_analyzer
from nominatim.tools import convert_sqlite
import nominatim.api.logging as loglib
""")))
+ def add_word_table(self, content):
+ data = [dict(zip(['word_id', 'word_token', 'type', 'word', 'info'], c))
+ for c in content]
+
+ async def _do_sql():
+ async with self.api._async_api.begin() as conn:
+ if 'word' not in conn.t.meta.tables:
+ await make_query_analyzer(conn)
+ word_table = conn.t.meta.tables['word']
+ await conn.connection.run_sync(word_table.create)
+ if data:
+ await conn.execute(conn.t.meta.tables['word'].insert(), data)
+
+ self.async_to_sync(_do_sql())
+
+
async def exec_async(self, sql, *args, **kwargs):
async with self.api._async_api.begin() as conn:
return await conn.execute(sql, *args, **kwargs)
db = str(tmp_path / 'test_nominatim_python_unittest.sqlite')
def mkapi(apiobj, options={'reverse'}):
+ apiobj.add_data('properties',
+ [{'property': 'tokenizer', 'value': 'icu'},
+ {'property': 'tokenizer_import_normalisation', 'value': ':: lower();'},
+ {'property': 'tokenizer_import_transliteration', 'value': "'1' > '/1/'; 'ä' > 'ä '"},
+ ])
+
+ async def _do_sql():
+ async with apiobj.api._async_api.begin() as conn:
+ if 'word' in conn.t.meta.tables:
+ return
+ await make_query_analyzer(conn)
+ word_table = conn.t.meta.tables['word']
+ await conn.connection.run_sync(word_table.create)
+
+ apiobj.async_to_sync(_do_sql())
+
event_loop.run_until_complete(convert_sqlite.convert(Path('/invalid'),
db, options))
outapi = napi.NominatimAPI(Path('/invalid'),
from nominatim.api.search.db_search_fields import WeightedStrings
-def run_search(apiobj, global_penalty, ccodes,
+def run_search(apiobj, frontend, global_penalty, ccodes,
country_penalties=None, details=SearchDetails()):
if country_penalties is None:
country_penalties = [0.0] * len(ccodes)
countries = WeightedStrings(ccodes, country_penalties)
search = CountrySearch(MySearchData())
+ api = frontend(apiobj, options=['search'])
async def run():
- async with apiobj.api._async_api.begin() as conn:
+ async with api._async_api.begin() as conn:
return await search.lookup(conn, details)
- return apiobj.async_to_sync(run())
+ return api._loop.run_until_complete(run())
-def test_find_from_placex(apiobj):
+def test_find_from_placex(apiobj, frontend):
apiobj.add_placex(place_id=55, class_='boundary', type='administrative',
rank_search=4, rank_address=4,
name={'name': 'Lolaland'},
centroid=(10, 10),
geometry='POLYGON((9.5 9.5, 9.5 10.5, 10.5 10.5, 10.5 9.5, 9.5 9.5))')
- results = run_search(apiobj, 0.5, ['de', 'yw'], [0.0, 0.3])
+ results = run_search(apiobj, frontend, 0.5, ['de', 'yw'], [0.0, 0.3])
assert len(results) == 1
assert results[0].place_id == 55
assert results[0].accuracy == 0.8
-def test_find_from_fallback_countries(apiobj):
+def test_find_from_fallback_countries(apiobj, frontend):
apiobj.add_country('ro', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
apiobj.add_country_name('ro', {'name': 'România'})
- results = run_search(apiobj, 0.0, ['ro'])
+ results = run_search(apiobj, frontend, 0.0, ['ro'])
assert len(results) == 1
assert results[0].names == {'name': 'România'}
-def test_find_none(apiobj):
- assert len(run_search(apiobj, 0.0, ['xx'])) == 0
+def test_find_none(apiobj, frontend):
+ assert len(run_search(apiobj, frontend, 0.0, ['xx'])) == 0
@pytest.mark.parametrize('coord,numres', [((0.5, 1), 1), ((10, 10), 0)])
-def test_find_near(apiobj, coord, numres):
+def test_find_near(apiobj, frontend, coord, numres):
apiobj.add_country('ro', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
apiobj.add_country_name('ro', {'name': 'România'})
- results = run_search(apiobj, 0.0, ['ro'],
+ results = run_search(apiobj, frontend, 0.0, ['ro'],
details=SearchDetails(near=napi.Point(*coord),
near_radius=0.1))
napi.GeometryFormat.SVG,
napi.GeometryFormat.TEXT])
@pytest.mark.parametrize('cc', ['yw', 'ro'])
- def test_return_geometries(self, apiobj, geom, cc):
- results = run_search(apiobj, 0.5, [cc],
+ def test_return_geometries(self, apiobj, frontend, geom, cc):
+ results = run_search(apiobj, frontend, 0.5, [cc],
details=SearchDetails(geometry_output=geom))
assert len(results) == 1
@pytest.mark.parametrize('pid,rids', [(76, [55]), (55, [])])
- def test_exclude_place_id(self, apiobj, pid, rids):
- results = run_search(apiobj, 0.5, ['yw', 'ro'],
+ def test_exclude_place_id(self, apiobj, frontend, pid, rids):
+ results = run_search(apiobj, frontend, 0.5, ['yw', 'ro'],
details=SearchDetails(excluded=[pid]))
assert [r.place_id for r in results] == rids
@pytest.mark.parametrize('viewbox,rids', [((9, 9, 11, 11), [55]),
((-10, -10, -3, -3), [])])
- def test_bounded_viewbox_in_placex(self, apiobj, viewbox, rids):
- results = run_search(apiobj, 0.5, ['yw'],
+ def test_bounded_viewbox_in_placex(self, apiobj, frontend, viewbox, rids):
+ results = run_search(apiobj, frontend, 0.5, ['yw'],
details=SearchDetails.from_kwargs({'viewbox': viewbox,
'bounded_viewbox': True}))
@pytest.mark.parametrize('viewbox,numres', [((0, 0, 1, 1), 1),
((-10, -10, -3, -3), 0)])
- def test_bounded_viewbox_in_fallback(self, apiobj, viewbox, numres):
- results = run_search(apiobj, 0.5, ['ro'],
+ def test_bounded_viewbox_in_fallback(self, apiobj, frontend, viewbox, numres):
+ results = run_search(apiobj, frontend, 0.5, ['ro'],
details=SearchDetails.from_kwargs({'viewbox': viewbox,
'bounded_viewbox': True}))
from nominatim.api.search.db_search_lookups import LookupAll
-def run_search(apiobj, global_penalty, cat, cat_penalty=None, ccodes=[],
+def run_search(apiobj, frontend, global_penalty, cat, cat_penalty=None, ccodes=[],
details=SearchDetails()):
class PlaceSearchData:
near_search = NearSearch(0.1, WeightedCategories(cat, cat_penalty), place_search)
+ api = frontend(apiobj, options=['search'])
+
async def run():
- async with apiobj.api._async_api.begin() as conn:
+ async with api._async_api.begin() as conn:
return await near_search.lookup(conn, details)
- results = apiobj.async_to_sync(run())
+ results = api._loop.run_until_complete(run())
results.sort(key=lambda r: r.accuracy)
return results
-def test_no_results_inner_query(apiobj):
- assert not run_search(apiobj, 0.4, [('this', 'that')])
+def test_no_results_inner_query(apiobj, frontend):
+ assert not run_search(apiobj, frontend, 0.4, [('this', 'that')])
-def test_no_appropriate_results_inner_query(apiobj):
+def test_no_appropriate_results_inner_query(apiobj, frontend):
apiobj.add_placex(place_id=100, country_code='us',
centroid=(5.6, 4.3),
geometry='POLYGON((0.0 0.0, 10.0 0.0, 10.0 2.0, 0.0 2.0, 0.0 0.0))')
apiobj.add_placex(place_id=22, class_='amenity', type='bank',
centroid=(5.6001, 4.2994))
- assert not run_search(apiobj, 0.4, [('amenity', 'bank')])
+ assert not run_search(apiobj, frontend, 0.4, [('amenity', 'bank')])
class TestNearSearch:
centroid=(-10.3, 56.9))
- def test_near_in_placex(self, apiobj):
+ def test_near_in_placex(self, apiobj, frontend):
apiobj.add_placex(place_id=22, class_='amenity', type='bank',
centroid=(5.6001, 4.2994))
apiobj.add_placex(place_id=23, class_='amenity', type='bench',
centroid=(5.6001, 4.2994))
- results = run_search(apiobj, 0.1, [('amenity', 'bank')])
+ results = run_search(apiobj, frontend, 0.1, [('amenity', 'bank')])
assert [r.place_id for r in results] == [22]
- def test_multiple_types_near_in_placex(self, apiobj):
+ def test_multiple_types_near_in_placex(self, apiobj, frontend):
apiobj.add_placex(place_id=22, class_='amenity', type='bank',
importance=0.002,
centroid=(5.6001, 4.2994))
importance=0.001,
centroid=(5.6001, 4.2994))
- results = run_search(apiobj, 0.1, [('amenity', 'bank'),
- ('amenity', 'bench')])
+ results = run_search(apiobj, frontend, 0.1, [('amenity', 'bank'),
+ ('amenity', 'bench')])
assert [r.place_id for r in results] == [22, 23]
- def test_near_in_classtype(self, apiobj):
+ def test_near_in_classtype(self, apiobj, frontend):
apiobj.add_placex(place_id=22, class_='amenity', type='bank',
centroid=(5.6, 4.34))
apiobj.add_placex(place_id=23, class_='amenity', type='bench',
apiobj.add_class_type_table('amenity', 'bank')
apiobj.add_class_type_table('amenity', 'bench')
- results = run_search(apiobj, 0.1, [('amenity', 'bank')])
+ results = run_search(apiobj, frontend, 0.1, [('amenity', 'bank')])
assert [r.place_id for r in results] == [22]
@pytest.mark.parametrize('cc,rid', [('us', 22), ('mx', 23)])
- def test_restrict_by_country(self, apiobj, cc, rid):
+ def test_restrict_by_country(self, apiobj, frontend, cc, rid):
apiobj.add_placex(place_id=22, class_='amenity', type='bank',
centroid=(5.6001, 4.2994),
country_code='us')
centroid=(-10.3001, 56.9),
country_code='us')
- results = run_search(apiobj, 0.1, [('amenity', 'bank')], ccodes=[cc, 'fr'])
+ results = run_search(apiobj, frontend, 0.1, [('amenity', 'bank')], ccodes=[cc, 'fr'])
assert [r.place_id for r in results] == [rid]
@pytest.mark.parametrize('excluded,rid', [(22, 122), (122, 22)])
- def test_exclude_place_by_id(self, apiobj, excluded, rid):
+ def test_exclude_place_by_id(self, apiobj, frontend, excluded, rid):
apiobj.add_placex(place_id=22, class_='amenity', type='bank',
centroid=(5.6001, 4.2994),
country_code='us')
country_code='us')
- results = run_search(apiobj, 0.1, [('amenity', 'bank')],
+ results = run_search(apiobj, frontend, 0.1, [('amenity', 'bank')],
details=SearchDetails(excluded=[excluded]))
assert [r.place_id for r in results] == [rid]
@pytest.mark.parametrize('layer,rids', [(napi.DataLayer.POI, [22]),
(napi.DataLayer.MANMADE, [])])
- def test_with_layer(self, apiobj, layer, rids):
+ def test_with_layer(self, apiobj, frontend, layer, rids):
apiobj.add_placex(place_id=22, class_='amenity', type='bank',
centroid=(5.6001, 4.2994),
country_code='us')
- results = run_search(apiobj, 0.1, [('amenity', 'bank')],
+ results = run_search(apiobj, frontend, 0.1, [('amenity', 'bank')],
details=SearchDetails(layers=layer))
assert [r.place_id for r in results] == rids
FieldLookup, FieldRanking, RankedTokens
from nominatim.api.search.db_search_lookups import LookupAll, LookupAny, Restrict
-def run_search(apiobj, global_penalty, lookup, ranking, count=2,
+APIOPTIONS = ['search']
+
+def run_search(apiobj, frontend, global_penalty, lookup, ranking, count=2,
hnrs=[], pcs=[], ccodes=[], quals=[],
details=SearchDetails()):
class MySearchData:
search = PlaceSearch(0.0, MySearchData(), count)
+ if frontend is None:
+ api = apiobj
+ else:
+ api = frontend(apiobj, options=APIOPTIONS)
+
async def run():
- async with apiobj.api._async_api.begin() as conn:
+ async with api._async_api.begin() as conn:
return await search.lookup(conn, details)
- results = apiobj.async_to_sync(run())
+ results = api._loop.run_until_complete(run())
results.sort(key=lambda r: r.accuracy)
return results
@pytest.mark.parametrize('lookup_type', [LookupAll, Restrict])
@pytest.mark.parametrize('rank,res', [([10], [100, 101]),
([20], [101, 100])])
- def test_lookup_all_match(self, apiobj, lookup_type, rank, res):
+ def test_lookup_all_match(self, apiobj, frontend, lookup_type, rank, res):
lookup = FieldLookup('name_vector', [1,2], lookup_type)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, rank)])
- results = run_search(apiobj, 0.1, [lookup], [ranking])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking])
assert [r.place_id for r in results] == res
@pytest.mark.parametrize('lookup_type', [LookupAll, Restrict])
- def test_lookup_all_partial_match(self, apiobj, lookup_type):
+ def test_lookup_all_partial_match(self, apiobj, frontend, lookup_type):
lookup = FieldLookup('name_vector', [1,20], lookup_type)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, [21])])
- results = run_search(apiobj, 0.1, [lookup], [ranking])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking])
assert len(results) == 1
assert results[0].place_id == 101
@pytest.mark.parametrize('rank,res', [([10], [100, 101]),
([20], [101, 100])])
- def test_lookup_any_match(self, apiobj, rank, res):
+ def test_lookup_any_match(self, apiobj, frontend, rank, res):
lookup = FieldLookup('name_vector', [11,21], LookupAny)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, rank)])
- results = run_search(apiobj, 0.1, [lookup], [ranking])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking])
assert [r.place_id for r in results] == res
- def test_lookup_any_partial_match(self, apiobj):
+ def test_lookup_any_partial_match(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [20], LookupAll)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, [21])])
- results = run_search(apiobj, 0.1, [lookup], [ranking])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking])
assert len(results) == 1
assert results[0].place_id == 101
@pytest.mark.parametrize('cc,res', [('us', 100), ('mx', 101)])
- def test_lookup_restrict_country(self, apiobj, cc, res):
+ def test_lookup_restrict_country(self, apiobj, frontend, cc, res):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], ccodes=[cc])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], ccodes=[cc])
assert [r.place_id for r in results] == [res]
- def test_lookup_restrict_placeid(self, apiobj):
+ def test_lookup_restrict_placeid(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking],
details=SearchDetails(excluded=[101]))
assert [r.place_id for r in results] == [100]
napi.GeometryFormat.KML,
napi.GeometryFormat.SVG,
napi.GeometryFormat.TEXT])
- def test_return_geometries(self, apiobj, geom):
+ def test_return_geometries(self, apiobj, frontend, geom):
lookup = FieldLookup('name_vector', [20], LookupAll)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, [21])])
- results = run_search(apiobj, 0.1, [lookup], [ranking],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking],
details=SearchDetails(geometry_output=geom))
assert geom.name.lower() in results[0].geometry
@pytest.mark.parametrize('factor,npoints', [(0.0, 3), (1.0, 2)])
- def test_return_simplified_geometry(self, apiobj, factor, npoints):
+ def test_return_simplified_geometry(self, apiobj, frontend, factor, npoints):
apiobj.add_placex(place_id=333, country_code='us',
centroid=(9.0, 9.0),
geometry='LINESTRING(8.9 9.0, 9.0 9.0, 9.1 9.0)')
lookup = FieldLookup('name_vector', [55], LookupAll)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, [21])])
- results = run_search(apiobj, 0.1, [lookup], [ranking],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking],
details=SearchDetails(geometry_output=napi.GeometryFormat.GEOJSON,
geometry_simplification=factor))
@pytest.mark.parametrize('viewbox', ['5.0,4.0,6.0,5.0', '5.7,4.0,6.0,5.0'])
@pytest.mark.parametrize('wcount,rids', [(2, [100, 101]), (20000, [100])])
- def test_prefer_viewbox(self, apiobj, viewbox, wcount, rids):
+ def test_prefer_viewbox(self, apiobj, frontend, viewbox, wcount, rids):
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
ranking = FieldRanking('name_vector', 0.2, [RankedTokens(0.0, [21])])
- results = run_search(apiobj, 0.1, [lookup], [ranking])
+ api = frontend(apiobj, options=APIOPTIONS)
+ results = run_search(api, None, 0.1, [lookup], [ranking])
assert [r.place_id for r in results] == [101, 100]
- results = run_search(apiobj, 0.1, [lookup], [ranking], count=wcount,
+ results = run_search(api, None, 0.1, [lookup], [ranking], count=wcount,
details=SearchDetails.from_kwargs({'viewbox': viewbox}))
assert [r.place_id for r in results] == rids
@pytest.mark.parametrize('viewbox', ['5.0,4.0,6.0,5.0', '5.55,4.27,5.62,4.31'])
- def test_force_viewbox(self, apiobj, viewbox):
+ def test_force_viewbox(self, apiobj, frontend, viewbox):
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
details=SearchDetails.from_kwargs({'viewbox': viewbox,
'bounded_viewbox': True})
- results = run_search(apiobj, 0.1, [lookup], [], details=details)
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], details=details)
assert [r.place_id for r in results] == [100]
- def test_prefer_near(self, apiobj):
+ def test_prefer_near(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
ranking = FieldRanking('name_vector', 0.9, [RankedTokens(0.0, [21])])
- results = run_search(apiobj, 0.1, [lookup], [ranking])
+ api = frontend(apiobj, options=APIOPTIONS)
+ results = run_search(api, None, 0.1, [lookup], [ranking])
assert [r.place_id for r in results] == [101, 100]
- results = run_search(apiobj, 0.1, [lookup], [ranking],
+ results = run_search(api, None, 0.1, [lookup], [ranking],
details=SearchDetails.from_kwargs({'near': '5.6,4.3'}))
results.sort(key=lambda r: -r.importance)
assert [r.place_id for r in results] == [100, 101]
@pytest.mark.parametrize('radius', [0.09, 0.11])
- def test_force_near(self, apiobj, radius):
+ def test_force_near(self, apiobj, frontend, radius):
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
details=SearchDetails.from_kwargs({'near': '5.6,4.3',
'near_radius': radius})
- results = run_search(apiobj, 0.1, [lookup], [], details=details)
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], details=details)
assert [r.place_id for r in results] == [100]
@pytest.mark.parametrize('hnr,res', [('20', [91, 1]), ('20 a', [1]),
('21', [2]), ('22', [2, 92]),
('24', [93]), ('25', [])])
- def test_lookup_by_single_housenumber(self, apiobj, hnr, res):
+ def test_lookup_by_single_housenumber(self, apiobj, frontend, hnr, res):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], hnrs=[hnr])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=[hnr])
assert [r.place_id for r in results] == res + [1000, 2000]
@pytest.mark.parametrize('cc,res', [('es', [2, 1000]), ('pt', [92, 2000])])
- def test_lookup_with_country_restriction(self, apiobj, cc, res):
+ def test_lookup_with_country_restriction(self, apiobj, frontend, cc, res):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], hnrs=['22'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
ccodes=[cc])
assert [r.place_id for r in results] == res
- def test_lookup_exclude_housenumber_placeid(self, apiobj):
+ def test_lookup_exclude_housenumber_placeid(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], hnrs=['22'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
details=SearchDetails(excluded=[92]))
assert [r.place_id for r in results] == [2, 1000, 2000]
- def test_lookup_exclude_street_placeid(self, apiobj):
+ def test_lookup_exclude_street_placeid(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], hnrs=['22'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
details=SearchDetails(excluded=[1000]))
assert [r.place_id for r in results] == [2, 92, 2000]
- def test_lookup_only_house_qualifier(self, apiobj):
+ def test_lookup_only_house_qualifier(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], hnrs=['22'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
quals=[('place', 'house')])
assert [r.place_id for r in results] == [2, 92]
- def test_lookup_only_street_qualifier(self, apiobj):
+ def test_lookup_only_street_qualifier(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], hnrs=['22'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
quals=[('highway', 'residential')])
assert [r.place_id for r in results] == [1000, 2000]
@pytest.mark.parametrize('rank,found', [(26, True), (27, False), (30, False)])
- def test_lookup_min_rank(self, apiobj, rank, found):
+ def test_lookup_min_rank(self, apiobj, frontend, rank, found):
lookup = FieldLookup('name_vector', [1,2], LookupAll)
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, [lookup], [ranking], hnrs=['22'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [ranking], hnrs=['22'],
details=SearchDetails(min_rank=rank))
assert [r.place_id for r in results] == ([2, 92, 1000, 2000] if found else [2, 92])
napi.GeometryFormat.KML,
napi.GeometryFormat.SVG,
napi.GeometryFormat.TEXT])
- def test_return_geometries(self, apiobj, geom):
+ def test_return_geometries(self, apiobj, frontend, geom):
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
- results = run_search(apiobj, 0.1, [lookup], [], hnrs=['20', '21', '22'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['20', '21', '22'],
details=SearchDetails(geometry_output=geom))
assert results
assert all(geom.name.lower() in r.geometry for r in results)
-def test_very_large_housenumber(apiobj):
+def test_very_large_housenumber(apiobj, frontend):
apiobj.add_placex(place_id=93, class_='place', type='house',
parent_place_id=2000,
housenumber='2467463524544', country_code='pt')
lookup = FieldLookup('name_vector', [1, 2], LookupAll)
- results = run_search(apiobj, 0.1, [lookup], [], hnrs=['2467463524544'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['2467463524544'],
details=SearchDetails())
assert results
@pytest.mark.parametrize('wcount,rids', [(2, [990, 991]), (30000, [990])])
-def test_name_and_postcode(apiobj, wcount, rids):
+def test_name_and_postcode(apiobj, frontend, wcount, rids):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
postcode='11225',
lookup = FieldLookup('name_vector', [111], LookupAll)
- results = run_search(apiobj, 0.1, [lookup], [], pcs=['11225'], count=wcount,
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], pcs=['11225'], count=wcount,
details=SearchDetails())
assert results
@pytest.mark.parametrize('hnr,res', [('21', [992]), ('22', []), ('23', [991])])
- def test_lookup_housenumber(self, apiobj, hnr, res):
+ def test_lookup_housenumber(self, apiobj, frontend, hnr, res):
lookup = FieldLookup('name_vector', [111], LookupAll)
- results = run_search(apiobj, 0.1, [lookup], [], hnrs=[hnr])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=[hnr])
assert [r.place_id for r in results] == res + [990]
napi.GeometryFormat.KML,
napi.GeometryFormat.SVG,
napi.GeometryFormat.TEXT])
- def test_osmline_with_geometries(self, apiobj, geom):
+ def test_osmline_with_geometries(self, apiobj, frontend, geom):
lookup = FieldLookup('name_vector', [111], LookupAll)
- results = run_search(apiobj, 0.1, [lookup], [], hnrs=['21'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['21'],
details=SearchDetails(geometry_output=geom))
assert results[0].place_id == 992
@pytest.mark.parametrize('hnr,res', [('21', [992]), ('22', []), ('23', [991])])
- def test_lookup_housenumber(self, apiobj, hnr, res):
+ def test_lookup_housenumber(self, apiobj, frontend, hnr, res):
lookup = FieldLookup('name_vector', [111], LookupAll)
- results = run_search(apiobj, 0.1, [lookup], [], hnrs=[hnr])
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=[hnr])
assert [r.place_id for r in results] == res + [990]
napi.GeometryFormat.KML,
napi.GeometryFormat.SVG,
napi.GeometryFormat.TEXT])
- def test_tiger_with_geometries(self, apiobj, geom):
+ def test_tiger_with_geometries(self, apiobj, frontend, geom):
lookup = FieldLookup('name_vector', [111], LookupAll)
- results = run_search(apiobj, 0.1, [lookup], [], hnrs=['21'],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [], hnrs=['21'],
details=SearchDetails(geometry_output=geom))
assert results[0].place_id == 992
(napi.DataLayer.NATURAL, [227]),
(napi.DataLayer.MANMADE | napi.DataLayer.NATURAL, [225, 227]),
(napi.DataLayer.MANMADE | napi.DataLayer.RAILWAY, [225, 226])])
- def test_layers_rank30(self, apiobj, layer, res):
+ def test_layers_rank30(self, apiobj, frontend, layer, res):
lookup = FieldLookup('name_vector', [34], LookupAny)
- results = run_search(apiobj, 0.1, [lookup], [],
+ results = run_search(apiobj, frontend, 0.1, [lookup], [],
details=SearchDetails(layers=layer))
assert [r.place_id for r in results] == res
from nominatim.api.search.db_search_fields import WeightedStrings, WeightedCategories
-def run_search(apiobj, global_penalty, poitypes, poi_penalties=None,
+def run_search(apiobj, frontend, global_penalty, poitypes, poi_penalties=None,
ccodes=[], details=SearchDetails()):
if poi_penalties is None:
poi_penalties = [0.0] * len(poitypes)
search = PoiSearch(MySearchData())
+ api = frontend(apiobj, options=['search'])
+
async def run():
- async with apiobj.api._async_api.begin() as conn:
+ async with api._async_api.begin() as conn:
return await search.lookup(conn, details)
- return apiobj.async_to_sync(run())
+ return api._loop.run_until_complete(run())
@pytest.mark.parametrize('coord,pid', [('34.3, 56.100021', 2),
('5.0, 4.59933', 1)])
-def test_simple_near_search_in_placex(apiobj, coord, pid):
+def test_simple_near_search_in_placex(apiobj, frontend, coord, pid):
apiobj.add_placex(place_id=1, class_='highway', type='bus_stop',
centroid=(5.0, 4.6))
apiobj.add_placex(place_id=2, class_='highway', type='bus_stop',
details = SearchDetails.from_kwargs({'near': coord, 'near_radius': 0.001})
- results = run_search(apiobj, 0.1, [('highway', 'bus_stop')], [0.5], details=details)
+ results = run_search(apiobj, frontend, 0.1, [('highway', 'bus_stop')], [0.5], details=details)
assert [r.place_id for r in results] == [pid]
@pytest.mark.parametrize('coord,pid', [('34.3, 56.100021', 2),
('34.3, 56.4', 2),
('5.0, 4.59933', 1)])
-def test_simple_near_search_in_classtype(apiobj, coord, pid):
+def test_simple_near_search_in_classtype(apiobj, frontend, coord, pid):
apiobj.add_placex(place_id=1, class_='highway', type='bus_stop',
centroid=(5.0, 4.6))
apiobj.add_placex(place_id=2, class_='highway', type='bus_stop',
details = SearchDetails.from_kwargs({'near': coord, 'near_radius': 0.5})
- results = run_search(apiobj, 0.1, [('highway', 'bus_stop')], [0.5], details=details)
+ results = run_search(apiobj, frontend, 0.1, [('highway', 'bus_stop')], [0.5], details=details)
assert [r.place_id for r in results] == [pid]
self.args = {'near': '34.3, 56.100021', 'near_radius': 0.001}
- def test_unrestricted(self, apiobj):
- results = run_search(apiobj, 0.1, [('highway', 'bus_stop')], [0.5],
+ def test_unrestricted(self, apiobj, frontend):
+ results = run_search(apiobj, frontend, 0.1, [('highway', 'bus_stop')], [0.5],
details=SearchDetails.from_kwargs(self.args))
assert [r.place_id for r in results] == [1, 2]
- def test_restict_country(self, apiobj):
- results = run_search(apiobj, 0.1, [('highway', 'bus_stop')], [0.5],
+ def test_restict_country(self, apiobj, frontend):
+ results = run_search(apiobj, frontend, 0.1, [('highway', 'bus_stop')], [0.5],
ccodes=['de', 'nz'],
details=SearchDetails.from_kwargs(self.args))
assert [r.place_id for r in results] == [2]
- def test_restrict_by_viewbox(self, apiobj):
+ def test_restrict_by_viewbox(self, apiobj, frontend):
args = {'bounded_viewbox': True, 'viewbox': '34.299,56.0,34.3001,56.10001'}
args.update(self.args)
- results = run_search(apiobj, 0.1, [('highway', 'bus_stop')], [0.5],
+ results = run_search(apiobj, frontend, 0.1, [('highway', 'bus_stop')], [0.5],
ccodes=['de', 'nz'],
details=SearchDetails.from_kwargs(args))
from nominatim.api.search.db_search_fields import WeightedStrings, FieldLookup, \
FieldRanking, RankedTokens
-def run_search(apiobj, global_penalty, pcs, pc_penalties=None,
+def run_search(apiobj, frontend, global_penalty, pcs, pc_penalties=None,
ccodes=[], lookup=[], ranking=[], details=SearchDetails()):
if pc_penalties is None:
pc_penalties = [0.0] * len(pcs)
search = PostcodeSearch(0.0, MySearchData())
+ api = frontend(apiobj, options=['search'])
+
async def run():
- async with apiobj.api._async_api.begin() as conn:
+ async with api._async_api.begin() as conn:
return await search.lookup(conn, details)
- return apiobj.async_to_sync(run())
+ return api._loop.run_until_complete(run())
-def test_postcode_only_search(apiobj):
+def test_postcode_only_search(apiobj, frontend):
apiobj.add_postcode(place_id=100, country_code='ch', postcode='12345')
apiobj.add_postcode(place_id=101, country_code='pl', postcode='12 345')
- results = run_search(apiobj, 0.3, ['12345', '12 345'], [0.0, 0.1])
+ results = run_search(apiobj, frontend, 0.3, ['12345', '12 345'], [0.0, 0.1])
assert len(results) == 2
assert [r.place_id for r in results] == [100, 101]
-def test_postcode_with_country(apiobj):
+def test_postcode_with_country(apiobj, frontend):
apiobj.add_postcode(place_id=100, country_code='ch', postcode='12345')
apiobj.add_postcode(place_id=101, country_code='pl', postcode='12 345')
- results = run_search(apiobj, 0.3, ['12345', '12 345'], [0.0, 0.1],
+ results = run_search(apiobj, frontend, 0.3, ['12345', '12 345'], [0.0, 0.1],
ccodes=['de', 'pl'])
assert len(results) == 1
country_code='pl')
- def test_lookup_both(self, apiobj):
+ def test_lookup_both(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [1,2], 'restrict')
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, ['12345'], lookup=[lookup], ranking=[ranking])
+ results = run_search(apiobj, frontend, 0.1, ['12345'], lookup=[lookup], ranking=[ranking])
assert [r.place_id for r in results] == [100, 101]
- def test_restrict_by_name(self, apiobj):
+ def test_restrict_by_name(self, apiobj, frontend):
lookup = FieldLookup('name_vector', [10], 'restrict')
- results = run_search(apiobj, 0.1, ['12345'], lookup=[lookup])
+ results = run_search(apiobj, frontend, 0.1, ['12345'], lookup=[lookup])
assert [r.place_id for r in results] == [100]
@pytest.mark.parametrize('coord,place_id', [((16.5, 5), 100),
((-45.1, 7.004), 101)])
- def test_lookup_near(self, apiobj, coord, place_id):
+ def test_lookup_near(self, apiobj, frontend, coord, place_id):
lookup = FieldLookup('name_vector', [1,2], 'restrict')
ranking = FieldRanking('name_vector', 0.3, [RankedTokens(0.0, [10])])
- results = run_search(apiobj, 0.1, ['12345'],
+ results = run_search(apiobj, frontend, 0.1, ['12345'],
lookup=[lookup], ranking=[ranking],
details=SearchDetails(near=napi.Point(*coord),
near_radius=0.6))
napi.GeometryFormat.KML,
napi.GeometryFormat.SVG,
napi.GeometryFormat.TEXT])
- def test_return_geometries(self, apiobj, geom):
- results = run_search(apiobj, 0.1, ['12345'],
+ def test_return_geometries(self, apiobj, frontend, geom):
+ results = run_search(apiobj, frontend, 0.1, ['12345'],
details=SearchDetails(geometry_output=geom))
assert results
@pytest.mark.parametrize('viewbox, rids', [('-46,6,-44,8', [101,100]),
('16,4,18,6', [100,101])])
- def test_prefer_viewbox(self, apiobj, viewbox, rids):
- results = run_search(apiobj, 0.1, ['12345'],
+ def test_prefer_viewbox(self, apiobj, frontend, viewbox, rids):
+ results = run_search(apiobj, frontend, 0.1, ['12345'],
details=SearchDetails.from_kwargs({'viewbox': viewbox}))
assert [r.place_id for r in results] == rids
@pytest.mark.parametrize('viewbox, rid', [('-46,6,-44,8', 101),
('16,4,18,6', 100)])
- def test_restrict_to_viewbox(self, apiobj, viewbox, rid):
- results = run_search(apiobj, 0.1, ['12345'],
+ def test_restrict_to_viewbox(self, apiobj, frontend, viewbox, rid):
+ results = run_search(apiobj, frontend, 0.1, ['12345'],
details=SearchDetails.from_kwargs({'viewbox': viewbox,
'bounded_viewbox': True}))
@pytest.mark.parametrize('coord,rids', [((17.05, 5), [100, 101]),
((-45, 7.1), [101, 100])])
- def test_prefer_near(self, apiobj, coord, rids):
- results = run_search(apiobj, 0.1, ['12345'],
+ def test_prefer_near(self, apiobj, frontend, coord, rids):
+ results = run_search(apiobj, frontend, 0.1, ['12345'],
details=SearchDetails(near=napi.Point(*coord)))
assert [r.place_id for r in results] == rids
@pytest.mark.parametrize('pid,rid', [(100, 101), (101, 100)])
- def test_exclude(self, apiobj, pid, rid):
- results = run_search(apiobj, 0.1, ['12345'],
+ def test_exclude(self, apiobj, frontend, pid, rid):
+ results = run_search(apiobj, frontend, 0.1, ['12345'],
details=SearchDetails(excluded=[pid]))
assert [r.place_id for r in results] == [rid]
import nominatim.api as napi
import nominatim.api.logging as loglib
+API_OPTIONS = {'search'}
+
@pytest.fixture(autouse=True)
def setup_icu_tokenizer(apiobj):
""" Setup the propoerties needed for using the ICU tokenizer.
])
-def test_search_no_content(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+def test_search_no_content(apiobj, frontend):
+ apiobj.add_word_table([])
- assert apiobj.api.search('foo') == []
+ api = frontend(apiobj, options=API_OPTIONS)
+ assert api.search('foo') == []
-def test_search_simple_word(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
- content=[(55, 'test', 'W', 'test', None),
+def test_search_simple_word(apiobj, frontend):
+ apiobj.add_word_table([(55, 'test', 'W', 'test', None),
(2, 'test', 'w', 'test', None)])
apiobj.add_placex(place_id=444, class_='place', type='village',
centroid=(1.3, 0.7))
apiobj.add_search_name(444, names=[2, 55])
- results = apiobj.api.search('TEST')
+ api = frontend(apiobj, options=API_OPTIONS)
+ results = api.search('TEST')
assert [r.place_id for r in results] == [444]
@pytest.mark.parametrize('logtype', ['text', 'html'])
-def test_search_with_debug(apiobj, table_factory, logtype):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
- content=[(55, 'test', 'W', 'test', None),
+def test_search_with_debug(apiobj, frontend, logtype):
+ apiobj.add_word_table([(55, 'test', 'W', 'test', None),
(2, 'test', 'w', 'test', None)])
apiobj.add_placex(place_id=444, class_='place', type='village',
centroid=(1.3, 0.7))
apiobj.add_search_name(444, names=[2, 55])
+ api = frontend(apiobj, options=API_OPTIONS)
loglib.set_log_output(logtype)
- results = apiobj.api.search('TEST')
+ results = api.search('TEST')
assert loglib.get_and_disable()
-def test_address_no_content(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+def test_address_no_content(apiobj, frontend):
+ apiobj.add_word_table([])
- assert apiobj.api.search_address(amenity='hotel',
- street='Main St 34',
- city='Happyville',
- county='Wideland',
- state='Praerie',
- postalcode='55648',
- country='xx') == []
+ api = frontend(apiobj, options=API_OPTIONS)
+ assert api.search_address(amenity='hotel',
+ street='Main St 34',
+ city='Happyville',
+ county='Wideland',
+ state='Praerie',
+ postalcode='55648',
+ country='xx') == []
@pytest.mark.parametrize('atype,address,search', [('street', 26, 26),
('city', 16, 18),
('county', 12, 12),
('state', 8, 8)])
-def test_address_simple_places(apiobj, table_factory, atype, address, search):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
- content=[(55, 'test', 'W', 'test', None),
+def test_address_simple_places(apiobj, frontend, atype, address, search):
+ apiobj.add_word_table([(55, 'test', 'W', 'test', None),
(2, 'test', 'w', 'test', None)])
apiobj.add_placex(place_id=444,
centroid=(1.3, 0.7))
apiobj.add_search_name(444, names=[2, 55], address_rank=address, search_rank=search)
- results = apiobj.api.search_address(**{atype: 'TEST'})
+ api = frontend(apiobj, options=API_OPTIONS)
+ results = api.search_address(**{atype: 'TEST'})
assert [r.place_id for r in results] == [444]
-def test_address_country(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
- content=[(None, 'ro', 'C', 'ro', None)])
+def test_address_country(apiobj, frontend):
+ apiobj.add_word_table([(None, 'ro', 'C', 'ro', None)])
apiobj.add_country('ro', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
apiobj.add_country_name('ro', {'name': 'România'})
- assert len(apiobj.api.search_address(country='ro')) == 1
+ api = frontend(apiobj, options=API_OPTIONS)
+ assert len(api.search_address(country='ro')) == 1
-def test_category_no_categories(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+def test_category_no_categories(apiobj, frontend):
+ apiobj.add_word_table([])
- assert apiobj.api.search_category([], near_query='Berlin') == []
+ api = frontend(apiobj, options=API_OPTIONS)
+ assert api.search_category([], near_query='Berlin') == []
-def test_category_no_content(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+def test_category_no_content(apiobj, frontend):
+ apiobj.add_word_table([])
- assert apiobj.api.search_category([('amenity', 'restaurant')]) == []
+ api = frontend(apiobj, options=API_OPTIONS)
+ assert api.search_category([('amenity', 'restaurant')]) == []
-def test_category_simple_restaurant(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+def test_category_simple_restaurant(apiobj, frontend):
+ apiobj.add_word_table([])
apiobj.add_placex(place_id=444, class_='amenity', type='restaurant',
centroid=(1.3, 0.7))
apiobj.add_search_name(444, names=[2, 55], address_rank=16, search_rank=18)
- results = apiobj.api.search_category([('amenity', 'restaurant')],
- near=(1.3, 0.701), near_radius=0.015)
+ api = frontend(apiobj, options=API_OPTIONS)
+ results = api.search_category([('amenity', 'restaurant')],
+ near=(1.3, 0.701), near_radius=0.015)
assert [r.place_id for r in results] == [444]
-def test_category_with_search_phrase(apiobj, table_factory):
- table_factory('word',
- definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
- content=[(55, 'test', 'W', 'test', None),
+def test_category_with_search_phrase(apiobj, frontend):
+ apiobj.add_word_table([(55, 'test', 'W', 'test', None),
(2, 'test', 'w', 'test', None)])
apiobj.add_placex(place_id=444, class_='place', type='village',
apiobj.add_placex(place_id=95, class_='amenity', type='restaurant',
centroid=(1.3, 0.7003))
- results = apiobj.api.search_category([('amenity', 'restaurant')],
- near_query='TEST')
+ api = frontend(apiobj, options=API_OPTIONS)
+ results = api.search_category([('amenity', 'restaurant')], near_query='TEST')
assert [r.place_id for r in results] == [95]