* [Jinja2](https://palletsprojects.com/p/jinja/)
* [PyICU](https://pypi.org/project/PyICU/)
* [PyYaml](https://pyyaml.org/) (5.1+)
- * [datrie](https://github.com/pytries/datrie)
These will be installed automatically when using pip installation.
virtualenv ~/nominatim-dev-venv
~/nominatim-dev-venv/bin/pip install\
psutil psycopg[binary] PyICU SQLAlchemy \
- python-dotenv jinja2 pyYAML datrie behave \
+ python-dotenv jinja2 pyYAML behave \
mkdocs mkdocstrings mkdocs-gen-files pytest pytest-asyncio flake8 \
types-jinja2 types-markupsafe types-psutil types-psycopg2 \
types-pygments types-pyyaml types-requests types-ujson \
Thus, while you may split or join phrases, you should not reorder them
unless you really know what you are doing.
-Phrase types (`nominatim_api.search.PhraseType`) can further help narrowing
-down how the tokens in the phrase are interpreted. The following phrase types
-are known:
-
-::: nominatim_api.search.PhraseType
- options:
- heading_level: 6
+Phrase types can further help narrowing down how the tokens in the phrase
+are interpreted. The following phrase types are known:
+
+| Name | Description |
+|----------------|-------------|
+| PHRASE_ANY | No specific designation (i.e. source is free-form query) |
+| PHRASE_AMENITY | Contains name or type of a POI |
+| PHRASE_STREET | Contains a street name optionally with a housenumber |
+| PHRASE_CITY | Contains the postal city |
+| PHRASE_COUNTY | Contains the equivalent of a county |
+| PHRASE_STATE | Contains a state or province |
+| PHRASE_POSTCODE| Contains a postal code |
+| PHRASE_COUNTRY | Contains the country name or code |
## Custom sanitizer modules
"python-dotenv==1.0.1",
"jinja2==3.1.5",
"pyYAML==6.0.2",
- "datrie==0.8.2",
"psutil==7.0.0",
"PyICU==2.14",
"osmium==4.0.2",
from .status import get_status, StatusResult
from .lookup import get_places, get_detailed_place
from .reverse import ReverseGeocoder
-from .search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
+from . import search as nsearch
from . import types as ntyp
from .results import DetailedResult, ReverseResult, SearchResults
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if details.keywords:
- await make_query_analyzer(conn)
+ await nsearch.make_query_analyzer(conn)
return await get_detailed_place(conn, place, details)
async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if details.keywords:
- await make_query_analyzer(conn)
+ await nsearch.make_query_analyzer(conn)
return await get_places(conn, places, details)
async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if details.keywords:
- await make_query_analyzer(conn)
+ await nsearch.make_query_analyzer(conn)
geocoder = ReverseGeocoder(conn, details,
self.reverse_restrict_to_country_area)
return await geocoder.lookup(coord)
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
- geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
- self.config.get_int('REQUEST_TIMEOUT')
- if self.config.REQUEST_TIMEOUT else None)
- phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
+ geocoder = nsearch.ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
+ phrases = [nsearch.Phrase(nsearch.PHRASE_ANY, p.strip()) for p in query.split(',')]
return await geocoder.lookup(phrases)
async def search_address(self, amenity: Optional[str] = None,
conn.set_query_timeout(self.query_timeout)
details = ntyp.SearchDetails.from_kwargs(params)
- phrases: List[Phrase] = []
+ phrases: List[nsearch.Phrase] = []
if amenity:
- phrases.append(Phrase(PhraseType.AMENITY, amenity))
+ phrases.append(nsearch.Phrase(nsearch.PHRASE_AMENITY, amenity))
if street:
- phrases.append(Phrase(PhraseType.STREET, street))
+ phrases.append(nsearch.Phrase(nsearch.PHRASE_STREET, street))
if city:
- phrases.append(Phrase(PhraseType.CITY, city))
+ phrases.append(nsearch.Phrase(nsearch.PHRASE_CITY, city))
if county:
- phrases.append(Phrase(PhraseType.COUNTY, county))
+ phrases.append(nsearch.Phrase(nsearch.PHRASE_COUNTY, county))
if state:
- phrases.append(Phrase(PhraseType.STATE, state))
+ phrases.append(nsearch.Phrase(nsearch.PHRASE_STATE, state))
if postalcode:
- phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
+ phrases.append(nsearch.Phrase(nsearch.PHRASE_POSTCODE, postalcode))
if country:
- phrases.append(Phrase(PhraseType.COUNTRY, country))
+ phrases.append(nsearch.Phrase(nsearch.PHRASE_COUNTRY, country))
if not phrases:
raise UsageError('Nothing to search for.')
if amenity:
details.layers |= ntyp.DataLayer.POI
- geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT')
- if self.config.REQUEST_TIMEOUT else None)
+ geocoder = nsearch.ForwardGeocoder(conn, details,
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup(phrases)
async def search_category(self, categories: List[Tuple[str, str]],
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
if near_query:
- phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
+ phrases = [nsearch.Phrase(nsearch.PHRASE_ANY, p) for p in near_query.split(',')]
else:
phrases = []
if details.keywords:
- await make_query_analyzer(conn)
+ await nsearch.make_query_analyzer(conn)
- geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT')
- if self.config.REQUEST_TIMEOUT else None)
+ geocoder = nsearch.ForwardGeocoder(conn, details,
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup_pois(categories, phrases)
"""
from .geocoder import (ForwardGeocoder as ForwardGeocoder)
from .query import (Phrase as Phrase,
- PhraseType as PhraseType)
+ PHRASE_ANY as PHRASE_ANY,
+ PHRASE_AMENITY as PHRASE_AMENITY,
+ PHRASE_STREET as PHRASE_STREET,
+ PHRASE_CITY as PHRASE_CITY,
+ PHRASE_COUNTY as PHRASE_COUNTY,
+ PHRASE_STATE as PHRASE_STATE,
+ PHRASE_POSTCODE as PHRASE_POSTCODE,
+ PHRASE_COUNTRY as PHRASE_COUNTRY)
from .query_analyzer_factory import (make_query_analyzer as make_query_analyzer)
import heapq
from ..types import SearchDetails, DataLayer
-from .query import QueryStruct, Token, TokenType, TokenRange, BreakType
+from . import query as qmod
from .token_assignment import TokenAssignment
from . import db_search_fields as dbf
from . import db_searches as dbs
""" Build the abstract search queries from token assignments.
"""
- def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
+ def __init__(self, query: qmod.QueryStruct, details: SearchDetails) -> None:
self.query = query
self.details = details
builder = self.build_poi_search(sdata)
elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber,
- TokenType.HOUSENUMBER)
+ qmod.TOKEN_HOUSENUMBER)
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
yield dbs.PoiSearch(sdata)
def build_special_search(self, sdata: dbf.SearchData,
- address: List[TokenRange],
+ address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve
a named place.
lookups.Restrict)]
yield dbs.PostcodeSearch(penalty, sdata)
- def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
- address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
+ def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[qmod.Token],
+ address: List[qmod.TokenRange]) -> Iterator[dbs.AbstractSearch]:
""" Build a simple address search for special entries where the
housenumber is the main name token.
"""
list(partials), lookups.LookupAll))
else:
addr_fulls = [t.token for t
- in self.query.get_tokens(address[0], TokenType.WORD)]
+ in self.query.get_tokens(address[0], qmod.TOKEN_WORD)]
if len(addr_fulls) > 5:
return
sdata.lookups.append(
yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData,
- name: TokenRange, address: List[TokenRange],
+ name: qmod.TokenRange, address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for simple name or address searches.
"""
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
- def yield_lookups(self, name: TokenRange, address: List[TokenRange]
+ def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange]
) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 50000
# Partial term to frequent. Try looking up by rare full names first.
- name_fulls = self.query.get_tokens(name, TokenType.WORD)
+ name_fulls = self.query.get_tokens(name, qmod.TOKEN_WORD)
if name_fulls:
fulls_count = sum(t.count for t in name_fulls)
self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
def get_name_address_ranking(self, name_tokens: List[int],
- addr_partials: List[Token]) -> List[dbf.FieldLookup]:
+ addr_partials: List[qmod.Token]) -> List[dbf.FieldLookup]:
""" Create a ranking expression looking up by name and address.
"""
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
return lookup
- def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
+ def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
- def get_name_ranking(self, trange: TokenRange,
+ def get_name_ranking(self, trange: qmod.TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
- name_fulls = self.query.get_tokens(trange, TokenType.WORD)
+ name_fulls = self.query.get_tokens(trange, qmod.TOKEN_WORD)
ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
ranks.sort(key=lambda r: r.penalty)
# Fallback, sum of penalty for partials
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
- def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
+ def get_addr_ranking(self, trange: qmod.TokenRange) -> dbf.FieldRanking:
""" Create a list of ranking expressions for an address term
for the given ranges.
"""
while todo:
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
- if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
+ if tlist.ttype in (qmod.TOKEN_PARTIAL, qmod.TOKEN_WORD):
if tlist.end < trange.end:
chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
- if tlist.ttype == TokenType.PARTIAL:
+ if tlist.ttype == qmod.TOKEN_PARTIAL:
penalty = rank.penalty + chgpenalty \
+ max(t.penalty for t in tlist.tokens)
heapq.heappush(todo, (neglen - 1, tlist.end,
heapq.heappush(todo, (neglen - 1, tlist.end,
rank.with_token(t, chgpenalty)))
elif tlist.end == trange.end:
- if tlist.ttype == TokenType.PARTIAL:
+ if tlist.ttype == qmod.TOKEN_PARTIAL:
ranks.append(dbf.RankedTokens(rank.penalty
+ max(t.penalty for t in tlist.tokens),
rank.tokens))
if assignment.housenumber:
sdata.set_strings('housenumbers',
self.query.get_tokens(assignment.housenumber,
- TokenType.HOUSENUMBER))
+ qmod.TOKEN_HOUSENUMBER))
if assignment.postcode:
sdata.set_strings('postcodes',
self.query.get_tokens(assignment.postcode,
- TokenType.POSTCODE))
+ qmod.TOKEN_POSTCODE))
if assignment.qualifier:
tokens = self.get_qualifier_tokens(assignment.qualifier)
if not tokens:
return sdata
- def get_country_tokens(self, trange: TokenRange) -> List[Token]:
+ def get_country_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
parameters.
"""
- tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
+ tokens = self.query.get_tokens(trange, qmod.TOKEN_COUNTRY)
if self.details.countries:
tokens = [t for t in tokens if t.lookup_word in self.details.countries]
return tokens
- def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
+ def get_qualifier_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
parameters.
"""
- tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
+ tokens = self.query.get_tokens(trange, qmod.TOKEN_QUALIFIER)
if self.details.categories:
tokens = [t for t in tokens if t.get_category() in self.details.categories]
"""
if assignment.near_item:
tokens: Dict[Tuple[str, str], float] = {}
- for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM):
+ for t in self.query.get_tokens(assignment.near_item, qmod.TOKEN_NEAR_ITEM):
cat = t.get_category()
# The category of a near search will be that of near_item.
# Thus, if search is restricted to a category parameter,
PENALTY_WORDCHANGE = {
- BreakType.START: 0.0,
- BreakType.END: 0.0,
- BreakType.PHRASE: 0.0,
- BreakType.SOFT_PHRASE: 0.0,
- BreakType.WORD: 0.1,
- BreakType.PART: 0.2,
- BreakType.TOKEN: 0.4
+ qmod.BREAK_START: 0.0,
+ qmod.BREAK_END: 0.0,
+ qmod.BREAK_PHRASE: 0.0,
+ qmod.BREAK_SOFT_PHRASE: 0.0,
+ qmod.BREAK_WORD: 0.1,
+ qmod.BREAK_PART: 0.2,
+ qmod.BREAK_TOKEN: 0.4
}
DB_TO_TOKEN_TYPE = {
- 'W': qmod.TokenType.WORD,
- 'w': qmod.TokenType.PARTIAL,
- 'H': qmod.TokenType.HOUSENUMBER,
- 'P': qmod.TokenType.POSTCODE,
- 'C': qmod.TokenType.COUNTRY
+ 'W': qmod.TOKEN_WORD,
+ 'w': qmod.TOKEN_PARTIAL,
+ 'H': qmod.TOKEN_HOUSENUMBER,
+ 'P': qmod.TOKEN_POSTCODE,
+ 'C': qmod.TOKEN_COUNTRY
}
PENALTY_IN_TOKEN_BREAK = {
- qmod.BreakType.START: 0.5,
- qmod.BreakType.END: 0.5,
- qmod.BreakType.PHRASE: 0.5,
- qmod.BreakType.SOFT_PHRASE: 0.5,
- qmod.BreakType.WORD: 0.1,
- qmod.BreakType.PART: 0.0,
- qmod.BreakType.TOKEN: 0.0
+ qmod.BREAK_START: 0.5,
+ qmod.BREAK_END: 0.5,
+ qmod.BREAK_PHRASE: 0.5,
+ qmod.BREAK_SOFT_PHRASE: 0.5,
+ qmod.BREAK_WORD: 0.1,
+ qmod.BREAK_PART: 0.0,
+ qmod.BREAK_TOKEN: 0.0
}
@dataclasses.dataclass
class QueryPart:
""" Normalized and transliterated form of a single term in the query.
+
When the term came out of a split during the transliteration,
the normalized string is the full word before transliteration.
- The word number keeps track of the word before transliteration
- and can be used to identify partial transliterated terms.
+ Check the subsequent break type to figure out if the word is
+ continued.
+
Penalty is the break penalty for the break following the token.
"""
token: str
normalized: str
- word_number: int
penalty: float
WordDict = Dict[str, List[qmod.TokenRange]]
-def yield_words(terms: List[QueryPart], start: int) -> Iterator[Tuple[str, qmod.TokenRange]]:
- """ Return all combinations of words in the terms list after the
- given position.
+def extract_words(terms: List[QueryPart], start: int, words: WordDict) -> None:
+ """ Add all combinations of words in the terms list after the
+ given position to the word list.
"""
total = len(terms)
+ base_penalty = PENALTY_IN_TOKEN_BREAK[qmod.BREAK_WORD]
for first in range(start, total):
word = terms[first].token
- penalty = PENALTY_IN_TOKEN_BREAK[qmod.BreakType.WORD]
- yield word, qmod.TokenRange(first, first + 1, penalty=penalty)
+ penalty = base_penalty
+ words[word].append(qmod.TokenRange(first, first + 1, penalty=penalty))
for last in range(first + 1, min(first + 20, total)):
word = ' '.join((word, terms[last].token))
penalty += terms[last - 1].penalty
- yield word, qmod.TokenRange(first, last + 1, penalty=penalty)
+ words[word].append(qmod.TokenRange(first, last + 1, penalty=penalty))
@dataclasses.dataclass
if row.type == 'S':
if row.info['op'] in ('in', 'near'):
if trange.start == 0:
- query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
+ query.add_token(trange, qmod.TOKEN_NEAR_ITEM, token)
else:
if trange.start == 0 and trange.end == query.num_token_slots():
- query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
+ query.add_token(trange, qmod.TOKEN_NEAR_ITEM, token)
else:
- query.add_token(trange, qmod.TokenType.QUALIFIER, token)
+ query.add_token(trange, qmod.TOKEN_QUALIFIER, token)
else:
query.add_token(trange, DB_TO_TOKEN_TYPE[row.type], token)
"""
parts: QueryParts = []
phrase_start = 0
- words = defaultdict(list)
- wordnr = 0
+ words: WordDict = defaultdict(list)
for phrase in query.source:
query.nodes[-1].ptype = phrase.ptype
phrase_split = re.split('([ :-])', phrase.text)
if trans:
for term in trans.split(' '):
if term:
- parts.append(QueryPart(term, word, wordnr,
- PENALTY_IN_TOKEN_BREAK[qmod.BreakType.TOKEN]))
- query.add_node(qmod.BreakType.TOKEN, phrase.ptype)
- query.nodes[-1].btype = qmod.BreakType(breakchar)
- parts[-1].penalty = PENALTY_IN_TOKEN_BREAK[qmod.BreakType(breakchar)]
- wordnr += 1
+ parts.append(QueryPart(term, word,
+ PENALTY_IN_TOKEN_BREAK[qmod.BREAK_TOKEN]))
+ query.add_node(qmod.BREAK_TOKEN, phrase.ptype)
+ query.nodes[-1].btype = breakchar
+ parts[-1].penalty = PENALTY_IN_TOKEN_BREAK[breakchar]
- for word, wrange in yield_words(parts, phrase_start):
- words[word].append(wrange)
+ extract_words(parts, phrase_start, words)
phrase_start = len(parts)
- query.nodes[-1].btype = qmod.BreakType.END
+ query.nodes[-1].btype = qmod.BREAK_END
return parts, words
"""
for part, node, i in zip(parts, query.nodes, range(1000)):
if len(part.token) <= 4 and part.token.isdigit()\
- and not node.has_tokens(i+1, qmod.TokenType.HOUSENUMBER):
- query.add_token(qmod.TokenRange(i, i+1), qmod.TokenType.HOUSENUMBER,
+ and not node.has_tokens(i+1, qmod.TOKEN_HOUSENUMBER):
+ query.add_token(qmod.TokenRange(i, i+1), qmod.TOKEN_HOUSENUMBER,
ICUToken(penalty=0.5, token=0,
count=1, addr_count=1, lookup_word=part.token,
word_token=part.token, info=None))
""" Add penalties to tokens that depend on presence of other token.
"""
for i, node, tlist in query.iter_token_lists():
- if tlist.ttype == qmod.TokenType.POSTCODE:
+ if tlist.ttype == qmod.TOKEN_POSTCODE:
for repl in node.starting:
- if repl.end == tlist.end and repl.ttype != qmod.TokenType.POSTCODE \
- and (repl.ttype != qmod.TokenType.HOUSENUMBER
+ if repl.end == tlist.end and repl.ttype != qmod.TOKEN_POSTCODE \
+ and (repl.ttype != qmod.TOKEN_HOUSENUMBER
or len(tlist.tokens[0].lookup_word) > 4):
repl.add_penalty(0.39)
- elif (tlist.ttype == qmod.TokenType.HOUSENUMBER
+ elif (tlist.ttype == qmod.TOKEN_HOUSENUMBER
and len(tlist.tokens[0].lookup_word) <= 3):
if any(c.isdigit() for c in tlist.tokens[0].lookup_word):
for repl in node.starting:
- if repl.end == tlist.end and repl.ttype != qmod.TokenType.HOUSENUMBER:
+ if repl.end == tlist.end and repl.ttype != qmod.TOKEN_HOUSENUMBER:
repl.add_penalty(0.5 - tlist.tokens[0].penalty)
- elif tlist.ttype not in (qmod.TokenType.COUNTRY, qmod.TokenType.PARTIAL):
+ elif tlist.ttype not in (qmod.TOKEN_COUNTRY, qmod.TOKEN_PARTIAL):
norm = parts[i].normalized
for j in range(i + 1, tlist.end):
- if parts[j - 1].word_number != parts[j].word_number:
+ if node.btype != qmod.BREAK_TOKEN:
norm += ' ' + parts[j].normalized
for token in tlist.tokens:
cast(ICUToken, token).rematch(norm)
def _dump_transliterated(query: qmod.QueryStruct, parts: QueryParts) -> str:
- out = query.nodes[0].btype.value
+ out = query.nodes[0].btype
for node, part in zip(query.nodes[1:], parts):
- out += part.token + node.btype.value
+ out += part.token + node.btype
return out
for tlist in node.starting:
for token in tlist.tokens:
t = cast(ICUToken, token)
- yield [tlist.ttype.name, t.token, t.word_token or '',
+ yield [tlist.ttype, t.token, t.word_token or '',
t.lookup_word or '', t.penalty, t.count, t.info]
from typing import List, Tuple, Optional, Iterator
from abc import ABC, abstractmethod
import dataclasses
-import enum
-class BreakType(enum.Enum):
- """ Type of break between tokens.
- """
- START = '<'
- """ Begin of the query. """
- END = '>'
- """ End of the query. """
- PHRASE = ','
- """ Hard break between two phrases. Address parts cannot cross hard
- phrase boundaries."""
- SOFT_PHRASE = ':'
- """ Likely break between two phrases. Address parts should not cross soft
- phrase boundaries. Soft breaks can be inserted by a preprocessor
- that is analysing the input string.
- """
- WORD = ' '
- """ Break between words. """
- PART = '-'
- """ Break inside a word, for example a hyphen or apostrophe. """
- TOKEN = '`'
- """ Break created as a result of tokenization.
- This may happen in languages without spaces between words.
- """
+BreakType = str
+""" Type of break between tokens.
+"""
+BREAK_START = '<'
+""" Begin of the query. """
+BREAK_END = '>'
+""" End of the query. """
+BREAK_PHRASE = ','
+""" Hard break between two phrases. Address parts cannot cross hard
+ phrase boundaries."""
+BREAK_SOFT_PHRASE = ':'
+""" Likely break between two phrases. Address parts should not cross soft
+ phrase boundaries. Soft breaks can be inserted by a preprocessor
+ that is analysing the input string.
+"""
+BREAK_WORD = ' '
+""" Break between words. """
+BREAK_PART = '-'
+""" Break inside a word, for example a hyphen or apostrophe. """
+BREAK_TOKEN = '`'
+""" Break created as a result of tokenization.
+ This may happen in languages without spaces between words.
+"""
-class TokenType(enum.Enum):
- """ Type of token.
- """
- WORD = enum.auto()
- """ Full name of a place. """
- PARTIAL = enum.auto()
- """ Word term without breaks, does not necessarily represent a full name. """
- HOUSENUMBER = enum.auto()
- """ Housenumber term. """
- POSTCODE = enum.auto()
- """ Postal code term. """
- COUNTRY = enum.auto()
- """ Country name or reference. """
- QUALIFIER = enum.auto()
- """ Special term used together with name (e.g. _Hotel_ Bellevue). """
- NEAR_ITEM = enum.auto()
- """ Special term used as searchable object(e.g. supermarket in ...). """
-
-
-class PhraseType(enum.Enum):
- """ Designation of a phrase.
+TokenType = str
+""" Type of token.
+"""
+TOKEN_WORD = 'W'
+""" Full name of a place. """
+TOKEN_PARTIAL = 'w'
+""" Word term without breaks, does not necessarily represent a full name. """
+TOKEN_HOUSENUMBER = 'H'
+""" Housenumber term. """
+TOKEN_POSTCODE = 'P'
+""" Postal code term. """
+TOKEN_COUNTRY = 'C'
+""" Country name or reference. """
+TOKEN_QUALIFIER = 'Q'
+""" Special term used together with name (e.g. _Hotel_ Bellevue). """
+TOKEN_NEAR_ITEM = 'N'
+""" Special term used as searchable object(e.g. supermarket in ...). """
+
+
+PhraseType = int
+""" Designation of a phrase.
+"""
+PHRASE_ANY = 0
+""" No specific designation (i.e. source is free-form query). """
+PHRASE_AMENITY = 1
+""" Contains name or type of a POI. """
+PHRASE_STREET = 2
+""" Contains a street name optionally with a housenumber. """
+PHRASE_CITY = 3
+""" Contains the postal city. """
+PHRASE_COUNTY = 4
+""" Contains the equivalent of a county. """
+PHRASE_STATE = 5
+""" Contains a state or province. """
+PHRASE_POSTCODE = 6
+""" Contains a postal code. """
+PHRASE_COUNTRY = 7
+""" Contains the country name or code. """
+
+
+def _phrase_compatible_with(ptype: PhraseType, ttype: TokenType,
+ is_full_phrase: bool) -> bool:
+ """ Check if the given token type can be used with the phrase type.
"""
- NONE = 0
- """ No specific designation (i.e. source is free-form query). """
- AMENITY = enum.auto()
- """ Contains name or type of a POI. """
- STREET = enum.auto()
- """ Contains a street name optionally with a housenumber. """
- CITY = enum.auto()
- """ Contains the postal city. """
- COUNTY = enum.auto()
- """ Contains the equivalent of a county. """
- STATE = enum.auto()
- """ Contains a state or province. """
- POSTCODE = enum.auto()
- """ Contains a postal code. """
- COUNTRY = enum.auto()
- """ Contains the country name or code. """
-
- def compatible_with(self, ttype: TokenType,
- is_full_phrase: bool) -> bool:
- """ Check if the given token type can be used with the phrase type.
- """
- if self == PhraseType.NONE:
- return not is_full_phrase or ttype != TokenType.QUALIFIER
- if self == PhraseType.AMENITY:
- return ttype in (TokenType.WORD, TokenType.PARTIAL)\
- or (is_full_phrase and ttype == TokenType.NEAR_ITEM)\
- or (not is_full_phrase and ttype == TokenType.QUALIFIER)
- if self == PhraseType.STREET:
- return ttype in (TokenType.WORD, TokenType.PARTIAL, TokenType.HOUSENUMBER)
- if self == PhraseType.POSTCODE:
- return ttype == TokenType.POSTCODE
- if self == PhraseType.COUNTRY:
- return ttype == TokenType.COUNTRY
-
- return ttype in (TokenType.WORD, TokenType.PARTIAL)
+ if ptype == PHRASE_ANY:
+ return not is_full_phrase or ttype != TOKEN_QUALIFIER
+ if ptype == PHRASE_AMENITY:
+ return ttype in (TOKEN_WORD, TOKEN_PARTIAL)\
+ or (is_full_phrase and ttype == TOKEN_NEAR_ITEM)\
+ or (not is_full_phrase and ttype == TOKEN_QUALIFIER)
+ if ptype == PHRASE_STREET:
+ return ttype in (TOKEN_WORD, TOKEN_PARTIAL, TOKEN_HOUSENUMBER)
+ if ptype == PHRASE_POSTCODE:
+ return ttype == TOKEN_POSTCODE
+ if ptype == PHRASE_COUNTRY:
+ return ttype == TOKEN_COUNTRY
+
+ return ttype in (TOKEN_WORD, TOKEN_PARTIAL)
@dataclasses.dataclass
def __init__(self, source: List[Phrase]) -> None:
self.source = source
self.nodes: List[QueryNode] = \
- [QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
+ [QueryNode(BREAK_START, source[0].ptype if source else PHRASE_ANY)]
def num_token_slots(self) -> int:
""" Return the length of the query in vertice steps.
be added to, then the token is silently dropped.
"""
snode = self.nodes[trange.start]
- full_phrase = snode.btype in (BreakType.START, BreakType.PHRASE)\
- and self.nodes[trange.end].btype in (BreakType.PHRASE, BreakType.END)
- if snode.ptype.compatible_with(ttype, full_phrase):
+ full_phrase = snode.btype in (BREAK_START, BREAK_PHRASE)\
+ and self.nodes[trange.end].btype in (BREAK_PHRASE, BREAK_END)
+ if _phrase_compatible_with(snode.ptype, ttype, full_phrase):
tlist = snode.get_tokens(trange.end, ttype)
if tlist is None:
snode.starting.append(TokenList(trange.end, ttype, [token]))
going to the subsequent node. Such PARTIAL tokens are
assumed to exist.
"""
- return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
+ return [next(iter(self.get_tokens(TokenRange(i, i+1), TOKEN_PARTIAL)))
for i in range(trange.start, trange.end)]
def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
for tlist in node.starting:
for t in tlist.tokens:
if t.token == token:
- return f"[{tlist.ttype.name[0]}]{t.lookup_word}"
+ return f"[{tlist.ttype}]{t.lookup_word}"
return 'None'
PENALTY_TOKENCHANGE = {
- qmod.BreakType.START: 0.0,
- qmod.BreakType.END: 0.0,
- qmod.BreakType.PHRASE: 0.0,
- qmod.BreakType.SOFT_PHRASE: 0.0,
- qmod.BreakType.WORD: 0.1,
- qmod.BreakType.PART: 0.2,
- qmod.BreakType.TOKEN: 0.4
+ qmod.BREAK_START: 0.0,
+ qmod.BREAK_END: 0.0,
+ qmod.BREAK_PHRASE: 0.0,
+ qmod.BREAK_SOFT_PHRASE: 0.0,
+ qmod.BREAK_WORD: 0.1,
+ qmod.BREAK_PART: 0.2,
+ qmod.BREAK_TOKEN: 0.4
}
TypedRangeSeq = List[TypedRange]
"""
out = TokenAssignment()
for token in ranges:
- if token.ttype == qmod.TokenType.PARTIAL:
+ if token.ttype == qmod.TOKEN_PARTIAL:
out.address.append(token.trange)
- elif token.ttype == qmod.TokenType.HOUSENUMBER:
+ elif token.ttype == qmod.TOKEN_HOUSENUMBER:
out.housenumber = token.trange
- elif token.ttype == qmod.TokenType.POSTCODE:
+ elif token.ttype == qmod.TOKEN_POSTCODE:
out.postcode = token.trange
- elif token.ttype == qmod.TokenType.COUNTRY:
+ elif token.ttype == qmod.TOKEN_COUNTRY:
out.country = token.trange
- elif token.ttype == qmod.TokenType.NEAR_ITEM:
+ elif token.ttype == qmod.TOKEN_NEAR_ITEM:
out.near_item = token.trange
- elif token.ttype == qmod.TokenType.QUALIFIER:
+ elif token.ttype == qmod.TOKEN_QUALIFIER:
out.qualifier = token.trange
return out
self.penalty = penalty
def __str__(self) -> str:
- seq = ''.join(f'[{r.trange.start} - {r.trange.end}: {r.ttype.name}]' for r in self.seq)
+ seq = ''.join(f'[{r.trange.start} - {r.trange.end}: {r.ttype}]' for r in self.seq)
return f'{seq} (dir: {self.direction}, penalty: {self.penalty})'
@property
"""
# Country and category must be the final term for left-to-right
return len(self.seq) > 1 and \
- self.seq[-1].ttype in (qmod.TokenType.COUNTRY, qmod.TokenType.NEAR_ITEM)
+ self.seq[-1].ttype in (qmod.TOKEN_COUNTRY, qmod.TOKEN_NEAR_ITEM)
def appendable(self, ttype: qmod.TokenType) -> Optional[int]:
""" Check if the give token type is appendable to the existing sequence.
new direction of the sequence after adding such a type. The
token is not added.
"""
- if ttype == qmod.TokenType.WORD:
+ if ttype == qmod.TOKEN_WORD:
return None
if not self.seq:
# Append unconditionally to the empty list
- if ttype == qmod.TokenType.COUNTRY:
+ if ttype == qmod.TOKEN_COUNTRY:
return -1
- if ttype in (qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
+ if ttype in (qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_QUALIFIER):
return 1
return self.direction
# Name tokens are always acceptable and don't change direction
- if ttype == qmod.TokenType.PARTIAL:
+ if ttype == qmod.TOKEN_PARTIAL:
# qualifiers cannot appear in the middle of the query. They need
# to be near the next phrase.
if self.direction == -1 \
- and any(t.ttype == qmod.TokenType.QUALIFIER for t in self.seq[:-1]):
+ and any(t.ttype == qmod.TOKEN_QUALIFIER for t in self.seq[:-1]):
return None
return self.direction
if self.has_types(ttype):
return None
- if ttype == qmod.TokenType.HOUSENUMBER:
+ if ttype == qmod.TOKEN_HOUSENUMBER:
if self.direction == 1:
- if len(self.seq) == 1 and self.seq[0].ttype == qmod.TokenType.QUALIFIER:
+ if len(self.seq) == 1 and self.seq[0].ttype == qmod.TOKEN_QUALIFIER:
return None
if len(self.seq) > 2 \
- or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
+ or self.has_types(qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY):
return None # direction left-to-right: housenumber must come before anything
elif (self.direction == -1
- or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY)):
+ or self.has_types(qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY)):
return -1 # force direction right-to-left if after other terms
return self.direction
- if ttype == qmod.TokenType.POSTCODE:
+ if ttype == qmod.TOKEN_POSTCODE:
if self.direction == -1:
- if self.has_types(qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
+ if self.has_types(qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_QUALIFIER):
return None
return -1
if self.direction == 1:
- return None if self.has_types(qmod.TokenType.COUNTRY) else 1
- if self.has_types(qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
+ return None if self.has_types(qmod.TOKEN_COUNTRY) else 1
+ if self.has_types(qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_QUALIFIER):
return 1
return self.direction
- if ttype == qmod.TokenType.COUNTRY:
+ if ttype == qmod.TOKEN_COUNTRY:
return None if self.direction == -1 else 1
- if ttype == qmod.TokenType.NEAR_ITEM:
+ if ttype == qmod.TOKEN_NEAR_ITEM:
return self.direction
- if ttype == qmod.TokenType.QUALIFIER:
+ if ttype == qmod.TOKEN_QUALIFIER:
if self.direction == 1:
if (len(self.seq) == 1
- and self.seq[0].ttype in (qmod.TokenType.PARTIAL, qmod.TokenType.NEAR_ITEM)) \
+ and self.seq[0].ttype in (qmod.TOKEN_PARTIAL, qmod.TOKEN_NEAR_ITEM)) \
or (len(self.seq) == 2
- and self.seq[0].ttype == qmod.TokenType.NEAR_ITEM
- and self.seq[1].ttype == qmod.TokenType.PARTIAL):
+ and self.seq[0].ttype == qmod.TOKEN_NEAR_ITEM
+ and self.seq[1].ttype == qmod.TOKEN_PARTIAL):
return 1
return None
if self.direction == -1:
return -1
- tempseq = self.seq[1:] if self.seq[0].ttype == qmod.TokenType.NEAR_ITEM else self.seq
+ tempseq = self.seq[1:] if self.seq[0].ttype == qmod.TOKEN_NEAR_ITEM else self.seq
if len(tempseq) == 0:
return 1
- if len(tempseq) == 1 and self.seq[0].ttype == qmod.TokenType.HOUSENUMBER:
+ if len(tempseq) == 1 and self.seq[0].ttype == qmod.TOKEN_HOUSENUMBER:
return None
- if len(tempseq) > 1 or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
+ if len(tempseq) > 1 or self.has_types(qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY):
return -1
return 0
new_penalty = 0.0
else:
last = self.seq[-1]
- if btype != qmod.BreakType.PHRASE and last.ttype == ttype:
+ if btype != qmod.BREAK_PHRASE and last.ttype == ttype:
# extend the existing range
newseq = self.seq[:-1] + [TypedRange(ttype, last.trange.replace_end(end_pos))]
new_penalty = 0.0
# housenumbers may not be further than 2 words from the beginning.
# If there are two words in front, give it a penalty.
hnrpos = next((i for i, tr in enumerate(self.seq)
- if tr.ttype == qmod.TokenType.HOUSENUMBER),
+ if tr.ttype == qmod.TOKEN_HOUSENUMBER),
None)
if hnrpos is not None:
if self.direction != -1:
- priors = sum(1 for t in self.seq[:hnrpos] if t.ttype == qmod.TokenType.PARTIAL)
+ priors = sum(1 for t in self.seq[:hnrpos] if t.ttype == qmod.TOKEN_PARTIAL)
if not self._adapt_penalty_from_priors(priors, -1):
return False
if self.direction != 1:
- priors = sum(1 for t in self.seq[hnrpos+1:] if t.ttype == qmod.TokenType.PARTIAL)
+ priors = sum(1 for t in self.seq[hnrpos+1:] if t.ttype == qmod.TOKEN_PARTIAL)
if not self._adapt_penalty_from_priors(priors, 1):
return False
- if any(t.ttype == qmod.TokenType.NEAR_ITEM for t in self.seq):
+ if any(t.ttype == qmod.TOKEN_NEAR_ITEM for t in self.seq):
self.penalty += 1.0
return True
# * the containing phrase is strictly typed
if (base.housenumber and first.end < base.housenumber.start)\
or (base.qualifier and base.qualifier > first)\
- or (query.nodes[first.start].ptype != qmod.PhraseType.NONE):
+ or (query.nodes[first.start].ptype != qmod.PHRASE_ANY):
return
penalty = self.penalty
# * the containing phrase is strictly typed
if (base.housenumber and last.start > base.housenumber.end)\
or (base.qualifier and base.qualifier < last)\
- or (query.nodes[last.start].ptype != qmod.PhraseType.NONE):
+ or (query.nodes[last.start].ptype != qmod.PHRASE_ANY):
return
penalty = self.penalty
another. It does not include penalties for transitions within a
type.
"""
- todo = [_TokenSequence([], direction=0 if query.source[0].ptype == qmod.PhraseType.NONE else 1)]
+ todo = [_TokenSequence([], direction=0 if query.source[0].ptype == qmod.PHRASE_ANY else 1)]
while todo:
state = todo.pop()
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2024 by the Nominatim developer community.
+# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Generic processor for names that creates abbreviation variants.
from typing import Mapping, Dict, Any, Iterable, Iterator, Optional, List, cast
import itertools
-import datrie
-
from ...errors import UsageError
from ...data.place_name import PlaceName
from .config_variants import get_variant_config
from .generic_mutation import MutationVariantGenerator
+from .simple_trie import SimpleTrie
# Configuration section
"""
config: Dict[str, Any] = {}
- config['replacements'], config['chars'] = get_variant_config(rules.get('variants'),
- normalizer)
+ config['replacements'], _ = get_variant_config(rules.get('variants'), normalizer)
config['variant_only'] = rules.get('mode', '') == 'variant-only'
# parse mutation rules
self.variant_only = config['variant_only']
# Set up datrie
- if config['replacements']:
- self.replacements = datrie.Trie(config['chars'])
- for src, repllist in config['replacements']:
- self.replacements[src] = repllist
- else:
- self.replacements = None
+ self.replacements: Optional[SimpleTrie[List[str]]] = \
+ SimpleTrie(config['replacements']) if config['replacements'] else None
# set up mutation rules
self.mutations = [MutationVariantGenerator(*cfg) for cfg in config['mutations']]
pos = 0
force_space = False
while pos < baselen:
- full, repl = self.replacements.longest_prefix_item(baseform[pos:],
- (None, None))
- if full is not None:
- done = baseform[startpos:pos]
+ frm = pos
+ repl, pos = self.replacements.longest_prefix(baseform, pos)
+ if repl is not None:
+ done = baseform[startpos:frm]
partials = [v + done + r
for v, r in itertools.product(partials, repl)
if not force_space or r.startswith(' ')]
# to be helpful. Only use the original term.
startpos = 0
break
- startpos = pos + len(full)
- if full[-1] == ' ':
- startpos -= 1
+ if baseform[pos - 1] == ' ':
+ pos -= 1
force_space = True
- pos = startpos
+ startpos = pos
else:
pos += 1
force_space = False
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2025 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Simple dict-based implementation of a trie structure.
+"""
+from typing import TypeVar, Generic, Tuple, Optional, List, Dict
+from collections import defaultdict
+
+T = TypeVar('T')
+
+
+class SimpleTrie(Generic[T]):
+ """ A simple read-only trie structure.
+ This structure supports examply one lookup operation,
+ which is longest-prefix lookup.
+ """
+
+ def __init__(self, data: Optional[List[Tuple[str, T]]] = None) -> None:
+ self._tree: Dict[str, 'SimpleTrie[T]'] = defaultdict(SimpleTrie[T])
+ self._value: Optional[T] = None
+ self._prefix = ''
+
+ if data:
+ for key, value in data:
+ self._add(key, 0, value)
+
+ self._make_compact()
+
+ def _add(self, word: str, pos: int, value: T) -> None:
+ """ (Internal) Add a sub-word to the trie.
+ The word is added from index 'pos'. If the sub-word to add
+ is empty, then the trie saves the given value.
+ """
+ if pos < len(word):
+ self._tree[word[pos]]._add(word, pos + 1, value)
+ else:
+ self._value = value
+
+ def _make_compact(self) -> None:
+ """ (Internal) Compress tree where there is exactly one subtree
+ and no value.
+
+ Compression works recursively starting at the leaf.
+ """
+ for t in self._tree.values():
+ t._make_compact()
+
+ if len(self._tree) == 1 and self._value is None:
+ assert not self._prefix
+ for k, v in self._tree.items():
+ self._prefix = k + v._prefix
+ self._tree = v._tree
+ self._value = v._value
+
+ def longest_prefix(self, word: str, start: int = 0) -> Tuple[Optional[T], int]:
+ """ Return the longest prefix match for the given word starting at
+ the position 'start'.
+
+ The function returns a tuple with the value for the longest match and
+ the position of the word after the match. If no match was found at
+ all, the function returns (None, start).
+ """
+ cur = self
+ pos = start
+ result: Tuple[Optional[T], int] = None, start
+
+ while True:
+ if cur._prefix:
+ if not word.startswith(cur._prefix, pos):
+ return result
+ pos += len(cur._prefix)
+
+ if cur._value:
+ result = cur._value, pos
+
+ if pos >= len(word) or word[pos] not in cur._tree:
+ return result
+
+ cur = cur._tree[word[pos]]
+ pos += 1
def test_normalize_simple():
norm = ':: lower();'
- query = [qmod.Phrase(qmod.PhraseType.NONE, 'Hallo')]
+ query = [qmod.Phrase(qmod.PHRASE_ANY, 'Hallo')]
out = run_preprocessor_on(query, norm)
assert len(out) == 1
- assert out == [qmod.Phrase(qmod.PhraseType.NONE, 'hallo')]
+ assert out == [qmod.Phrase(qmod.PHRASE_ANY, 'hallo')]
('大阪府大阪', '大阪府:大阪'),
('大阪市大阪', '大阪市:大阪')])
def test_split_phrases(inp, outp):
- query = [qmod.Phrase(qmod.PhraseType.NONE, inp)]
+ query = [qmod.Phrase(qmod.PHRASE_ANY, inp)]
out = run_preprocessor_on(query)
- assert out == [qmod.Phrase(qmod.PhraseType.NONE, outp)]
+ assert out == [qmod.Phrase(qmod.PHRASE_ANY, outp)]
lookup_word='foo')
-@pytest.mark.parametrize('ptype,ttype', [('NONE', 'WORD'),
- ('AMENITY', 'QUALIFIER'),
- ('STREET', 'PARTIAL'),
- ('CITY', 'WORD'),
- ('COUNTRY', 'COUNTRY'),
- ('POSTCODE', 'POSTCODE')])
+@pytest.mark.parametrize('ptype,ttype', [(query.PHRASE_ANY, 'W'),
+ (query.PHRASE_AMENITY, 'Q'),
+ (query.PHRASE_STREET, 'w'),
+ (query.PHRASE_CITY, 'W'),
+ (query.PHRASE_COUNTRY, 'C'),
+ (query.PHRASE_POSTCODE, 'P')])
def test_phrase_compatible(ptype, ttype):
- assert query.PhraseType[ptype].compatible_with(query.TokenType[ttype], False)
+ assert query._phrase_compatible_with(ptype, ttype, False)
-@pytest.mark.parametrize('ptype', ['COUNTRY', 'POSTCODE'])
+@pytest.mark.parametrize('ptype', [query.PHRASE_COUNTRY, query.PHRASE_POSTCODE])
def test_phrase_incompatible(ptype):
- assert not query.PhraseType[ptype].compatible_with(query.TokenType.PARTIAL, True)
+ assert not query._phrase_compatible_with(ptype, query.TOKEN_PARTIAL, True)
def test_query_node_empty():
- qn = query.QueryNode(query.BreakType.PHRASE, query.PhraseType.NONE)
+ qn = query.QueryNode(query.BREAK_PHRASE, query.PHRASE_ANY)
- assert not qn.has_tokens(3, query.TokenType.PARTIAL)
- assert qn.get_tokens(3, query.TokenType.WORD) is None
+ assert not qn.has_tokens(3, query.TOKEN_PARTIAL)
+ assert qn.get_tokens(3, query.TOKEN_WORD) is None
def test_query_node_with_content():
- qn = query.QueryNode(query.BreakType.PHRASE, query.PhraseType.NONE)
- qn.starting.append(query.TokenList(2, query.TokenType.PARTIAL, [mktoken(100), mktoken(101)]))
- qn.starting.append(query.TokenList(2, query.TokenType.WORD, [mktoken(1000)]))
+ qn = query.QueryNode(query.BREAK_PHRASE, query.PHRASE_ANY)
+ qn.starting.append(query.TokenList(2, query.TOKEN_PARTIAL, [mktoken(100), mktoken(101)]))
+ qn.starting.append(query.TokenList(2, query.TOKEN_WORD, [mktoken(1000)]))
- assert not qn.has_tokens(3, query.TokenType.PARTIAL)
- assert not qn.has_tokens(2, query.TokenType.COUNTRY)
- assert qn.has_tokens(2, query.TokenType.PARTIAL)
- assert qn.has_tokens(2, query.TokenType.WORD)
+ assert not qn.has_tokens(3, query.TOKEN_PARTIAL)
+ assert not qn.has_tokens(2, query.TOKEN_COUNTRY)
+ assert qn.has_tokens(2, query.TOKEN_PARTIAL)
+ assert qn.has_tokens(2, query.TOKEN_WORD)
- assert qn.get_tokens(3, query.TokenType.PARTIAL) is None
- assert qn.get_tokens(2, query.TokenType.COUNTRY) is None
- assert len(qn.get_tokens(2, query.TokenType.PARTIAL)) == 2
- assert len(qn.get_tokens(2, query.TokenType.WORD)) == 1
+ assert qn.get_tokens(3, query.TOKEN_PARTIAL) is None
+ assert qn.get_tokens(2, query.TOKEN_COUNTRY) is None
+ assert len(qn.get_tokens(2, query.TOKEN_PARTIAL)) == 2
+ assert len(qn.get_tokens(2, query.TOKEN_WORD)) == 1
def test_query_struct_empty():
def test_query_struct_with_tokens():
- q = query.QueryStruct([query.Phrase(query.PhraseType.NONE, 'foo bar')])
- q.add_node(query.BreakType.WORD, query.PhraseType.NONE)
- q.add_node(query.BreakType.END, query.PhraseType.NONE)
+ q = query.QueryStruct([query.Phrase(query.PHRASE_ANY, 'foo bar')])
+ q.add_node(query.BREAK_WORD, query.PHRASE_ANY)
+ q.add_node(query.BREAK_END, query.PHRASE_ANY)
assert q.num_token_slots() == 2
- q.add_token(query.TokenRange(0, 1), query.TokenType.PARTIAL, mktoken(1))
- q.add_token(query.TokenRange(1, 2), query.TokenType.PARTIAL, mktoken(2))
- q.add_token(query.TokenRange(1, 2), query.TokenType.WORD, mktoken(99))
- q.add_token(query.TokenRange(1, 2), query.TokenType.WORD, mktoken(98))
+ q.add_token(query.TokenRange(0, 1), query.TOKEN_PARTIAL, mktoken(1))
+ q.add_token(query.TokenRange(1, 2), query.TOKEN_PARTIAL, mktoken(2))
+ q.add_token(query.TokenRange(1, 2), query.TOKEN_WORD, mktoken(99))
+ q.add_token(query.TokenRange(1, 2), query.TOKEN_WORD, mktoken(98))
- assert q.get_tokens(query.TokenRange(0, 2), query.TokenType.WORD) == []
- assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.WORD)) == 2
+ assert q.get_tokens(query.TokenRange(0, 2), query.TOKEN_WORD) == []
+ assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_WORD)) == 2
partials = q.get_partials_list(query.TokenRange(0, 2))
def test_query_struct_incompatible_token():
- q = query.QueryStruct([query.Phrase(query.PhraseType.COUNTRY, 'foo bar')])
- q.add_node(query.BreakType.WORD, query.PhraseType.COUNTRY)
- q.add_node(query.BreakType.END, query.PhraseType.NONE)
+ q = query.QueryStruct([query.Phrase(query.PHRASE_COUNTRY, 'foo bar')])
+ q.add_node(query.BREAK_WORD, query.PHRASE_COUNTRY)
+ q.add_node(query.BREAK_END, query.PHRASE_ANY)
- q.add_token(query.TokenRange(0, 1), query.TokenType.PARTIAL, mktoken(1))
- q.add_token(query.TokenRange(1, 2), query.TokenType.COUNTRY, mktoken(100))
+ q.add_token(query.TokenRange(0, 1), query.TOKEN_PARTIAL, mktoken(1))
+ q.add_token(query.TokenRange(1, 2), query.TOKEN_COUNTRY, mktoken(100))
- assert q.get_tokens(query.TokenRange(0, 1), query.TokenType.PARTIAL) == []
- assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.COUNTRY)) == 1
+ assert q.get_tokens(query.TokenRange(0, 1), query.TOKEN_PARTIAL) == []
+ assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_COUNTRY)) == 1
def test_query_struct_amenity_single_word():
- q = query.QueryStruct([query.Phrase(query.PhraseType.AMENITY, 'bar')])
- q.add_node(query.BreakType.END, query.PhraseType.NONE)
+ q = query.QueryStruct([query.Phrase(query.PHRASE_AMENITY, 'bar')])
+ q.add_node(query.BREAK_END, query.PHRASE_ANY)
- q.add_token(query.TokenRange(0, 1), query.TokenType.PARTIAL, mktoken(1))
- q.add_token(query.TokenRange(0, 1), query.TokenType.NEAR_ITEM, mktoken(2))
- q.add_token(query.TokenRange(0, 1), query.TokenType.QUALIFIER, mktoken(3))
+ q.add_token(query.TokenRange(0, 1), query.TOKEN_PARTIAL, mktoken(1))
+ q.add_token(query.TokenRange(0, 1), query.TOKEN_NEAR_ITEM, mktoken(2))
+ q.add_token(query.TokenRange(0, 1), query.TOKEN_QUALIFIER, mktoken(3))
- assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.PARTIAL)) == 1
- assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.NEAR_ITEM)) == 1
- assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.QUALIFIER)) == 0
+ assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_PARTIAL)) == 1
+ assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_NEAR_ITEM)) == 1
+ assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_QUALIFIER)) == 0
def test_query_struct_amenity_two_words():
- q = query.QueryStruct([query.Phrase(query.PhraseType.AMENITY, 'foo bar')])
- q.add_node(query.BreakType.WORD, query.PhraseType.AMENITY)
- q.add_node(query.BreakType.END, query.PhraseType.NONE)
+ q = query.QueryStruct([query.Phrase(query.PHRASE_AMENITY, 'foo bar')])
+ q.add_node(query.BREAK_WORD, query.PHRASE_AMENITY)
+ q.add_node(query.BREAK_END, query.PHRASE_ANY)
for trange in [(0, 1), (1, 2)]:
- q.add_token(query.TokenRange(*trange), query.TokenType.PARTIAL, mktoken(1))
- q.add_token(query.TokenRange(*trange), query.TokenType.NEAR_ITEM, mktoken(2))
- q.add_token(query.TokenRange(*trange), query.TokenType.QUALIFIER, mktoken(3))
+ q.add_token(query.TokenRange(*trange), query.TOKEN_PARTIAL, mktoken(1))
+ q.add_token(query.TokenRange(*trange), query.TOKEN_NEAR_ITEM, mktoken(2))
+ q.add_token(query.TokenRange(*trange), query.TOKEN_QUALIFIER, mktoken(3))
- assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.PARTIAL)) == 1
- assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.NEAR_ITEM)) == 0
- assert len(q.get_tokens(query.TokenRange(0, 1), query.TokenType.QUALIFIER)) == 1
+ assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_PARTIAL)) == 1
+ assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_NEAR_ITEM)) == 0
+ assert len(q.get_tokens(query.TokenRange(0, 1), query.TOKEN_QUALIFIER)) == 1
- assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.PARTIAL)) == 1
- assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.NEAR_ITEM)) == 0
- assert len(q.get_tokens(query.TokenRange(1, 2), query.TokenType.QUALIFIER)) == 1
+ assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_PARTIAL)) == 1
+ assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_NEAR_ITEM)) == 0
+ assert len(q.get_tokens(query.TokenRange(1, 2), query.TOKEN_QUALIFIER)) == 1
"""
import pytest
-from nominatim_api.search.query import Token, TokenRange, BreakType, PhraseType, TokenType, QueryStruct, Phrase
+from nominatim_api.search.query import Token, TokenRange, QueryStruct, Phrase
+import nominatim_api.search.query as qmod
from nominatim_api.search.db_search_builder import SearchBuilder
from nominatim_api.search.token_assignment import TokenAssignment
from nominatim_api.types import SearchDetails
def make_query(*args):
- q = QueryStruct([Phrase(PhraseType.NONE, '')])
+ q = QueryStruct([Phrase(qmod.PHRASE_ANY, '')])
for _ in range(max(inner[0] for tlist in args for inner in tlist)):
- q.add_node(BreakType.WORD, PhraseType.NONE)
- q.add_node(BreakType.END, PhraseType.NONE)
+ q.add_node(qmod.BREAK_WORD, qmod.PHRASE_ANY)
+ q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
for start, tlist in enumerate(args):
for end, ttype, tinfo in tlist:
for tid, word in tinfo:
q.add_token(TokenRange(start, end), ttype,
- MyToken(penalty=0.5 if ttype == TokenType.PARTIAL else 0.0,
+ MyToken(penalty=0.5 if ttype == qmod.TOKEN_PARTIAL else 0.0,
token=tid, count=1, addr_count=1,
lookup_word=word))
def test_country_search():
- q = make_query([(1, TokenType.COUNTRY, [(2, 'de'), (3, 'en')])])
+ q = make_query([(1, qmod.TOKEN_COUNTRY, [(2, 'de'), (3, 'en')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(country=TokenRange(0, 1))))
def test_country_search_with_country_restriction():
- q = make_query([(1, TokenType.COUNTRY, [(2, 'de'), (3, 'en')])])
+ q = make_query([(1, qmod.TOKEN_COUNTRY, [(2, 'de'), (3, 'en')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'countries': 'en,fr'}))
searches = list(builder.build(TokenAssignment(country=TokenRange(0, 1))))
def test_country_search_with_conflicting_country_restriction():
- q = make_query([(1, TokenType.COUNTRY, [(2, 'de'), (3, 'en')])])
+ q = make_query([(1, qmod.TOKEN_COUNTRY, [(2, 'de'), (3, 'en')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'countries': 'fr'}))
searches = list(builder.build(TokenAssignment(country=TokenRange(0, 1))))
def test_postcode_search_simple():
- q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])])
+ q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1))))
def test_postcode_with_country():
- q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])],
- [(2, TokenType.COUNTRY, [(1, 'xx')])])
+ q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])],
+ [(2, qmod.TOKEN_COUNTRY, [(1, 'xx')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1),
def test_postcode_with_address():
- q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])],
- [(2, TokenType.PARTIAL, [(100, 'word')])])
+ q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])],
+ [(2, qmod.TOKEN_PARTIAL, [(100, 'word')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1),
def test_postcode_with_address_with_full_word():
- q = make_query([(1, TokenType.POSTCODE, [(34, '2367')])],
- [(2, TokenType.PARTIAL, [(100, 'word')]),
- (2, TokenType.WORD, [(1, 'full')])])
+ q = make_query([(1, qmod.TOKEN_POSTCODE, [(34, '2367')])],
+ [(2, qmod.TOKEN_PARTIAL, [(100, 'word')]),
+ (2, qmod.TOKEN_WORD, [(1, 'full')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(postcode=TokenRange(0, 1),
@pytest.mark.parametrize('kwargs', [{'viewbox': '0,0,1,1', 'bounded_viewbox': True},
{'near': '10,10'}])
def test_near_item_only(kwargs):
- q = make_query([(1, TokenType.NEAR_ITEM, [(2, 'foo')])])
+ q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(2, 'foo')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs(kwargs))
searches = list(builder.build(TokenAssignment(near_item=TokenRange(0, 1))))
@pytest.mark.parametrize('kwargs', [{'viewbox': '0,0,1,1'},
{}])
def test_near_item_skipped(kwargs):
- q = make_query([(1, TokenType.NEAR_ITEM, [(2, 'foo')])])
+ q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(2, 'foo')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs(kwargs))
searches = list(builder.build(TokenAssignment(near_item=TokenRange(0, 1))))
def test_name_only_search():
- q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
- (1, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (1, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1))))
def test_name_with_qualifier():
- q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
- (1, TokenType.WORD, [(100, 'a')])],
- [(2, TokenType.QUALIFIER, [(55, 'hotel')])])
+ q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (1, qmod.TOKEN_WORD, [(100, 'a')])],
+ [(2, qmod.TOKEN_QUALIFIER, [(55, 'hotel')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1),
def test_name_with_housenumber_search():
- q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
- (1, TokenType.WORD, [(100, 'a')])],
- [(2, TokenType.HOUSENUMBER, [(66, '66')])])
+ q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (1, qmod.TOKEN_WORD, [(100, 'a')])],
+ [(2, qmod.TOKEN_HOUSENUMBER, [(66, '66')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1),
def test_name_and_address():
- q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
- (1, TokenType.WORD, [(100, 'a')])],
- [(2, TokenType.PARTIAL, [(2, 'b')]),
- (2, TokenType.WORD, [(101, 'b')])],
- [(3, TokenType.PARTIAL, [(3, 'c')]),
- (3, TokenType.WORD, [(102, 'c')])]
+ q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (1, qmod.TOKEN_WORD, [(100, 'a')])],
+ [(2, qmod.TOKEN_PARTIAL, [(2, 'b')]),
+ (2, qmod.TOKEN_WORD, [(101, 'b')])],
+ [(3, qmod.TOKEN_PARTIAL, [(3, 'c')]),
+ (3, qmod.TOKEN_WORD, [(102, 'c')])]
)
builder = SearchBuilder(q, SearchDetails())
def test_name_and_complex_address():
- q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
- (1, TokenType.WORD, [(100, 'a')])],
- [(2, TokenType.PARTIAL, [(2, 'b')]),
- (3, TokenType.WORD, [(101, 'bc')])],
- [(3, TokenType.PARTIAL, [(3, 'c')])],
- [(4, TokenType.PARTIAL, [(4, 'd')]),
- (4, TokenType.WORD, [(103, 'd')])]
+ q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (1, qmod.TOKEN_WORD, [(100, 'a')])],
+ [(2, qmod.TOKEN_PARTIAL, [(2, 'b')]),
+ (3, qmod.TOKEN_WORD, [(101, 'bc')])],
+ [(3, qmod.TOKEN_PARTIAL, [(3, 'c')])],
+ [(4, qmod.TOKEN_PARTIAL, [(4, 'd')]),
+ (4, qmod.TOKEN_WORD, [(103, 'd')])]
)
builder = SearchBuilder(q, SearchDetails())
def test_name_only_near_search():
- q = make_query([(1, TokenType.NEAR_ITEM, [(88, 'g')])],
- [(2, TokenType.PARTIAL, [(1, 'a')]),
- (2, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(88, 'g')])],
+ [(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails())
searches = list(builder.build(TokenAssignment(name=TokenRange(1, 2),
def test_name_only_search_with_category():
- q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
- (1, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (1, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar')]}))
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1))))
def test_name_with_near_item_search_with_category_mismatch():
- q = make_query([(1, TokenType.NEAR_ITEM, [(88, 'g')])],
- [(2, TokenType.PARTIAL, [(1, 'a')]),
- (2, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(88, 'g')])],
+ [(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar')]}))
searches = list(builder.build(TokenAssignment(name=TokenRange(1, 2),
def test_name_with_near_item_search_with_category_match():
- q = make_query([(1, TokenType.NEAR_ITEM, [(88, 'g')])],
- [(2, TokenType.PARTIAL, [(1, 'a')]),
- (2, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_NEAR_ITEM, [(88, 'g')])],
+ [(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar'),
('this', 'that')]}))
def test_name_with_qualifier_search_with_category_mismatch():
- q = make_query([(1, TokenType.QUALIFIER, [(88, 'g')])],
- [(2, TokenType.PARTIAL, [(1, 'a')]),
- (2, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_QUALIFIER, [(88, 'g')])],
+ [(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar')]}))
searches = list(builder.build(TokenAssignment(name=TokenRange(1, 2),
def test_name_with_qualifier_search_with_category_match():
- q = make_query([(1, TokenType.QUALIFIER, [(88, 'g')])],
- [(2, TokenType.PARTIAL, [(1, 'a')]),
- (2, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_QUALIFIER, [(88, 'g')])],
+ [(2, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (2, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'categories': [('foo', 'bar'),
('this', 'that')]}))
def test_name_only_search_with_countries():
- q = make_query([(1, TokenType.PARTIAL, [(1, 'a')]),
- (1, TokenType.WORD, [(100, 'a')])])
+ q = make_query([(1, qmod.TOKEN_PARTIAL, [(1, 'a')]),
+ (1, qmod.TOKEN_WORD, [(100, 'a')])])
builder = SearchBuilder(q, SearchDetails.from_kwargs({'countries': 'de,en'}))
searches = list(builder.build(TokenAssignment(name=TokenRange(0, 1))))
def make_counted_searches(name_part, name_full, address_part, address_full,
num_address_parts=1):
- q = QueryStruct([Phrase(PhraseType.NONE, '')])
+ q = QueryStruct([Phrase(qmod.PHRASE_ANY, '')])
for i in range(1 + num_address_parts):
- q.add_node(BreakType.WORD, PhraseType.NONE)
- q.add_node(BreakType.END, PhraseType.NONE)
+ q.add_node(qmod.BREAK_WORD, qmod.PHRASE_ANY)
+ q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
- q.add_token(TokenRange(0, 1), TokenType.PARTIAL,
+ q.add_token(TokenRange(0, 1), qmod.TOKEN_PARTIAL,
MyToken(0.5, 1, name_part, 1, 'name_part'))
- q.add_token(TokenRange(0, 1), TokenType.WORD,
+ q.add_token(TokenRange(0, 1), qmod.TOKEN_WORD,
MyToken(0, 101, name_full, 1, 'name_full'))
for i in range(num_address_parts):
- q.add_token(TokenRange(i + 1, i + 2), TokenType.PARTIAL,
+ q.add_token(TokenRange(i + 1, i + 2), qmod.TOKEN_PARTIAL,
MyToken(0.5, 2, address_part, 1, 'address_part'))
- q.add_token(TokenRange(i + 1, i + 2), TokenType.WORD,
+ q.add_token(TokenRange(i + 1, i + 2), qmod.TOKEN_WORD,
MyToken(0, 102, address_full, 1, 'address_full'))
builder = SearchBuilder(q, SearchDetails())
import pytest_asyncio
from nominatim_api import NominatimAPIAsync
-from nominatim_api.search.query import Phrase, PhraseType, TokenType, BreakType
+from nominatim_api.search.query import Phrase
+import nominatim_api.search.query as qmod
import nominatim_api.search.icu_tokenizer as tok
from nominatim_api.logging import set_log_output, get_and_disable
def make_phrase(query):
- return [Phrase(PhraseType.NONE, s) for s in query.split(',')]
+ return [Phrase(qmod.PHRASE_ANY, s) for s in query.split(',')]
@pytest_asyncio.fixture
async def conn(table_factory):
query = await ana.analyze_query(make_phrase('foo BAR'))
assert len(query.source) == 1
- assert query.source[0].ptype == PhraseType.NONE
+ assert query.source[0].ptype == qmod.PHRASE_ANY
assert query.source[0].text == 'foo bar'
assert query.num_token_slots() == 2
assert query.num_token_slots() == 2
assert query.nodes[0].starting
assert query.nodes[1].starting
- assert query.nodes[1].btype == BreakType.TOKEN
+ assert query.nodes[1].btype == qmod.BREAK_TOKEN
@pytest.mark.asyncio
-@pytest.mark.parametrize('term,order', [('23456', ['POSTCODE', 'HOUSENUMBER', 'WORD', 'PARTIAL']),
- ('3', ['HOUSENUMBER', 'POSTCODE', 'WORD', 'PARTIAL'])
+@pytest.mark.parametrize('term,order', [('23456', ['P', 'H', 'W', 'w']),
+ ('3', ['H', 'P', 'W', 'w'])
])
async def test_penalty_postcodes_and_housenumbers(conn, term, order):
ana = await tok.create_query_analyzer(conn)
assert query.num_token_slots() == 1
- torder = [(tl.tokens[0].penalty, tl.ttype.name) for tl in query.nodes[0].starting]
+ torder = [(tl.tokens[0].penalty, tl.ttype) for tl in query.nodes[0].starting]
torder.sort()
assert [t[1] for t in torder] == order
assert query.num_token_slots() == 3
assert len(query.nodes[0].starting) == 1
- assert query.nodes[0].starting[0].ttype == TokenType.NEAR_ITEM
+ assert query.nodes[0].starting[0].ttype == qmod.TOKEN_NEAR_ITEM
assert not query.nodes[2].starting
assert query.num_token_slots() == 1
assert len(query.nodes[0].starting) == 1
- assert query.nodes[0].starting[0].ttype == TokenType.NEAR_ITEM
+ assert query.nodes[0].starting[0].ttype == qmod.TOKEN_NEAR_ITEM
@pytest.mark.asyncio
query = await ana.analyze_query(make_phrase('foo BAR foo BAR foo'))
assert query.num_token_slots() == 5
- assert set(t.ttype for t in query.nodes[0].starting) == {TokenType.QUALIFIER}
- assert set(t.ttype for t in query.nodes[2].starting) == {TokenType.QUALIFIER}
- assert set(t.ttype for t in query.nodes[4].starting) == {TokenType.QUALIFIER}
+ assert set(t.ttype for t in query.nodes[0].starting) == {qmod.TOKEN_QUALIFIER}
+ assert set(t.ttype for t in query.nodes[2].starting) == {qmod.TOKEN_QUALIFIER}
+ assert set(t.ttype for t in query.nodes[4].starting) == {qmod.TOKEN_QUALIFIER}
@pytest.mark.asyncio
query = await ana.analyze_query(make_phrase('466 23 99834 34a'))
assert query.num_token_slots() == 4
- assert query.nodes[0].starting[0].ttype == TokenType.HOUSENUMBER
+ assert query.nodes[0].starting[0].ttype == qmod.TOKEN_HOUSENUMBER
assert len(query.nodes[0].starting[0].tokens) == 1
assert query.nodes[0].starting[0].tokens[0].token == 0
- assert query.nodes[1].starting[0].ttype == TokenType.HOUSENUMBER
+ assert query.nodes[1].starting[0].ttype == qmod.TOKEN_HOUSENUMBER
assert len(query.nodes[1].starting[0].tokens) == 1
assert query.nodes[1].starting[0].tokens[0].token == 1
assert not query.nodes[2].starting
"""
import pytest
-from nominatim_api.search.query import QueryStruct, Phrase, PhraseType, BreakType, TokenType, TokenRange, Token
+from nominatim_api.search.query import QueryStruct, Phrase, TokenRange, Token
+import nominatim_api.search.query as qmod
from nominatim_api.search.token_assignment import yield_token_assignments, TokenAssignment, PENALTY_TOKENCHANGE
class MyToken(Token):
for btype, ptype, _ in args[1:]:
q.add_node(btype, ptype)
- q.add_node(BreakType.END, PhraseType.NONE)
+ q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
for start, t in enumerate(args):
for end, ttype in t[2]:
def test_query_with_missing_tokens():
- q = QueryStruct([Phrase(PhraseType.NONE, '')])
- q.add_node(BreakType.END, PhraseType.NONE)
+ q = QueryStruct([Phrase(qmod.PHRASE_ANY, '')])
+ q.add_node(qmod.BREAK_END, qmod.PHRASE_ANY)
assert list(yield_token_assignments(q)) == []
def test_one_word_query():
- q = make_query((BreakType.START, PhraseType.NONE,
- [(1, TokenType.PARTIAL),
- (1, TokenType.WORD),
- (1, TokenType.HOUSENUMBER)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
+ [(1, qmod.TOKEN_PARTIAL),
+ (1, qmod.TOKEN_WORD),
+ (1, qmod.TOKEN_HOUSENUMBER)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(name=TokenRange(0, 1))]
def test_single_postcode():
- q = make_query((BreakType.START, PhraseType.NONE,
- [(1, TokenType.POSTCODE)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
+ [(1, qmod.TOKEN_POSTCODE)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(postcode=TokenRange(0, 1))]
def test_single_country_name():
- q = make_query((BreakType.START, PhraseType.NONE,
- [(1, TokenType.COUNTRY)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
+ [(1, qmod.TOKEN_COUNTRY)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(country=TokenRange(0, 1))]
def test_single_word_poi_search():
- q = make_query((BreakType.START, PhraseType.NONE,
- [(1, TokenType.NEAR_ITEM),
- (1, TokenType.QUALIFIER)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY,
+ [(1, qmod.TOKEN_NEAR_ITEM),
+ (1, qmod.TOKEN_QUALIFIER)]))
res = list(yield_token_assignments(q))
assert res == [TokenAssignment(near_item=TokenRange(0, 1))]
-@pytest.mark.parametrize('btype', [BreakType.WORD, BreakType.PART, BreakType.TOKEN])
+@pytest.mark.parametrize('btype', [qmod.BREAK_WORD, qmod.BREAK_PART, qmod.BREAK_TOKEN])
def test_multiple_simple_words(btype):
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (btype, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
- (btype, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (btype, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
+ (btype, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
penalty = PENALTY_TOKENCHANGE[btype]
def test_multiple_words_respect_phrase_break():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(0, 1),
def test_housenumber_and_street():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.HOUSENUMBER)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_HOUSENUMBER)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(1, 2),
def test_housenumber_and_street_backwards():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.HOUSENUMBER)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_HOUSENUMBER)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(0, 1),
def test_housenumber_and_postcode():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.HOUSENUMBER)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(4, TokenType.POSTCODE)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_HOUSENUMBER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_POSTCODE)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=pytest.approx(0.3),
postcode=TokenRange(3, 4)))
def test_postcode_and_housenumber():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.POSTCODE)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(4, TokenType.HOUSENUMBER)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_POSTCODE)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_HOUSENUMBER)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=pytest.approx(0.3),
def test_country_housenumber_postcode():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.COUNTRY)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.HOUSENUMBER)]),
- (BreakType.WORD, PhraseType.NONE, [(4, TokenType.POSTCODE)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_COUNTRY)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_HOUSENUMBER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_POSTCODE)]))
check_assignments(yield_token_assignments(q))
-@pytest.mark.parametrize('ttype', [TokenType.POSTCODE, TokenType.COUNTRY,
- TokenType.NEAR_ITEM, TokenType.QUALIFIER])
+@pytest.mark.parametrize('ttype', [qmod.TOKEN_POSTCODE, qmod.TOKEN_COUNTRY,
+ qmod.TOKEN_NEAR_ITEM, qmod.TOKEN_QUALIFIER])
def test_housenumber_with_only_special_terms(ttype):
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.HOUSENUMBER)]),
- (BreakType.WORD, PhraseType.NONE, [(2, ttype)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_HOUSENUMBER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, ttype)]))
check_assignments(yield_token_assignments(q))
-@pytest.mark.parametrize('ttype', [TokenType.POSTCODE, TokenType.HOUSENUMBER, TokenType.COUNTRY])
+@pytest.mark.parametrize('ttype', [qmod.TOKEN_POSTCODE, qmod.TOKEN_HOUSENUMBER, qmod.TOKEN_COUNTRY])
def test_multiple_special_tokens(ttype):
- q = make_query((BreakType.START, PhraseType.NONE, [(1, ttype)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(3, ttype)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, ttype)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(3, ttype)]))
check_assignments(yield_token_assignments(q))
def test_housenumber_many_phrases():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(3, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(4, TokenType.HOUSENUMBER)]),
- (BreakType.WORD, PhraseType.NONE, [(5, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(4, qmod.TOKEN_HOUSENUMBER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(5, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1,
def test_country_at_beginning():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.COUNTRY)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_COUNTRY)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(1, 2),
def test_country_at_end():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.COUNTRY)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_COUNTRY)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(0, 1),
def test_country_in_middle():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.COUNTRY)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_COUNTRY)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_postcode_with_designation():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.POSTCODE)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_POSTCODE)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(1, 2),
def test_postcode_with_designation_backwards():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.POSTCODE)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_POSTCODE)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(name=TokenRange(0, 1),
def test_near_item_at_beginning():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.NEAR_ITEM)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_NEAR_ITEM)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(1, 2),
def test_near_item_at_end():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.NEAR_ITEM)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_NEAR_ITEM)]))
check_assignments(yield_token_assignments(q),
TokenAssignment(penalty=0.1, name=TokenRange(0, 1),
def test_near_item_in_middle():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.NEAR_ITEM)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_NEAR_ITEM)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_qualifier_at_beginning():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.QUALIFIER)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_QUALIFIER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
def test_qualifier_after_name():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.QUALIFIER)]),
- (BreakType.WORD, PhraseType.NONE, [(4, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(5, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_QUALIFIER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(5, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q),
def test_qualifier_before_housenumber():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.QUALIFIER)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.HOUSENUMBER)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_QUALIFIER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_HOUSENUMBER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_qualifier_after_housenumber():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.HOUSENUMBER)]),
- (BreakType.WORD, PhraseType.NONE, [(2, TokenType.QUALIFIER)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_HOUSENUMBER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(2, qmod.TOKEN_QUALIFIER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
def test_qualifier_in_middle_of_phrase():
- q = make_query((BreakType.START, PhraseType.NONE, [(1, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(2, TokenType.PARTIAL)]),
- (BreakType.WORD, PhraseType.NONE, [(3, TokenType.QUALIFIER)]),
- (BreakType.WORD, PhraseType.NONE, [(4, TokenType.PARTIAL)]),
- (BreakType.PHRASE, PhraseType.NONE, [(5, TokenType.PARTIAL)]))
+ q = make_query((qmod.BREAK_START, qmod.PHRASE_ANY, [(1, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(2, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(3, qmod.TOKEN_QUALIFIER)]),
+ (qmod.BREAK_WORD, qmod.PHRASE_ANY, [(4, qmod.TOKEN_PARTIAL)]),
+ (qmod.BREAK_PHRASE, qmod.PHRASE_ANY, [(5, qmod.TOKEN_PARTIAL)]))
check_assignments(yield_token_assignments(q))
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2025 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for simplified trie structure.
+"""
+
+from nominatim_db.tokenizer.token_analysis.simple_trie import SimpleTrie
+
+def test_single_item_trie():
+ t = SimpleTrie([('foob', 42)])
+
+ assert t.longest_prefix('afoobar') == (None, 0)
+ assert t.longest_prefix('afoobar', start=1) == (42, 5)
+ assert t.longest_prefix('foob') == (42, 4)
+ assert t.longest_prefix('123foofoo', 3) == (None, 3)
+
+def test_complex_item_tree():
+ t = SimpleTrie([('a', 1),
+ ('b', 2),
+ ('auto', 3),
+ ('buto', 4),
+ ('automat', 5),
+ ('bu', 6),
+ ('bx', 7)])
+
+ assert t.longest_prefix('a') == (1, 1)
+ assert t.longest_prefix('au') == (1, 1)
+ assert t.longest_prefix('aut') == (1, 1)
+ assert t.longest_prefix('auto') == (3, 4)
+ assert t.longest_prefix('automat') == (5, 7)
+ assert t.longest_prefix('automatx') == (5, 7)
+ assert t.longest_prefix('butomat') == (4, 4)
+ assert t.longest_prefix('butomat', 1) == (None, 1)