git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/runners.py
add type annotations for command line functions
[nominatim.git] / nominatim / indexer / runners.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Mix-ins that provide the actual commands for the indexer for various indexing
9 tasks.
10 """
11 from typing import Any, List
12 import functools
13
14 from typing_extensions import Protocol
15 from psycopg2 import sql as pysql
16 import psycopg2.extras
17
18 from nominatim.data.place_info import PlaceInfo
19 from nominatim.tokenizer.base import AbstractAnalyzer
20 from nominatim.db.async_connection import DBConnection
21 from nominatim.typing import Query, DictCursorResult, DictCursorResults
22
23 # pylint: disable=C0111
24
def _mk_valuelist(template: str, num: int) -> pysql.Composed:
    """ Return the SQL snippet `template` repeated `num` times with the
        repetitions separated by commas.
    """
    row = pysql.SQL(template)
    separator = pysql.SQL(',')
    return separator.join([row for _ in range(num)])
27
def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json:
    """ Hand the place row, wrapped in a PlaceInfo, to the analyzer and
        wrap whatever it returns in a psycopg2 Json adapter.
    """
    token_info = analyzer.process_place(PlaceInfo(place))
    return psycopg2.extras.Json(token_info)
30
31
class Runner(Protocol):
    """ Structural interface shared by the runner classes for the
        different indexing tasks.
    """

    def name(self) -> str:
        """ Return a descriptive name of the indexing task.
        """
        ...

    def sql_count_objects(self) -> Query:
        """ Return the query that counts the objects to be processed.
        """
        ...

    def sql_get_objects(self) -> Query:
        """ Return the query that selects the objects to be processed.
        """
        ...

    def get_place_details(self, worker: DBConnection,
                          ids: DictCursorResults) -> DictCursorResults:
        """ Obtain the details needed for processing the objects in `ids`.
            `worker` is the connection an implementation may use for that.
        """
        ...

    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
        """ Mark the given places as processed using the connection `worker`.
        """
        ...
39
40
class AbstractPlacexRunner:
    """ Returns SQL commands for indexing of the placex table.

        The class provides the parts that are common to all runners working
        on placex: requesting the details of a batch of places and writing
        the results back. The name and the queries that select the rows
        are left to the subclasses.
    """
    SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
    # Placeholders for one row of the VALUES list in the update statement:
    # place_id, name, address, linked_place_id, token_info.
    UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"

    def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
        self.rank = rank
        self.analyzer = analyzer


    @staticmethod
    @functools.lru_cache(maxsize=1)
    def _index_sql(num_places: int) -> pysql.Composed:
        """ Return the UPDATE statement for writing back `num_places` rows.

            The statement only depends on the number of rows. The function
            is therefore static, so that the cache is keyed on that number
            alone and does not hold on to a runner instance.
        """
        return pysql.SQL(
            """ UPDATE placex
                SET indexed_status = 0, address = v.addr, token_info = v.ti,
                    name = v.name, linked_place_id = v.linked_place_id
                FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
                WHERE place_id = v.id
            """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places))


    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
        """ Send the query for the indexing details of the places in `ids`
            to `worker`. The first column of each row in `ids` is used as
            the place_id.

            The function always returns an empty list; the rows themselves
            are not handed back here.
            NOTE(review): presumably the caller collects the result from
            the worker connection later - confirm against the indexer.
        """
        worker.perform("""SELECT place_id, extra.*
                          FROM placex, LATERAL placex_indexing_prepare(placex) as extra
                          WHERE place_id IN %s""",
                       (tuple((p[0] for p in ids)), ))

        return []


    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
        """ Run the analyzer over each place and send a single UPDATE with
            the results of the whole batch to `worker`.

            Nothing is sent when `places` is empty.
        """
        # Without rows the VALUES list would be empty, which PostgreSQL
        # rejects as a syntax error.
        if not places:
            return

        values: List[Any] = []
        for place in places:
            # The order must match the column list of UPDATE_LINE.
            values.extend(place[field]
                          for field in ('place_id', 'name', 'address', 'linked_place_id'))
            values.append(_analyze_place(place, self.analyzer))

        worker.perform(self._index_sql(len(places)), values)
80
81
class RankRunner(AbstractPlacexRunner):
    """ Provides the SQL commands for indexing the rows of the placex table
        that have one particular address rank.
    """

    def name(self) -> str:
        """ Return a label that names the rank of this runner.
        """
        rank = self.rank
        return f"rank {rank}"

    def sql_count_objects(self) -> pysql.Composed:
        """ Return the query counting the rows with the address rank of
            this runner and an indexed_status greater than 0.
        """
        rank_literal = pysql.Literal(self.rank)
        count_template = pysql.SQL("""SELECT count(*) FROM placex
                            WHERE rank_address = {} and indexed_status > 0
                         """)
        return count_template.format(rank_literal)

    def sql_get_objects(self) -> pysql.Composed:
        """ Return the query selecting the place_ids of the rows with the
            address rank of this runner and an indexed_status greater
            than 0, ordered by geometry_sector.
        """
        condition = pysql.SQL(
            """WHERE indexed_status > 0 and rank_address = {}
               ORDER BY geometry_sector
            """)
        return self.SELECT_SQL + condition.format(pysql.Literal(self.rank))
99
100
class BoundaryRunner(AbstractPlacexRunner):
    """ Provides the SQL commands for indexing the administrative
        boundaries with one particular search rank.
    """

    def name(self) -> str:
        """ Return a label that names the rank of this runner.
        """
        rank = self.rank
        return f"boundaries rank {rank}"

    def sql_count_objects(self) -> pysql.Composed:
        """ Return the query counting the administrative boundaries with
            the search rank of this runner and an indexed_status greater
            than 0.
        """
        rank_literal = pysql.Literal(self.rank)
        count_template = pysql.SQL("""SELECT count(*) FROM placex
                            WHERE indexed_status > 0
                              AND rank_search = {}
                              AND class = 'boundary' and type = 'administrative'
                         """)
        return count_template.format(rank_literal)

    def sql_get_objects(self) -> pysql.Composed:
        """ Return the query selecting the place_ids of the administrative
            boundaries with the search rank of this runner and an
            indexed_status greater than 0, ordered by partition and
            admin_level.
        """
        condition = pysql.SQL(
            """WHERE indexed_status > 0 and rank_search = {}
                     and class = 'boundary' and type = 'administrative'
               ORDER BY partition, admin_level
            """)
        return self.SELECT_SQL + condition.format(pysql.Literal(self.rank))
122
123
class InterpolationRunner:
    """ Returns SQL commands for indexing the address interpolation table
        location_property_osmline.
    """

    def __init__(self, analyzer: AbstractAnalyzer) -> None:
        self.analyzer = analyzer


    def name(self) -> str:
        """ Return a label that names the table this runner works on.
        """
        return "interpolation lines (location_property_osmline)"

    def sql_count_objects(self) -> str:
        """ Return the query counting the rows with an indexed_status
            greater than 0.
        """
        return """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""

    def sql_get_objects(self) -> str:
        """ Return the query selecting the place_ids of the rows with an
            indexed_status greater than 0, ordered by geometry_sector.
        """
        return """SELECT place_id
                  FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""


    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
        """ Send the query for the addresses of the interpolation lines in
            `ids` to `worker`. The first column of each row in `ids` is
            used as the place_id.

            The function always returns an empty list; the rows themselves
            are not handed back here.
            NOTE(review): presumably the caller collects the result from
            the worker connection later - confirm against the indexer.
        """
        worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
                          FROM location_property_osmline WHERE place_id IN %s""",
                       (tuple((p[0] for p in ids)), ))
        return []


    @staticmethod
    @functools.lru_cache(maxsize=1)
    def _index_sql(num_places: int) -> pysql.Composed:
        """ Return the UPDATE statement for writing back `num_places` rows.

            The statement only depends on the number of rows. The function
            is therefore static, so that the cache is keyed on that number
            alone and does not hold on to a runner instance.
        """
        return pysql.SQL("""UPDATE location_property_osmline
                            SET indexed_status = 0, address = v.addr, token_info = v.ti
                            FROM (VALUES {}) as v(id, addr, ti)
                            WHERE place_id = v.id
                         """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))


    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
        """ Run the analyzer over each interpolation line and send a single
            UPDATE with the results of the whole batch to `worker`.

            Nothing is sent when `places` is empty.
        """
        # Without rows the VALUES list would be empty, which PostgreSQL
        # rejects as a syntax error.
        if not places:
            return

        values: List[Any] = []
        for place in places:
            # The order must match the placeholders in _index_sql().
            values.extend((place[x] for x in ('place_id', 'address')))
            values.append(_analyze_place(place, self.analyzer))

        worker.perform(self._index_sql(len(places)), values)
170
171
172
class PostcodeRunner(Runner):
    """ Provides the SQL commands for indexing the location_postcode table.
    """

    def name(self) -> str:
        """ Return a label that names the table this runner works on.
        """
        return "postcodes (location_postcode)"


    def sql_count_objects(self) -> str:
        """ Return the query counting the rows with an indexed_status
            greater than 0.
        """
        return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'


    def sql_get_objects(self) -> str:
        """ Return the query selecting the place_ids of the rows with an
            indexed_status greater than 0, ordered by country code and
            postcode.
        """
        return """SELECT place_id FROM location_postcode
                  WHERE indexed_status > 0
                  ORDER BY country_code, postcode"""


    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
        """ Return `ids` unchanged. No query is sent to `worker`.
        """
        return ids

    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
        """ Send an UPDATE to `worker` that sets indexed_status to 0 for
            the given postcodes. The first column of each row in `places`
            is used as the place_id.

            Nothing is sent when `places` is empty.
        """
        # Without rows the statement would end in 'IN ()', which PostgreSQL
        # rejects as a syntax error.
        if not places:
            return

        place_ids = pysql.SQL(',').join((pysql.Literal(i[0]) for i in places))
        worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
                                    WHERE place_id IN ({})""")
                       .format(place_ids))