]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/search/query.py
Merge pull request #3610 from lonvia/search-preprocessing
[nominatim.git] / src / nominatim_api / search / query.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Datastructures for a tokenized query.
9 """
10 from typing import List, Tuple, Optional, Iterator
11 from abc import ABC, abstractmethod
12 import dataclasses
13 import enum
14
15
16 class BreakType(enum.Enum):
17     """ Type of break between tokens.
18     """
19     START = '<'
20     """ Begin of the query. """
21     END = '>'
22     """ End of the query. """
23     PHRASE = ','
24     """ Break between two phrases. """
25     WORD = ' '
26     """ Break between words. """
27     PART = '-'
28     """ Break inside a word, for example a hyphen or apostrophe. """
29     TOKEN = '`'
30     """ Break created as a result of tokenization.
31         This may happen in languages without spaces between words.
32     """
33
34
35 class TokenType(enum.Enum):
36     """ Type of token.
37     """
38     WORD = enum.auto()
39     """ Full name of a place. """
40     PARTIAL = enum.auto()
41     """ Word term without breaks, does not necessarily represent a full name. """
42     HOUSENUMBER = enum.auto()
43     """ Housenumber term. """
44     POSTCODE = enum.auto()
45     """ Postal code term. """
46     COUNTRY = enum.auto()
47     """ Country name or reference. """
48     QUALIFIER = enum.auto()
49     """ Special term used together with name (e.g. _Hotel_ Bellevue). """
50     NEAR_ITEM = enum.auto()
51     """ Special term used as searchable object(e.g. supermarket in ...). """
52
53
54 class PhraseType(enum.Enum):
55     """ Designation of a phrase.
56     """
57     NONE = 0
58     """ No specific designation (i.e. source is free-form query). """
59     AMENITY = enum.auto()
60     """ Contains name or type of a POI. """
61     STREET = enum.auto()
62     """ Contains a street name optionally with a housenumber. """
63     CITY = enum.auto()
64     """ Contains the postal city. """
65     COUNTY = enum.auto()
66     """ Contains the equivalent of a county. """
67     STATE = enum.auto()
68     """ Contains a state or province. """
69     POSTCODE = enum.auto()
70     """ Contains a postal code. """
71     COUNTRY = enum.auto()
72     """ Contains the country name or code. """
73
74     def compatible_with(self, ttype: TokenType,
75                         is_full_phrase: bool) -> bool:
76         """ Check if the given token type can be used with the phrase type.
77         """
78         if self == PhraseType.NONE:
79             return not is_full_phrase or ttype != TokenType.QUALIFIER
80         if self == PhraseType.AMENITY:
81             return ttype in (TokenType.WORD, TokenType.PARTIAL)\
82                    or (is_full_phrase and ttype == TokenType.NEAR_ITEM)\
83                    or (not is_full_phrase and ttype == TokenType.QUALIFIER)
84         if self == PhraseType.STREET:
85             return ttype in (TokenType.WORD, TokenType.PARTIAL, TokenType.HOUSENUMBER)
86         if self == PhraseType.POSTCODE:
87             return ttype == TokenType.POSTCODE
88         if self == PhraseType.COUNTRY:
89             return ttype == TokenType.COUNTRY
90
91         return ttype in (TokenType.WORD, TokenType.PARTIAL)
92
93
94 @dataclasses.dataclass
95 class Token(ABC):
96     """ Base type for tokens.
97         Specific query analyzers must implement the concrete token class.
98     """
99
100     penalty: float
101     token: int
102     count: int
103     addr_count: int
104     lookup_word: str
105
106     @abstractmethod
107     def get_category(self) -> Tuple[str, str]:
108         """ Return the category restriction for qualifier terms and
109             category objects.
110         """
111
112
113 @dataclasses.dataclass
114 class TokenRange:
115     """ Indexes of query nodes over which a token spans.
116     """
117     start: int
118     end: int
119
120     def __lt__(self, other: 'TokenRange') -> bool:
121         return self.end <= other.start
122
123     def __le__(self, other: 'TokenRange') -> bool:
124         return NotImplemented
125
126     def __gt__(self, other: 'TokenRange') -> bool:
127         return self.start >= other.end
128
129     def __ge__(self, other: 'TokenRange') -> bool:
130         return NotImplemented
131
132     def replace_start(self, new_start: int) -> 'TokenRange':
133         """ Return a new token range with the new start.
134         """
135         return TokenRange(new_start, self.end)
136
137     def replace_end(self, new_end: int) -> 'TokenRange':
138         """ Return a new token range with the new end.
139         """
140         return TokenRange(self.start, new_end)
141
142     def split(self, index: int) -> Tuple['TokenRange', 'TokenRange']:
143         """ Split the span into two spans at the given index.
144             The index must be within the span.
145         """
146         return self.replace_end(index), self.replace_start(index)
147
148
149 @dataclasses.dataclass
150 class TokenList:
151     """ List of all tokens of a given type going from one breakpoint to another.
152     """
153     end: int
154     ttype: TokenType
155     tokens: List[Token]
156
157     def add_penalty(self, penalty: float) -> None:
158         """ Add the given penalty to all tokens in the list.
159         """
160         for token in self.tokens:
161             token.penalty += penalty
162
163
164 @dataclasses.dataclass
165 class QueryNode:
166     """ A node of the query representing a break between terms.
167     """
168     btype: BreakType
169     ptype: PhraseType
170     starting: List[TokenList] = dataclasses.field(default_factory=list)
171
172     def has_tokens(self, end: int, *ttypes: TokenType) -> bool:
173         """ Check if there are tokens of the given types ending at the
174             given node.
175         """
176         return any(tl.end == end and tl.ttype in ttypes for tl in self.starting)
177
178     def get_tokens(self, end: int, ttype: TokenType) -> Optional[List[Token]]:
179         """ Get the list of tokens of the given type starting at this node
180             and ending at the node 'end'. Returns 'None' if no such
181             tokens exist.
182         """
183         for tlist in self.starting:
184             if tlist.end == end and tlist.ttype == ttype:
185                 return tlist.tokens
186         return None
187
188
189 @dataclasses.dataclass
190 class Phrase:
191     """ A normalized query part. Phrases may be typed which means that
192         they then represent a specific part of the address.
193     """
194     ptype: PhraseType
195     text: str
196
197
198 class QueryStruct:
199     """ A tokenized search query together with the normalized source
200         from which the tokens have been parsed.
201
202         The query contains a list of nodes that represent the breaks
203         between words. Tokens span between nodes, which don't necessarily
204         need to be direct neighbours. Thus the query is represented as a
205         directed acyclic graph.
206
207         When created, a query contains a single node: the start of the
208         query. Further nodes can be added by appending to 'nodes'.
209     """
210
211     def __init__(self, source: List[Phrase]) -> None:
212         self.source = source
213         self.nodes: List[QueryNode] = \
214             [QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
215
216     def num_token_slots(self) -> int:
217         """ Return the length of the query in vertice steps.
218         """
219         return len(self.nodes) - 1
220
221     def add_node(self, btype: BreakType, ptype: PhraseType) -> None:
222         """ Append a new break node with the given break type.
223             The phrase type denotes the type for any tokens starting
224             at the node.
225         """
226         self.nodes.append(QueryNode(btype, ptype))
227
228     def add_token(self, trange: TokenRange, ttype: TokenType, token: Token) -> None:
229         """ Add a token to the query. 'start' and 'end' are the indexes of the
230             nodes from which to which the token spans. The indexes must exist
231             and are expected to be in the same phrase.
232             'ttype' denotes the type of the token and 'token' the token to
233             be inserted.
234
235             If the token type is not compatible with the phrase it should
236             be added to, then the token is silently dropped.
237         """
238         snode = self.nodes[trange.start]
239         full_phrase = snode.btype in (BreakType.START, BreakType.PHRASE)\
240             and self.nodes[trange.end].btype in (BreakType.PHRASE, BreakType.END)
241         if snode.ptype.compatible_with(ttype, full_phrase):
242             tlist = snode.get_tokens(trange.end, ttype)
243             if tlist is None:
244                 snode.starting.append(TokenList(trange.end, ttype, [token]))
245             else:
246                 tlist.append(token)
247
248     def get_tokens(self, trange: TokenRange, ttype: TokenType) -> List[Token]:
249         """ Get the list of tokens of a given type, spanning the given
250             nodes. The nodes must exist. If no tokens exist, an
251             empty list is returned.
252         """
253         return self.nodes[trange.start].get_tokens(trange.end, ttype) or []
254
255     def get_partials_list(self, trange: TokenRange) -> List[Token]:
256         """ Create a list of partial tokens between the given nodes.
257             The list is composed of the first token of type PARTIAL
258             going to the subsequent node. Such PARTIAL tokens are
259             assumed to exist.
260         """
261         return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
262                 for i in range(trange.start, trange.end)]
263
264     def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
265         """ Iterator over all token lists in the query.
266         """
267         for i, node in enumerate(self.nodes):
268             for tlist in node.starting:
269                 yield i, node, tlist
270
271     def find_lookup_word_by_id(self, token: int) -> str:
272         """ Find the first token with the given token ID and return
273             its lookup word. Returns 'None' if no such token exists.
274             The function is very slow and must only be used for
275             debugging.
276         """
277         for node in self.nodes:
278             for tlist in node.starting:
279                 for t in tlist.tokens:
280                     if t.token == token:
281                         return f"[{tlist.ttype.name[0]}]{t.lookup_word}"
282         return 'None'