]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/search/query.py
b2e18337c67e22e7198812689ee81a44af8ef499
[nominatim.git] / src / nominatim_api / search / query.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Datastructures for a tokenized query.
9 """
10 from typing import List, Tuple, Optional, Iterator
11 from abc import ABC, abstractmethod
12 import dataclasses
13 import enum
14
15
16 class BreakType(enum.Enum):
17     """ Type of break between tokens.
18     """
19     START = '<'
20     """ Begin of the query. """
21     END = '>'
22     """ End of the query. """
23     PHRASE = ','
24     """ Hard break between two phrases. Address parts cannot cross hard
25         phrase boundaries."""
26     SOFT_PHRASE = ':'
27     """ Likely break between two phrases. Address parts should not cross soft
28         phrase boundaries. Soft breaks can be inserted by a preprocessor
29         that is analysing the input string.
30     """
31     WORD = ' '
32     """ Break between words. """
33     PART = '-'
34     """ Break inside a word, for example a hyphen or apostrophe. """
35     TOKEN = '`'
36     """ Break created as a result of tokenization.
37         This may happen in languages without spaces between words.
38     """
39
40
41 class TokenType(enum.Enum):
42     """ Type of token.
43     """
44     WORD = enum.auto()
45     """ Full name of a place. """
46     PARTIAL = enum.auto()
47     """ Word term without breaks, does not necessarily represent a full name. """
48     HOUSENUMBER = enum.auto()
49     """ Housenumber term. """
50     POSTCODE = enum.auto()
51     """ Postal code term. """
52     COUNTRY = enum.auto()
53     """ Country name or reference. """
54     QUALIFIER = enum.auto()
55     """ Special term used together with name (e.g. _Hotel_ Bellevue). """
56     NEAR_ITEM = enum.auto()
57     """ Special term used as searchable object(e.g. supermarket in ...). """
58
59
60 class PhraseType(enum.Enum):
61     """ Designation of a phrase.
62     """
63     NONE = 0
64     """ No specific designation (i.e. source is free-form query). """
65     AMENITY = enum.auto()
66     """ Contains name or type of a POI. """
67     STREET = enum.auto()
68     """ Contains a street name optionally with a housenumber. """
69     CITY = enum.auto()
70     """ Contains the postal city. """
71     COUNTY = enum.auto()
72     """ Contains the equivalent of a county. """
73     STATE = enum.auto()
74     """ Contains a state or province. """
75     POSTCODE = enum.auto()
76     """ Contains a postal code. """
77     COUNTRY = enum.auto()
78     """ Contains the country name or code. """
79
80     def compatible_with(self, ttype: TokenType,
81                         is_full_phrase: bool) -> bool:
82         """ Check if the given token type can be used with the phrase type.
83         """
84         if self == PhraseType.NONE:
85             return not is_full_phrase or ttype != TokenType.QUALIFIER
86         if self == PhraseType.AMENITY:
87             return ttype in (TokenType.WORD, TokenType.PARTIAL)\
88                    or (is_full_phrase and ttype == TokenType.NEAR_ITEM)\
89                    or (not is_full_phrase and ttype == TokenType.QUALIFIER)
90         if self == PhraseType.STREET:
91             return ttype in (TokenType.WORD, TokenType.PARTIAL, TokenType.HOUSENUMBER)
92         if self == PhraseType.POSTCODE:
93             return ttype == TokenType.POSTCODE
94         if self == PhraseType.COUNTRY:
95             return ttype == TokenType.COUNTRY
96
97         return ttype in (TokenType.WORD, TokenType.PARTIAL)
98
99
100 @dataclasses.dataclass
101 class Token(ABC):
102     """ Base type for tokens.
103         Specific query analyzers must implement the concrete token class.
104     """
105
106     penalty: float
107     token: int
108     count: int
109     addr_count: int
110     lookup_word: str
111
112     @abstractmethod
113     def get_category(self) -> Tuple[str, str]:
114         """ Return the category restriction for qualifier terms and
115             category objects.
116         """
117
118
119 @dataclasses.dataclass
120 class TokenRange:
121     """ Indexes of query nodes over which a token spans.
122     """
123     start: int
124     end: int
125
126     def __lt__(self, other: 'TokenRange') -> bool:
127         return self.end <= other.start
128
129     def __le__(self, other: 'TokenRange') -> bool:
130         return NotImplemented
131
132     def __gt__(self, other: 'TokenRange') -> bool:
133         return self.start >= other.end
134
135     def __ge__(self, other: 'TokenRange') -> bool:
136         return NotImplemented
137
138     def replace_start(self, new_start: int) -> 'TokenRange':
139         """ Return a new token range with the new start.
140         """
141         return TokenRange(new_start, self.end)
142
143     def replace_end(self, new_end: int) -> 'TokenRange':
144         """ Return a new token range with the new end.
145         """
146         return TokenRange(self.start, new_end)
147
148     def split(self, index: int) -> Tuple['TokenRange', 'TokenRange']:
149         """ Split the span into two spans at the given index.
150             The index must be within the span.
151         """
152         return self.replace_end(index), self.replace_start(index)
153
154
155 @dataclasses.dataclass
156 class TokenList:
157     """ List of all tokens of a given type going from one breakpoint to another.
158     """
159     end: int
160     ttype: TokenType
161     tokens: List[Token]
162
163     def add_penalty(self, penalty: float) -> None:
164         """ Add the given penalty to all tokens in the list.
165         """
166         for token in self.tokens:
167             token.penalty += penalty
168
169
170 @dataclasses.dataclass
171 class QueryNode:
172     """ A node of the query representing a break between terms.
173     """
174     btype: BreakType
175     ptype: PhraseType
176     starting: List[TokenList] = dataclasses.field(default_factory=list)
177
178     def has_tokens(self, end: int, *ttypes: TokenType) -> bool:
179         """ Check if there are tokens of the given types ending at the
180             given node.
181         """
182         return any(tl.end == end and tl.ttype in ttypes for tl in self.starting)
183
184     def get_tokens(self, end: int, ttype: TokenType) -> Optional[List[Token]]:
185         """ Get the list of tokens of the given type starting at this node
186             and ending at the node 'end'. Returns 'None' if no such
187             tokens exist.
188         """
189         for tlist in self.starting:
190             if tlist.end == end and tlist.ttype == ttype:
191                 return tlist.tokens
192         return None
193
194
195 @dataclasses.dataclass
196 class Phrase:
197     """ A normalized query part. Phrases may be typed which means that
198         they then represent a specific part of the address.
199     """
200     ptype: PhraseType
201     text: str
202
203
204 class QueryStruct:
205     """ A tokenized search query together with the normalized source
206         from which the tokens have been parsed.
207
208         The query contains a list of nodes that represent the breaks
209         between words. Tokens span between nodes, which don't necessarily
210         need to be direct neighbours. Thus the query is represented as a
211         directed acyclic graph.
212
213         When created, a query contains a single node: the start of the
214         query. Further nodes can be added by appending to 'nodes'.
215     """
216
217     def __init__(self, source: List[Phrase]) -> None:
218         self.source = source
219         self.nodes: List[QueryNode] = \
220             [QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
221
222     def num_token_slots(self) -> int:
223         """ Return the length of the query in vertice steps.
224         """
225         return len(self.nodes) - 1
226
227     def add_node(self, btype: BreakType, ptype: PhraseType) -> None:
228         """ Append a new break node with the given break type.
229             The phrase type denotes the type for any tokens starting
230             at the node.
231         """
232         self.nodes.append(QueryNode(btype, ptype))
233
234     def add_token(self, trange: TokenRange, ttype: TokenType, token: Token) -> None:
235         """ Add a token to the query. 'start' and 'end' are the indexes of the
236             nodes from which to which the token spans. The indexes must exist
237             and are expected to be in the same phrase.
238             'ttype' denotes the type of the token and 'token' the token to
239             be inserted.
240
241             If the token type is not compatible with the phrase it should
242             be added to, then the token is silently dropped.
243         """
244         snode = self.nodes[trange.start]
245         full_phrase = snode.btype in (BreakType.START, BreakType.PHRASE)\
246             and self.nodes[trange.end].btype in (BreakType.PHRASE, BreakType.END)
247         if snode.ptype.compatible_with(ttype, full_phrase):
248             tlist = snode.get_tokens(trange.end, ttype)
249             if tlist is None:
250                 snode.starting.append(TokenList(trange.end, ttype, [token]))
251             else:
252                 tlist.append(token)
253
254     def get_tokens(self, trange: TokenRange, ttype: TokenType) -> List[Token]:
255         """ Get the list of tokens of a given type, spanning the given
256             nodes. The nodes must exist. If no tokens exist, an
257             empty list is returned.
258         """
259         return self.nodes[trange.start].get_tokens(trange.end, ttype) or []
260
261     def get_partials_list(self, trange: TokenRange) -> List[Token]:
262         """ Create a list of partial tokens between the given nodes.
263             The list is composed of the first token of type PARTIAL
264             going to the subsequent node. Such PARTIAL tokens are
265             assumed to exist.
266         """
267         return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
268                 for i in range(trange.start, trange.end)]
269
270     def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
271         """ Iterator over all token lists in the query.
272         """
273         for i, node in enumerate(self.nodes):
274             for tlist in node.starting:
275                 yield i, node, tlist
276
277     def find_lookup_word_by_id(self, token: int) -> str:
278         """ Find the first token with the given token ID and return
279             its lookup word. Returns 'None' if no such token exists.
280             The function is very slow and must only be used for
281             debugging.
282         """
283         for node in self.nodes:
284             for tlist in node.starting:
285                 for t in tlist.tokens:
286                     if t.token == token:
287                         return f"[{tlist.ttype.name[0]}]{t.lookup_word}"
288         return 'None'