]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/search/db_search_lookups.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / src / nominatim_api / search / db_search_lookups.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of lookup functions for the search_name table.
9 """
10 from typing import List, Any
11
12 import sqlalchemy as sa
13 from sqlalchemy.ext.compiler import compiles
14
15 from ..typing import SaFromClause
16 from ..sql.sqlalchemy_types import IntArray
17
18
19 LookupType = sa.sql.expression.FunctionElement[Any]
20
21
22 class LookupAll(LookupType):
23     """ Find all entries in search_name table that contain all of
24         a given list of tokens using an index for the search.
25     """
26     inherit_cache = True
27
28     def __init__(self, table: SaFromClause, column: str, tokens: List[int]) -> None:
29         super().__init__(table.c.place_id, getattr(table.c, column), column,
30                          sa.type_coerce(tokens, IntArray))
31
32
33 @compiles(LookupAll)
34 def _default_lookup_all(element: LookupAll,
35                         compiler: 'sa.Compiled', **kw: Any) -> str:
36     _, col, _, tokens = list(element.clauses)
37     return "(%s @> %s)" % (compiler.process(col, **kw),
38                            compiler.process(tokens, **kw))
39
40
41 @compiles(LookupAll, 'sqlite')
42 def _sqlite_lookup_all(element: LookupAll,
43                        compiler: 'sa.Compiled', **kw: Any) -> str:
44     place, col, colname, tokens = list(element.clauses)
45     return "(%s IN (SELECT CAST(value as bigint) FROM"\
46            " (SELECT array_intersect_fuzzy(places) as p FROM"\
47            "   (SELECT places FROM reverse_search_name"\
48            "   WHERE word IN (SELECT value FROM json_each('[' || %s || ']'))"\
49            "     AND column = %s"\
50            "   ORDER BY length(places)) as x) as u,"\
51            " json_each('[' || u.p || ']'))"\
52            " AND array_contains(%s, %s))"\
53         % (compiler.process(place, **kw),
54            compiler.process(tokens, **kw),
55            compiler.process(colname, **kw),
56            compiler.process(col, **kw),
57            compiler.process(tokens, **kw))
58
59
60 class LookupAny(LookupType):
61     """ Find all entries that contain at least one of the given tokens.
62         Use an index for the search.
63     """
64     inherit_cache = True
65
66     def __init__(self, table: SaFromClause, column: str, tokens: List[int]) -> None:
67         super().__init__(table.c.place_id, getattr(table.c, column), column,
68                          sa.type_coerce(tokens, IntArray))
69
70
71 @compiles(LookupAny)
72 def _default_lookup_any(element: LookupAny,
73                         compiler: 'sa.Compiled', **kw: Any) -> str:
74     _, col, _, tokens = list(element.clauses)
75     return "(%s && %s)" % (compiler.process(col, **kw),
76                            compiler.process(tokens, **kw))
77
78
79 @compiles(LookupAny, 'sqlite')
80 def _sqlite_lookup_any(element: LookupAny,
81                        compiler: 'sa.Compiled', **kw: Any) -> str:
82     place, _, colname, tokens = list(element.clauses)
83     return "%s IN (SELECT CAST(value as bigint) FROM"\
84            " (SELECT array_union(places) as p FROM reverse_search_name"\
85            "   WHERE word IN (SELECT value FROM json_each('[' || %s || ']'))"\
86            "     AND column = %s) as u,"\
87            " json_each('[' || u.p || ']'))" % (compiler.process(place, **kw),
88                                                compiler.process(tokens, **kw),
89                                                compiler.process(colname, **kw))
90
91
92 class Restrict(LookupType):
93     """ Find all entries that contain all of the given tokens.
94         Do not use an index for the search.
95     """
96     inherit_cache = True
97
98     def __init__(self, table: SaFromClause, column: str, tokens: List[int]) -> None:
99         super().__init__(getattr(table.c, column),
100                          sa.type_coerce(tokens, IntArray))
101
102
103 @compiles(Restrict)
104 def _default_restrict(element: Restrict,
105                       compiler: 'sa.Compiled', **kw: Any) -> str:
106     arg1, arg2 = list(element.clauses)
107     return "(coalesce(null, %s) @> %s)" % (compiler.process(arg1, **kw),
108                                            compiler.process(arg2, **kw))
109
110
111 @compiles(Restrict, 'sqlite')
112 def _sqlite_restrict(element: Restrict,
113                      compiler: 'sa.Compiled', **kw: Any) -> str:
114     return "array_contains(%s)" % compiler.process(element.clauses, **kw)