]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/indexer/runners.py
Merge pull request #3487 from lonvia/port-to-psycopg3
[nominatim.git] / src / nominatim_db / indexer / runners.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Mix-ins that provide the actual commands for the indexer for various indexing
9 tasks.
10 """
11 from typing import Any, Sequence
12
13 from psycopg import sql as pysql
14 from psycopg.abc import Query
15 from psycopg.rows import DictRow
16 from psycopg.types.json import Json
17
18 from ..typing import Protocol
19 from ..data.place_info import PlaceInfo
20 from ..tokenizer.base import AbstractAnalyzer
21
22 # pylint: disable=C0111
23
def _mk_valuelist(template: str, num: int) -> pysql.Composed:
    """ Return `template` repeated `num` times as a comma-separated
        SQL fragment.
    """
    row = pysql.SQL(template)
    separator = pysql.SQL(',')
    return separator.join(row for _ in range(num))
26
def _analyze_place(place: DictRow, analyzer: AbstractAnalyzer) -> Json:
    """ Run the analyzer over the given row and wrap its result in a
        Json object so it can be handed to the database as a parameter.
    """
    info = PlaceInfo(place)
    token_info = analyzer.process_place(info)
    return Json(token_info)
29
30
class Runner(Protocol):
    """ Interface of the objects that supply the indexer with the SQL
        for one indexing task.
    """

    def name(self) -> str:
        """ Return a human-readable label for the task. """

    def sql_count_objects(self) -> Query:
        """ Return the query that counts the rows to be processed. """

    def sql_get_objects(self) -> Query:
        """ Return the query that fetches the rows to be processed. """

    def index_places_query(self, batch_size: int) -> Query:
        """ Return the UPDATE statement for a batch of `batch_size` rows. """

    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the statement parameters contributed by a single row. """
37
38
# Query skeleton shared by the placex runners: selects the place_id plus all
# columns returned by placex_indexing_prepare() for each row of a placex
# subquery. The `{}` slot is filled by the caller with the WHERE/ORDER BY
# clause that picks the rows.
SELECT_SQL = pysql.SQL("""SELECT place_id, extra.*
                          FROM (SELECT * FROM placex {}) as px,
                          LATERAL placex_indexing_prepare(px) as extra """)
# Placeholder tuple for one row of the VALUES list in the placex UPDATE;
# the positions correspond to v(id, name, addr, linked_place_id, ti).
UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
43
class AbstractPlacexRunner:
    """ Common part of the runners that index rows of the placex table.

        Supplies the batched UPDATE statement and its per-row parameters.
    """

    def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
        self.analyzer = analyzer
        self.rank = rank


    def index_places_query(self, batch_size: int) -> Query:
        """ Build the UPDATE statement that writes back `batch_size`
            rows in one go.
        """
        update_sql = pysql.SQL(
            """ UPDATE placex
                SET indexed_status = 0, address = v.addr, token_info = v.ti,
                    name = v.name, linked_place_id = v.linked_place_id
                FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
                WHERE place_id = v.id
            """)
        # One UPDATE_LINE tuple of placeholders per row in the batch.
        value_rows = _mk_valuelist(UPDATE_LINE, batch_size)
        return update_sql.format(value_rows)


    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the parameters of one row in the order expected by
            UPDATE_LINE.
        """
        # Read the plain columns first, then run the analyzer.
        columns = tuple(place[key] for key in
                        ('place_id', 'name', 'address', 'linked_place_id'))
        return columns + (_analyze_place(place, self.analyzer),)
69
70
class RankRunner(AbstractPlacexRunner):
    """ Provides the SQL for indexing the placex rows of a single
        address rank.
    """

    def name(self) -> str:
        label = f"rank {self.rank}"
        return label

    def sql_count_objects(self) -> pysql.Composed:
        """ Count the rows of this address rank with indexed_status > 0. """
        count_sql = pysql.SQL("""SELECT count(*) FROM placex
                            WHERE rank_address = {} and indexed_status > 0
                         """)
        return count_sql.format(pysql.Literal(self.rank))

    def sql_get_objects(self) -> pysql.Composed:
        """ Select the rows of this address rank with indexed_status > 0,
            ordered by geometry sector.
        """
        condition = pysql.SQL(
                """WHERE placex.indexed_status > 0 and placex.rank_address = {}
                   ORDER BY placex.geometry_sector
                """)
        rank_filter = condition.format(pysql.Literal(self.rank))
        return SELECT_SQL.format(rank_filter)
88
89
class BoundaryRunner(AbstractPlacexRunner):
    """ Provides the SQL for indexing the administrative boundaries
        of a single search rank.
    """

    def name(self) -> str:
        label = f"boundaries rank {self.rank}"
        return label

    def sql_count_objects(self) -> Query:
        """ Count the administrative boundaries of this search rank with
            indexed_status > 0.
        """
        count_sql = pysql.SQL("""SELECT count(*) FROM placex
                            WHERE indexed_status > 0
                              AND rank_search = {}
                              AND class = 'boundary' and type = 'administrative'
                         """)
        return count_sql.format(pysql.Literal(self.rank))

    def sql_get_objects(self) -> Query:
        """ Select the administrative boundaries of this search rank with
            indexed_status > 0, ordered by partition and admin level.
        """
        condition = pysql.SQL(
                """WHERE placex.indexed_status > 0 and placex.rank_search = {}
                         and placex.class = 'boundary' and placex.type = 'administrative'
                   ORDER BY placex.partition, placex.admin_level
                """)
        boundary_filter = condition.format(pysql.Literal(self.rank))
        return SELECT_SQL.format(boundary_filter)
111
112
class InterpolationRunner:
    """ Provides the SQL for indexing the address interpolation table
        location_property_osmline.
    """

    def __init__(self, analyzer: AbstractAnalyzer) -> None:
        self.analyzer = analyzer


    def name(self) -> str:
        label = "interpolation lines (location_property_osmline)"
        return label

    def sql_count_objects(self) -> Query:
        """ Count the interpolation rows with indexed_status > 0. """
        count_sql = """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""
        return count_sql


    def sql_get_objects(self) -> Query:
        """ Select id and interpolation address of the rows with
            indexed_status > 0, ordered by geometry sector.
        """
        select_sql = """SELECT place_id, get_interpolation_address(address, osm_id) as address
                  FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""
        return select_sql


    def index_places_query(self, batch_size: int) -> Query:
        """ Build the UPDATE statement that writes back `batch_size`
            rows in one go.
        """
        update_sql = pysql.SQL("""UPDATE location_property_osmline
                            SET indexed_status = 0, address = v.addr, token_info = v.ti
                            FROM (VALUES {}) as v(id, addr, ti)
                            WHERE place_id = v.id
                         """)
        # One tuple of placeholders per row in the batch.
        value_rows = _mk_valuelist("(%s, %s::hstore, %s::jsonb)", batch_size)
        return update_sql.format(value_rows)


    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the parameters of one row: id, address and the
            analyzer result.
        """
        # Read the plain columns first, then run the analyzer.
        columns = (place['place_id'], place['address'])
        return columns + (_analyze_place(place, self.analyzer),)
148
149
150
class PostcodeRunner(Runner):
    """ Supplies the SQL for indexing the location_postcode table.
    """

    def name(self) -> str:
        label = "postcodes (location_postcode)"
        return label


    def sql_count_objects(self) -> Query:
        """ Count the postcode rows with indexed_status > 0. """
        count_sql = 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
        return count_sql


    def sql_get_objects(self) -> Query:
        """ Select the ids of the postcode rows with indexed_status > 0,
            ordered by country code and postcode.
        """
        select_sql = """SELECT place_id FROM location_postcode
                  WHERE indexed_status > 0
                  ORDER BY country_code, postcode"""
        return select_sql


    def index_places_query(self, batch_size: int) -> Query:
        """ Build the UPDATE statement that resets indexed_status for
            `batch_size` place ids.
        """
        # One bare placeholder per place id in the batch.
        id_slots = pysql.SQL(',').join([pysql.Placeholder()] * batch_size)
        update_sql = pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
                                    WHERE place_id IN ({})""")
        return update_sql.format(id_slots)


    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the place id of the row as a one-element tuple. """
        place_id = place['place_id']
        return (place_id,)
174
175     def index_places_params(self, place: DictRow) -> Sequence[Any]:
176         return (place['place_id'], )