]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/search/query.py
replace CASE construct with plpgsql function
[nominatim.git] / nominatim / api / search / query.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Datastructures for a tokenized query.
9 """
10 from typing import List, Tuple, Optional, NamedTuple, Iterator
11 from abc import ABC, abstractmethod
12 import dataclasses
13 import enum
14
15 class BreakType(enum.Enum):
16     """ Type of break between tokens.
17     """
18     START = '<'
19     """ Begin of the query. """
20     END = '>'
21     """ End of the query. """
22     PHRASE = ','
23     """ Break between two phrases. """
24     WORD = ' '
25     """ Break between words. """
26     PART = '-'
27     """ Break inside a word, for example a hyphen or apostrophe. """
28     TOKEN = '`'
29     """ Break created as a result of tokenization.
30         This may happen in languages without spaces between words.
31     """
32
33
34 class TokenType(enum.Enum):
35     """ Type of token.
36     """
37     WORD = enum.auto()
38     """ Full name of a place. """
39     PARTIAL = enum.auto()
40     """ Word term without breaks, does not necessarily represent a full name. """
41     HOUSENUMBER = enum.auto()
42     """ Housenumber term. """
43     POSTCODE = enum.auto()
44     """ Postal code term. """
45     COUNTRY = enum.auto()
46     """ Country name or reference. """
47     QUALIFIER = enum.auto()
48     """ Special term used together with name (e.g. _Hotel_ Bellevue). """
49     CATEGORY = enum.auto()
50     """ Special term used as searchable object(e.g. supermarket in ...). """
51
52
53 class PhraseType(enum.Enum):
54     """ Designation of a phrase.
55     """
56     NONE = 0
57     """ No specific designation (i.e. source is free-form query). """
58     AMENITY = enum.auto()
59     """ Contains name or type of a POI. """
60     STREET = enum.auto()
61     """ Contains a street name optionally with a housenumber. """
62     CITY = enum.auto()
63     """ Contains the postal city. """
64     COUNTY = enum.auto()
65     """ Contains the equivalent of a county. """
66     STATE = enum.auto()
67     """ Contains a state or province. """
68     POSTCODE = enum.auto()
69     """ Contains a postal code. """
70     COUNTRY = enum.auto()
71     """ Contains the country name or code. """
72
73     def compatible_with(self, ttype: TokenType) -> bool:
74         """ Check if the given token type can be used with the phrase type.
75         """
76         if self == PhraseType.NONE:
77             return True
78         if self == PhraseType.AMENITY:
79             return ttype in (TokenType.WORD, TokenType.PARTIAL,
80                              TokenType.QUALIFIER, TokenType.CATEGORY)
81         if self == PhraseType.STREET:
82             return ttype in (TokenType.WORD, TokenType.PARTIAL, TokenType.HOUSENUMBER)
83         if self == PhraseType.POSTCODE:
84             return ttype == TokenType.POSTCODE
85         if self == PhraseType.COUNTRY:
86             return ttype == TokenType.COUNTRY
87
88         return ttype in (TokenType.WORD, TokenType.PARTIAL)
89
90
91 @dataclasses.dataclass
92 class Token(ABC):
93     """ Base type for tokens.
94         Specific query analyzers must implement the concrete token class.
95     """
96
97     penalty: float
98     token: int
99     count: int
100     lookup_word: str
101     is_indexed: bool
102
103
104     @abstractmethod
105     def get_category(self) -> Tuple[str, str]:
106         """ Return the category restriction for qualifier terms and
107             category objects.
108         """
109
110
111 class TokenRange(NamedTuple):
112     """ Indexes of query nodes over which a token spans.
113     """
114     start: int
115     end: int
116
117     def replace_start(self, new_start: int) -> 'TokenRange':
118         """ Return a new token range with the new start.
119         """
120         return TokenRange(new_start, self.end)
121
122
123     def replace_end(self, new_end: int) -> 'TokenRange':
124         """ Return a new token range with the new end.
125         """
126         return TokenRange(self.start, new_end)
127
128
129     def split(self, index: int) -> Tuple['TokenRange', 'TokenRange']:
130         """ Split the span into two spans at the given index.
131             The index must be within the span.
132         """
133         return self.replace_end(index), self.replace_start(index)
134
135
136 @dataclasses.dataclass
137 class TokenList:
138     """ List of all tokens of a given type going from one breakpoint to another.
139     """
140     end: int
141     ttype: TokenType
142     tokens: List[Token]
143
144
145     def add_penalty(self, penalty: float) -> None:
146         """ Add the given penalty to all tokens in the list.
147         """
148         for token in self.tokens:
149             token.penalty += penalty
150
151
152 @dataclasses.dataclass
153 class QueryNode:
154     """ A node of the querry representing a break between terms.
155     """
156     btype: BreakType
157     ptype: PhraseType
158     starting: List[TokenList] = dataclasses.field(default_factory=list)
159
160     def has_tokens(self, end: int, *ttypes: TokenType) -> bool:
161         """ Check if there are tokens of the given types ending at the
162             given node.
163         """
164         return any(tl.end == end and tl.ttype in ttypes for tl in self.starting)
165
166
167     def get_tokens(self, end: int, ttype: TokenType) -> Optional[List[Token]]:
168         """ Get the list of tokens of the given type starting at this node
169             and ending at the node 'end'. Returns 'None' if no such
170             tokens exist.
171         """
172         for tlist in self.starting:
173             if tlist.end == end and tlist.ttype == ttype:
174                 return tlist.tokens
175         return None
176
177
178 @dataclasses.dataclass
179 class Phrase:
180     """ A normalized query part. Phrases may be typed which means that
181         they then represent a specific part of the address.
182     """
183     ptype: PhraseType
184     text: str
185
186
187 class QueryStruct:
188     """ A tokenized search query together with the normalized source
189         from which the tokens have been parsed.
190
191         The query contains a list of nodes that represent the breaks
192         between words. Tokens span between nodes, which don't necessarily
193         need to be direct neighbours. Thus the query is represented as a
194         directed acyclic graph.
195
196         When created, a query contains a single node: the start of the
197         query. Further nodes can be added by appending to 'nodes'.
198     """
199
200     def __init__(self, source: List[Phrase]) -> None:
201         self.source = source
202         self.nodes: List[QueryNode] = \
203             [QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
204
205
206     def num_token_slots(self) -> int:
207         """ Return the length of the query in vertice steps.
208         """
209         return len(self.nodes) - 1
210
211
212     def add_node(self, btype: BreakType, ptype: PhraseType) -> None:
213         """ Append a new break node with the given break type.
214             The phrase type denotes the type for any tokens starting
215             at the node.
216         """
217         self.nodes.append(QueryNode(btype, ptype))
218
219
220     def add_token(self, trange: TokenRange, ttype: TokenType, token: Token) -> None:
221         """ Add a token to the query. 'start' and 'end' are the indexes of the
222             nodes from which to which the token spans. The indexes must exist
223             and are expected to be in the same phrase.
224             'ttype' denotes the type of the token and 'token' the token to
225             be inserted.
226
227             If the token type is not compatible with the phrase it should
228             be added to, then the token is silently dropped.
229         """
230         snode = self.nodes[trange.start]
231         if snode.ptype.compatible_with(ttype):
232             tlist = snode.get_tokens(trange.end, ttype)
233             if tlist is None:
234                 snode.starting.append(TokenList(trange.end, ttype, [token]))
235             else:
236                 tlist.append(token)
237
238
239     def get_tokens(self, trange: TokenRange, ttype: TokenType) -> List[Token]:
240         """ Get the list of tokens of a given type, spanning the given
241             nodes. The nodes must exist. If no tokens exist, an
242             empty list is returned.
243         """
244         return self.nodes[trange.start].get_tokens(trange.end, ttype) or []
245
246
247     def get_partials_list(self, trange: TokenRange) -> List[Token]:
248         """ Create a list of partial tokens between the given nodes.
249             The list is composed of the first token of type PARTIAL
250             going to the subsequent node. Such PARTIAL tokens are
251             assumed to exist.
252         """
253         return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
254                           for i in range(trange.start, trange.end)]
255
256
257     def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
258         """ Iterator over all token lists in the query.
259         """
260         for i, node in enumerate(self.nodes):
261             for tlist in node.starting:
262                 yield i, node, tlist
263
264
265     def find_lookup_word_by_id(self, token: int) -> str:
266         """ Find the first token with the given token ID and return
267             its lookup word. Returns 'None' if no such token exists.
268             The function is very slow and must only be used for
269             debugging.
270         """
271         for node in self.nodes:
272             for tlist in node.starting:
273                 for t in tlist.tokens:
274                     if t.token == token:
275                         return f"[{tlist.ttype.name[0]}]{t.lookup_word}"
276         return 'None'