]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/indexer/runners.py
make DB helper functions free functions
[nominatim.git] / src / nominatim_db / indexer / runners.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Mix-ins that provide the actual commands for the indexer for various indexing
9 tasks.
10 """
11 from typing import Any, List
12 import functools
13
14 from psycopg2 import sql as pysql
15 import psycopg2.extras
16
17 from ..typing import Query, DictCursorResult, DictCursorResults, Protocol
18 from ..db.async_connection import DBConnection
19 from ..data.place_info import PlaceInfo
20 from ..tokenizer.base import AbstractAnalyzer
21
22 # pylint: disable=C0111
23
24 def _mk_valuelist(template: str, num: int) -> pysql.Composed:
25     return pysql.SQL(',').join([pysql.SQL(template)] * num)
26
27 def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json:
28     return psycopg2.extras.Json(analyzer.process_place(PlaceInfo(place)))
29
30
31 class Runner(Protocol):
32     def name(self) -> str: ...
33     def sql_count_objects(self) -> Query: ...
34     def sql_get_objects(self) -> Query: ...
35     def get_place_details(self, worker: DBConnection,
36                           ids: DictCursorResults) -> DictCursorResults: ...
37     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: ...
38
39
40 class AbstractPlacexRunner:
41     """ Returns SQL commands for indexing of the placex table.
42     """
43     SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
44     UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
45
46     def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
47         self.rank = rank
48         self.analyzer = analyzer
49
50
51     @functools.lru_cache(maxsize=1)
52     def _index_sql(self, num_places: int) -> pysql.Composed:
53         return pysql.SQL(
54             """ UPDATE placex
55                 SET indexed_status = 0, address = v.addr, token_info = v.ti,
56                     name = v.name, linked_place_id = v.linked_place_id
57                 FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
58                 WHERE place_id = v.id
59             """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places))
60
61
62     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
63         worker.perform("""SELECT place_id, extra.*
64                           FROM placex, LATERAL placex_indexing_prepare(placex) as extra
65                           WHERE place_id IN %s""",
66                        (tuple((p[0] for p in ids)), ))
67
68         return []
69
70
71     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
72         values: List[Any] = []
73         for place in places:
74             for field in ('place_id', 'name', 'address', 'linked_place_id'):
75                 values.append(place[field])
76             values.append(_analyze_place(place, self.analyzer))
77
78         worker.perform(self._index_sql(len(places)), values)
79
80
81 class RankRunner(AbstractPlacexRunner):
82     """ Returns SQL commands for indexing one rank within the placex table.
83     """
84
85     def name(self) -> str:
86         return f"rank {self.rank}"
87
88     def sql_count_objects(self) -> pysql.Composed:
89         return pysql.SQL("""SELECT count(*) FROM placex
90                             WHERE rank_address = {} and indexed_status > 0
91                          """).format(pysql.Literal(self.rank))
92
93     def sql_get_objects(self) -> pysql.Composed:
94         return self.SELECT_SQL + pysql.SQL(
95             """WHERE indexed_status > 0 and rank_address = {}
96                ORDER BY geometry_sector
97             """).format(pysql.Literal(self.rank))
98
99
100 class BoundaryRunner(AbstractPlacexRunner):
101     """ Returns SQL commands for indexing the administrative boundaries
102         of a certain rank.
103     """
104
105     def name(self) -> str:
106         return f"boundaries rank {self.rank}"
107
108     def sql_count_objects(self) -> pysql.Composed:
109         return pysql.SQL("""SELECT count(*) FROM placex
110                             WHERE indexed_status > 0
111                               AND rank_search = {}
112                               AND class = 'boundary' and type = 'administrative'
113                          """).format(pysql.Literal(self.rank))
114
115     def sql_get_objects(self) -> pysql.Composed:
116         return self.SELECT_SQL + pysql.SQL(
117             """WHERE indexed_status > 0 and rank_search = {}
118                      and class = 'boundary' and type = 'administrative'
119                ORDER BY partition, admin_level
120             """).format(pysql.Literal(self.rank))
121
122
123 class InterpolationRunner:
124     """ Returns SQL commands for indexing the address interpolation table
125         location_property_osmline.
126     """
127
128     def __init__(self, analyzer: AbstractAnalyzer) -> None:
129         self.analyzer = analyzer
130
131
132     def name(self) -> str:
133         return "interpolation lines (location_property_osmline)"
134
135     def sql_count_objects(self) -> str:
136         return """SELECT count(*) FROM location_property_osmline
137                   WHERE indexed_status > 0"""
138
139     def sql_get_objects(self) -> str:
140         return """SELECT place_id
141                   FROM location_property_osmline
142                   WHERE indexed_status > 0
143                   ORDER BY geometry_sector"""
144
145
146     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
147         worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
148                           FROM location_property_osmline WHERE place_id IN %s""",
149                        (tuple((p[0] for p in ids)), ))
150         return []
151
152
153     @functools.lru_cache(maxsize=1)
154     def _index_sql(self, num_places: int) -> pysql.Composed:
155         return pysql.SQL("""UPDATE location_property_osmline
156                             SET indexed_status = 0, address = v.addr, token_info = v.ti
157                             FROM (VALUES {}) as v(id, addr, ti)
158                             WHERE place_id = v.id
159                          """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
160
161
162     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
163         values: List[Any] = []
164         for place in places:
165             values.extend((place[x] for x in ('place_id', 'address')))
166             values.append(_analyze_place(place, self.analyzer))
167
168         worker.perform(self._index_sql(len(places)), values)
169
170
171
172 class PostcodeRunner(Runner):
173     """ Provides the SQL commands for indexing the location_postcode table.
174     """
175
176     def name(self) -> str:
177         return "postcodes (location_postcode)"
178
179
180     def sql_count_objects(self) -> str:
181         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
182
183
184     def sql_get_objects(self) -> str:
185         return """SELECT place_id FROM location_postcode
186                   WHERE indexed_status > 0
187                   ORDER BY country_code, postcode"""
188
189
190     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
191         return ids
192
193     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
194         worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
195                                     WHERE place_id IN ({})""")
196                        .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in places))))