]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/runners.py
Merge pull request #3062 from lonvia/enable-psycopg
[nominatim.git] / nominatim / indexer / runners.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Mix-ins that provide the actual commands for the indexer for various indexing
9 tasks.
10 """
11 from typing import Any, List
12 import functools
13
14 from psycopg2 import sql as pysql
15 import psycopg2.extras
16
17 from nominatim.data.place_info import PlaceInfo
18 from nominatim.tokenizer.base import AbstractAnalyzer
19 from nominatim.db.async_connection import DBConnection
20 from nominatim.typing import Query, DictCursorResult, DictCursorResults, Protocol
21
22 # pylint: disable=C0111
23
24 def _mk_valuelist(template: str, num: int) -> pysql.Composed:
25     return pysql.SQL(',').join([pysql.SQL(template)] * num)
26
27 def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json:
28     return psycopg2.extras.Json(analyzer.process_place(PlaceInfo(place)))
29
30
31 class Runner(Protocol):
32     def name(self) -> str: ...
33     def sql_count_objects(self) -> Query: ...
34     def sql_get_objects(self) -> Query: ...
35     def get_place_details(self, worker: DBConnection,
36                           ids: DictCursorResults) -> DictCursorResults: ...
37     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: ...
38
39
40 class AbstractPlacexRunner:
41     """ Returns SQL commands for indexing of the placex table.
42     """
43     SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
44     UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
45
46     def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
47         self.rank = rank
48         self.analyzer = analyzer
49
50
51     @functools.lru_cache(maxsize=1)
52     def _index_sql(self, num_places: int) -> pysql.Composed:
53         return pysql.SQL(
54             """ UPDATE placex
55                 SET indexed_status = 0, address = v.addr, token_info = v.ti,
56                     name = v.name, linked_place_id = v.linked_place_id
57                 FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
58                 WHERE place_id = v.id
59             """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places))
60
61
62     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
63         worker.perform("""SELECT place_id, extra.*
64                           FROM placex, LATERAL placex_indexing_prepare(placex) as extra
65                           WHERE place_id IN %s""",
66                        (tuple((p[0] for p in ids)), ))
67
68         return []
69
70
71     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
72         values: List[Any] = []
73         for place in places:
74             for field in ('place_id', 'name', 'address', 'linked_place_id'):
75                 values.append(place[field])
76             values.append(_analyze_place(place, self.analyzer))
77
78         worker.perform(self._index_sql(len(places)), values)
79
80
81 class RankRunner(AbstractPlacexRunner):
82     """ Returns SQL commands for indexing one rank within the placex table.
83     """
84
85     def name(self) -> str:
86         return f"rank {self.rank}"
87
88     def sql_count_objects(self) -> pysql.Composed:
89         return pysql.SQL("""SELECT count(*) FROM placex
90                             WHERE rank_address = {} and indexed_status > 0
91                          """).format(pysql.Literal(self.rank))
92
93     def sql_get_objects(self) -> pysql.Composed:
94         return self.SELECT_SQL + pysql.SQL(
95             """WHERE indexed_status > 0 and rank_address = {}
96                ORDER BY geometry_sector
97             """).format(pysql.Literal(self.rank))
98
99
100 class BoundaryRunner(AbstractPlacexRunner):
101     """ Returns SQL commands for indexing the administrative boundaries
102         of a certain rank.
103     """
104
105     def name(self) -> str:
106         return f"boundaries rank {self.rank}"
107
108     def sql_count_objects(self) -> pysql.Composed:
109         return pysql.SQL("""SELECT count(*) FROM placex
110                             WHERE indexed_status > 0
111                               AND rank_search = {}
112                               AND class = 'boundary' and type = 'administrative'
113                          """).format(pysql.Literal(self.rank))
114
115     def sql_get_objects(self) -> pysql.Composed:
116         return self.SELECT_SQL + pysql.SQL(
117             """WHERE indexed_status > 0 and rank_search = {}
118                      and class = 'boundary' and type = 'administrative'
119                ORDER BY partition, admin_level
120             """).format(pysql.Literal(self.rank))
121
122
123 class InterpolationRunner:
124     """ Returns SQL commands for indexing the address interpolation table
125         location_property_osmline.
126     """
127
128     def __init__(self, analyzer: AbstractAnalyzer) -> None:
129         self.analyzer = analyzer
130
131
132     def name(self) -> str:
133         return "interpolation lines (location_property_osmline)"
134
135     def sql_count_objects(self) -> str:
136         return """SELECT count(*) FROM location_property_osmline
137                   WHERE indexed_status > 0"""
138
139     def sql_get_objects(self) -> str:
140         return """SELECT place_id
141                   FROM location_property_osmline
142                   WHERE indexed_status > 0
143                   ORDER BY geometry_sector"""
144
145
146     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
147         worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
148                           FROM location_property_osmline WHERE place_id IN %s""",
149                        (tuple((p[0] for p in ids)), ))
150         return []
151
152
153     @functools.lru_cache(maxsize=1)
154     def _index_sql(self, num_places: int) -> pysql.Composed:
155         return pysql.SQL("""UPDATE location_property_osmline
156                             SET indexed_status = 0, address = v.addr, token_info = v.ti
157                             FROM (VALUES {}) as v(id, addr, ti)
158                             WHERE place_id = v.id
159                          """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
160
161
162     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
163         values: List[Any] = []
164         for place in places:
165             values.extend((place[x] for x in ('place_id', 'address')))
166             values.append(_analyze_place(place, self.analyzer))
167
168         worker.perform(self._index_sql(len(places)), values)
169
170
171
172 class PostcodeRunner(Runner):
173     """ Provides the SQL commands for indexing the location_postcode table.
174     """
175
176     def name(self) -> str:
177         return "postcodes (location_postcode)"
178
179
180     def sql_count_objects(self) -> str:
181         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
182
183
184     def sql_get_objects(self) -> str:
185         return """SELECT place_id FROM location_postcode
186                   WHERE indexed_status > 0
187                   ORDER BY country_code, postcode"""
188
189
190     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
191         return ids
192
193     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
194         worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
195                                     WHERE place_id IN ({})""")
196                        .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in places))))