]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/indexer/indexer.py
fix style issue found by flake8
[nominatim.git] / src / nominatim_db / indexer / indexer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Main work horse for indexing (computing addresses) the database.
9 """
10 from typing import cast, List, Any, Optional
11 import logging
12 import time
13
14 import psycopg
15
16 from ..db.connection import connect, execute_scalar
17 from ..db.query_pool import QueryPool
18 from ..tokenizer.base import AbstractTokenizer
19 from .progress import ProgressLogger
20 from . import runners
21
22 LOG = logging.getLogger()
23
24
25 class Indexer:
26     """ Main indexing routine.
27     """
28
29     def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
30         self.dsn = dsn
31         self.tokenizer = tokenizer
32         self.num_threads = num_threads
33
34     def has_pending(self) -> bool:
35         """ Check if any data still needs indexing.
36             This function must only be used after the import has finished.
37             Otherwise it will be very expensive.
38         """
39         with connect(self.dsn) as conn:
40             with conn.cursor() as cur:
41                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
42                 return cur.rowcount > 0
43
44     async def index_full(self, analyse: bool = True) -> None:
45         """ Index the complete database. This will first index boundaries
46             followed by all other objects. When `analyse` is True, then the
47             database will be analysed at the appropriate places to
48             ensure that database statistics are updated.
49         """
50         with connect(self.dsn) as conn:
51             conn.autocommit = True
52
53             def _analyze() -> None:
54                 if analyse:
55                     with conn.cursor() as cur:
56                         cur.execute('ANALYZE')
57
58             while True:
59                 if await self.index_by_rank(0, 4) > 0:
60                     _analyze()
61
62                 if await self.index_boundaries(0, 30) > 100:
63                     _analyze()
64
65                 if await self.index_by_rank(5, 25) > 100:
66                     _analyze()
67
68                 if await self.index_by_rank(26, 30) > 1000:
69                     _analyze()
70
71                 if await self.index_postcodes() > 100:
72                     _analyze()
73
74                 if not self.has_pending():
75                     break
76
77     async def index_boundaries(self, minrank: int, maxrank: int) -> int:
78         """ Index only administrative boundaries within the given rank range.
79         """
80         total = 0
81         LOG.warning("Starting indexing boundaries using %s threads",
82                     self.num_threads)
83
84         minrank = max(minrank, 4)
85         maxrank = min(maxrank, 25)
86
87         # Precompute number of rows to process for all rows
88         with connect(self.dsn) as conn:
89             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
90             if hstore_info is None:
91                 raise RuntimeError('Hstore extension is requested but not installed.')
92             psycopg.types.hstore.register_hstore(hstore_info)
93
94             with conn.cursor() as cur:
95                 cur = conn.execute(""" SELECT rank_search, count(*)
96                                        FROM placex
97                                        WHERE rank_search between %s and %s
98                                              AND class = 'boundary' and type = 'administrative'
99                                              AND indexed_status > 0
100                                        GROUP BY rank_search""",
101                                    (minrank, maxrank))
102                 total_tuples = {row.rank_search: row.count for row in cur}
103
104         with self.tokenizer.name_analyzer() as analyzer:
105             for rank in range(minrank, maxrank + 1):
106                 total += await self._index(runners.BoundaryRunner(rank, analyzer),
107                                            total_tuples=total_tuples.get(rank, 0))
108
109         return total
110
111     async def index_by_rank(self, minrank: int, maxrank: int) -> int:
112         """ Index all entries of placex in the given rank range (inclusive)
113             in order of their address rank.
114
115             When rank 30 is requested then also interpolations and
116             places with address rank 0 will be indexed.
117         """
118         total = 0
119         maxrank = min(maxrank, 30)
120         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
121                     minrank, maxrank, self.num_threads)
122
123         # Precompute number of rows to process for all rows
124         with connect(self.dsn) as conn:
125             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
126             if hstore_info is None:
127                 raise RuntimeError('Hstore extension is requested but not installed.')
128             psycopg.types.hstore.register_hstore(hstore_info)
129
130             with conn.cursor() as cur:
131                 cur = conn.execute(""" SELECT rank_address, count(*)
132                                        FROM placex
133                                        WHERE rank_address between %s and %s
134                                              AND indexed_status > 0
135                                        GROUP BY rank_address""",
136                                    (minrank, maxrank))
137                 total_tuples = {row.rank_address: row.count for row in cur}
138
139         with self.tokenizer.name_analyzer() as analyzer:
140             for rank in range(max(1, minrank), maxrank + 1):
141                 if rank >= 30:
142                     batch = 20
143                 elif rank >= 26:
144                     batch = 5
145                 else:
146                     batch = 1
147                 total += await self._index(runners.RankRunner(rank, analyzer),
148                                            batch=batch, total_tuples=total_tuples.get(rank, 0))
149
150             if maxrank == 30:
151                 total += await self._index(runners.RankRunner(0, analyzer))
152                 total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
153
154         return total
155
156     async def index_postcodes(self) -> int:
157         """Index the entries of the location_postcode table.
158         """
159         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
160
161         return await self._index(runners.PostcodeRunner(), batch=20)
162
163     def update_status_table(self) -> None:
164         """ Update the status in the status table to 'indexed'.
165         """
166         with connect(self.dsn) as conn:
167             with conn.cursor() as cur:
168                 cur.execute('UPDATE import_status SET indexed = true')
169
170             conn.commit()
171
172     async def _index(self, runner: runners.Runner, batch: int = 1,
173                      total_tuples: Optional[int] = None) -> int:
174         """ Index a single rank or table. `runner` describes the SQL to use
175             for indexing. `batch` describes the number of objects that
176             should be processed with a single SQL statement.
177
178             `total_tuples` may contain the total number of rows to process.
179             When not supplied, the value will be computed using the
180             approriate runner function.
181         """
182         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
183
184         if total_tuples is None:
185             total_tuples = self._prepare_indexing(runner)
186
187         progress = ProgressLogger(runner.name(), total_tuples)
188
189         if total_tuples > 0:
190             async with await psycopg.AsyncConnection.connect(
191                                  self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
192                        QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
193                 fetcher_time = 0.0
194                 tstart = time.time()
195                 async with aconn.cursor(name='places') as cur:
196                     query = runner.index_places_query(batch)
197                     params: List[Any] = []
198                     num_places = 0
199                     async for place in cur.stream(runner.sql_get_objects()):
200                         fetcher_time += time.time() - tstart
201
202                         params.extend(runner.index_places_params(place))
203                         num_places += 1
204
205                         if num_places >= batch:
206                             LOG.debug("Processing places: %s", str(params))
207                             await pool.put_query(query, params)
208                             progress.add(num_places)
209                             params = []
210                             num_places = 0
211
212                         tstart = time.time()
213
214                 if num_places > 0:
215                     await pool.put_query(runner.index_places_query(num_places), params)
216
217             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
218                      fetcher_time, pool.wait_time)
219
220         return progress.done()
221
222     def _prepare_indexing(self, runner: runners.Runner) -> int:
223         with connect(self.dsn) as conn:
224             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
225             if hstore_info is None:
226                 raise RuntimeError('Hstore extension is requested but not installed.')
227             psycopg.types.hstore.register_hstore(hstore_info)
228
229             total_tuples = execute_scalar(conn, runner.sql_count_objects())
230             LOG.debug("Total number of rows: %i", total_tuples)
231         return cast(int, total_tuples)