]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/search/query.py
Merge pull request #3321 from lonvia/remove-duplicate-partials
[nominatim.git] / nominatim / api / search / query.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Datastructures for a tokenized query.
9 """
10 from typing import List, Tuple, Optional, Iterator
11 from abc import ABC, abstractmethod
12 import dataclasses
13 import enum
14
15 class BreakType(enum.Enum):
16     """ Type of break between tokens.
17     """
18     START = '<'
19     """ Begin of the query. """
20     END = '>'
21     """ End of the query. """
22     PHRASE = ','
23     """ Break between two phrases. """
24     WORD = ' '
25     """ Break between words. """
26     PART = '-'
27     """ Break inside a word, for example a hyphen or apostrophe. """
28     TOKEN = '`'
29     """ Break created as a result of tokenization.
30         This may happen in languages without spaces between words.
31     """
32
33
34 class TokenType(enum.Enum):
35     """ Type of token.
36     """
37     WORD = enum.auto()
38     """ Full name of a place. """
39     PARTIAL = enum.auto()
40     """ Word term without breaks, does not necessarily represent a full name. """
41     HOUSENUMBER = enum.auto()
42     """ Housenumber term. """
43     POSTCODE = enum.auto()
44     """ Postal code term. """
45     COUNTRY = enum.auto()
46     """ Country name or reference. """
47     QUALIFIER = enum.auto()
48     """ Special term used together with name (e.g. _Hotel_ Bellevue). """
49     NEAR_ITEM = enum.auto()
50     """ Special term used as searchable object(e.g. supermarket in ...). """
51
52
53 class PhraseType(enum.Enum):
54     """ Designation of a phrase.
55     """
56     NONE = 0
57     """ No specific designation (i.e. source is free-form query). """
58     AMENITY = enum.auto()
59     """ Contains name or type of a POI. """
60     STREET = enum.auto()
61     """ Contains a street name optionally with a housenumber. """
62     CITY = enum.auto()
63     """ Contains the postal city. """
64     COUNTY = enum.auto()
65     """ Contains the equivalent of a county. """
66     STATE = enum.auto()
67     """ Contains a state or province. """
68     POSTCODE = enum.auto()
69     """ Contains a postal code. """
70     COUNTRY = enum.auto()
71     """ Contains the country name or code. """
72
73     def compatible_with(self, ttype: TokenType,
74                         is_full_phrase: bool) -> bool:
75         """ Check if the given token type can be used with the phrase type.
76         """
77         if self == PhraseType.NONE:
78             return not is_full_phrase or ttype != TokenType.QUALIFIER
79         if self == PhraseType.AMENITY:
80             return ttype in (TokenType.WORD, TokenType.PARTIAL)\
81                    or (is_full_phrase and ttype == TokenType.NEAR_ITEM)\
82                    or (not is_full_phrase and ttype == TokenType.QUALIFIER)
83         if self == PhraseType.STREET:
84             return ttype in (TokenType.WORD, TokenType.PARTIAL, TokenType.HOUSENUMBER)
85         if self == PhraseType.POSTCODE:
86             return ttype == TokenType.POSTCODE
87         if self == PhraseType.COUNTRY:
88             return ttype == TokenType.COUNTRY
89
90         return ttype in (TokenType.WORD, TokenType.PARTIAL)
91
92
93 @dataclasses.dataclass
94 class Token(ABC):
95     """ Base type for tokens.
96         Specific query analyzers must implement the concrete token class.
97     """
98
99     penalty: float
100     token: int
101     count: int
102     lookup_word: str
103     is_indexed: bool
104
105
106     @abstractmethod
107     def get_category(self) -> Tuple[str, str]:
108         """ Return the category restriction for qualifier terms and
109             category objects.
110         """
111
112 @dataclasses.dataclass
113 class TokenRange:
114     """ Indexes of query nodes over which a token spans.
115     """
116     start: int
117     end: int
118
119     def __lt__(self, other: 'TokenRange') -> bool:
120         return self.end <= other.start
121
122
123     def __le__(self, other: 'TokenRange') -> bool:
124         return NotImplemented
125
126
127     def __gt__(self, other: 'TokenRange') -> bool:
128         return self.start >= other.end
129
130
131     def __ge__(self, other: 'TokenRange') -> bool:
132         return NotImplemented
133
134
135     def replace_start(self, new_start: int) -> 'TokenRange':
136         """ Return a new token range with the new start.
137         """
138         return TokenRange(new_start, self.end)
139
140
141     def replace_end(self, new_end: int) -> 'TokenRange':
142         """ Return a new token range with the new end.
143         """
144         return TokenRange(self.start, new_end)
145
146
147     def split(self, index: int) -> Tuple['TokenRange', 'TokenRange']:
148         """ Split the span into two spans at the given index.
149             The index must be within the span.
150         """
151         return self.replace_end(index), self.replace_start(index)
152
153
154 @dataclasses.dataclass
155 class TokenList:
156     """ List of all tokens of a given type going from one breakpoint to another.
157     """
158     end: int
159     ttype: TokenType
160     tokens: List[Token]
161
162
163     def add_penalty(self, penalty: float) -> None:
164         """ Add the given penalty to all tokens in the list.
165         """
166         for token in self.tokens:
167             token.penalty += penalty
168
169
170 @dataclasses.dataclass
171 class QueryNode:
172     """ A node of the querry representing a break between terms.
173     """
174     btype: BreakType
175     ptype: PhraseType
176     starting: List[TokenList] = dataclasses.field(default_factory=list)
177
178     def has_tokens(self, end: int, *ttypes: TokenType) -> bool:
179         """ Check if there are tokens of the given types ending at the
180             given node.
181         """
182         return any(tl.end == end and tl.ttype in ttypes for tl in self.starting)
183
184
185     def get_tokens(self, end: int, ttype: TokenType) -> Optional[List[Token]]:
186         """ Get the list of tokens of the given type starting at this node
187             and ending at the node 'end'. Returns 'None' if no such
188             tokens exist.
189         """
190         for tlist in self.starting:
191             if tlist.end == end and tlist.ttype == ttype:
192                 return tlist.tokens
193         return None
194
195
196 @dataclasses.dataclass
197 class Phrase:
198     """ A normalized query part. Phrases may be typed which means that
199         they then represent a specific part of the address.
200     """
201     ptype: PhraseType
202     text: str
203
204
205 class QueryStruct:
206     """ A tokenized search query together with the normalized source
207         from which the tokens have been parsed.
208
209         The query contains a list of nodes that represent the breaks
210         between words. Tokens span between nodes, which don't necessarily
211         need to be direct neighbours. Thus the query is represented as a
212         directed acyclic graph.
213
214         When created, a query contains a single node: the start of the
215         query. Further nodes can be added by appending to 'nodes'.
216     """
217
218     def __init__(self, source: List[Phrase]) -> None:
219         self.source = source
220         self.nodes: List[QueryNode] = \
221             [QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
222
223
224     def num_token_slots(self) -> int:
225         """ Return the length of the query in vertice steps.
226         """
227         return len(self.nodes) - 1
228
229
230     def add_node(self, btype: BreakType, ptype: PhraseType) -> None:
231         """ Append a new break node with the given break type.
232             The phrase type denotes the type for any tokens starting
233             at the node.
234         """
235         self.nodes.append(QueryNode(btype, ptype))
236
237
238     def add_token(self, trange: TokenRange, ttype: TokenType, token: Token) -> None:
239         """ Add a token to the query. 'start' and 'end' are the indexes of the
240             nodes from which to which the token spans. The indexes must exist
241             and are expected to be in the same phrase.
242             'ttype' denotes the type of the token and 'token' the token to
243             be inserted.
244
245             If the token type is not compatible with the phrase it should
246             be added to, then the token is silently dropped.
247         """
248         snode = self.nodes[trange.start]
249         full_phrase = snode.btype in (BreakType.START, BreakType.PHRASE)\
250                       and self.nodes[trange.end].btype in (BreakType.PHRASE, BreakType.END)
251         if snode.ptype.compatible_with(ttype, full_phrase):
252             tlist = snode.get_tokens(trange.end, ttype)
253             if tlist is None:
254                 snode.starting.append(TokenList(trange.end, ttype, [token]))
255             else:
256                 tlist.append(token)
257
258
259     def get_tokens(self, trange: TokenRange, ttype: TokenType) -> List[Token]:
260         """ Get the list of tokens of a given type, spanning the given
261             nodes. The nodes must exist. If no tokens exist, an
262             empty list is returned.
263         """
264         return self.nodes[trange.start].get_tokens(trange.end, ttype) or []
265
266
267     def get_partials_list(self, trange: TokenRange) -> List[Token]:
268         """ Create a list of partial tokens between the given nodes.
269             The list is composed of the first token of type PARTIAL
270             going to the subsequent node. Such PARTIAL tokens are
271             assumed to exist.
272         """
273         return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
274                           for i in range(trange.start, trange.end)]
275
276
277     def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
278         """ Iterator over all token lists in the query.
279         """
280         for i, node in enumerate(self.nodes):
281             for tlist in node.starting:
282                 yield i, node, tlist
283
284
285     def find_lookup_word_by_id(self, token: int) -> str:
286         """ Find the first token with the given token ID and return
287             its lookup word. Returns 'None' if no such token exists.
288             The function is very slow and must only be used for
289             debugging.
290         """
291         for node in self.nodes:
292             for tlist in node.starting:
293                 for t in tlist.tokens:
294                     if t.token == token:
295                         return f"[{tlist.ttype.name[0]}]{t.lookup_word}"
296         return 'None'