]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/indexer/indexer.py
Merge pull request #3517 from lonvia/improve-custom-formatter
[nominatim.git] / src / nominatim_db / indexer / indexer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Main work horse for indexing (computing addresses) the database.
9 """
10 from typing import cast, List, Any, Optional
11 import logging
12 import time
13
14 import psycopg
15
16 from ..db.connection import connect, execute_scalar
17 from ..db.query_pool import QueryPool
18 from ..tokenizer.base import AbstractTokenizer
19 from .progress import ProgressLogger
20 from . import runners
21
22 LOG = logging.getLogger()
23
24 class Indexer:
25     """ Main indexing routine.
26     """
27
28     def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
29         self.dsn = dsn
30         self.tokenizer = tokenizer
31         self.num_threads = num_threads
32
33
34     def has_pending(self) -> bool:
35         """ Check if any data still needs indexing.
36             This function must only be used after the import has finished.
37             Otherwise it will be very expensive.
38         """
39         with connect(self.dsn) as conn:
40             with conn.cursor() as cur:
41                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
42                 return cur.rowcount > 0
43
44
45     async def index_full(self, analyse: bool = True) -> None:
46         """ Index the complete database. This will first index boundaries
47             followed by all other objects. When `analyse` is True, then the
48             database will be analysed at the appropriate places to
49             ensure that database statistics are updated.
50         """
51         with connect(self.dsn) as conn:
52             conn.autocommit = True
53
54             def _analyze() -> None:
55                 if analyse:
56                     with conn.cursor() as cur:
57                         cur.execute('ANALYZE')
58
59             while True:
60                 if await self.index_by_rank(0, 4) > 0:
61                     _analyze()
62
63                 if await self.index_boundaries(0, 30) > 100:
64                     _analyze()
65
66                 if await self.index_by_rank(5, 25) > 100:
67                     _analyze()
68
69                 if await self.index_by_rank(26, 30) > 1000:
70                     _analyze()
71
72                 if await self.index_postcodes() > 100:
73                     _analyze()
74
75                 if not self.has_pending():
76                     break
77
78
79     async def index_boundaries(self, minrank: int, maxrank: int) -> int:
80         """ Index only administrative boundaries within the given rank range.
81         """
82         total = 0
83         LOG.warning("Starting indexing boundaries using %s threads",
84                     self.num_threads)
85
86         minrank = max(minrank, 4)
87         maxrank = min(maxrank, 25)
88
89         # Precompute number of rows to process for all rows
90         with connect(self.dsn) as conn:
91             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
92             if hstore_info is None:
93                 raise RuntimeError('Hstore extension is requested but not installed.')
94             psycopg.types.hstore.register_hstore(hstore_info)
95
96             with conn.cursor() as cur:
97                 cur = conn.execute(""" SELECT rank_search, count(*)
98                                        FROM placex
99                                        WHERE rank_search between %s and %s
100                                              AND class = 'boundary' and type = 'administrative'
101                                              AND indexed_status > 0
102                                        GROUP BY rank_search""",
103                                    (minrank, maxrank))
104                 total_tuples = {row.rank_search: row.count for row in cur}
105
106         with self.tokenizer.name_analyzer() as analyzer:
107             for rank in range(minrank, maxrank + 1):
108                 total += await self._index(runners.BoundaryRunner(rank, analyzer),
109                                            total_tuples=total_tuples.get(rank, 0))
110
111         return total
112
113     async def index_by_rank(self, minrank: int, maxrank: int) -> int:
114         """ Index all entries of placex in the given rank range (inclusive)
115             in order of their address rank.
116
117             When rank 30 is requested then also interpolations and
118             places with address rank 0 will be indexed.
119         """
120         total = 0
121         maxrank = min(maxrank, 30)
122         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
123                     minrank, maxrank, self.num_threads)
124
125         # Precompute number of rows to process for all rows
126         with connect(self.dsn) as conn:
127             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
128             if hstore_info is None:
129                 raise RuntimeError('Hstore extension is requested but not installed.')
130             psycopg.types.hstore.register_hstore(hstore_info)
131
132             with conn.cursor() as cur:
133                 cur = conn.execute(""" SELECT rank_address, count(*)
134                                        FROM placex
135                                        WHERE rank_address between %s and %s
136                                              AND indexed_status > 0
137                                        GROUP BY rank_address""",
138                                    (minrank, maxrank))
139                 total_tuples = {row.rank_address: row.count for row in cur}
140
141
142         with self.tokenizer.name_analyzer() as analyzer:
143             for rank in range(max(1, minrank), maxrank + 1):
144                 if rank >= 30:
145                     batch = 20
146                 elif rank >= 26:
147                     batch = 5
148                 else:
149                     batch = 1
150                 total += await self._index(runners.RankRunner(rank, analyzer),
151                                            batch=batch, total_tuples=total_tuples.get(rank, 0))
152
153             if maxrank == 30:
154                 total += await self._index(runners.RankRunner(0, analyzer))
155                 total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
156
157         return total
158
159
160     async def index_postcodes(self) -> int:
161         """Index the entries of the location_postcode table.
162         """
163         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
164
165         return await self._index(runners.PostcodeRunner(), batch=20)
166
167
168     def update_status_table(self) -> None:
169         """ Update the status in the status table to 'indexed'.
170         """
171         with connect(self.dsn) as conn:
172             with conn.cursor() as cur:
173                 cur.execute('UPDATE import_status SET indexed = true')
174
175             conn.commit()
176
177     async def _index(self, runner: runners.Runner, batch: int = 1,
178                      total_tuples: Optional[int] = None) -> int:
179         """ Index a single rank or table. `runner` describes the SQL to use
180             for indexing. `batch` describes the number of objects that
181             should be processed with a single SQL statement.
182
183             `total_tuples` may contain the total number of rows to process.
184             When not supplied, the value will be computed using the
185             approriate runner function.
186         """
187         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
188
189         if total_tuples is None:
190             total_tuples = self._prepare_indexing(runner)
191
192         progress = ProgressLogger(runner.name(), total_tuples)
193
194         if total_tuples > 0:
195             async with await psycopg.AsyncConnection.connect(
196                                  self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
197                        QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
198                 fetcher_time = 0.0
199                 tstart = time.time()
200                 async with aconn.cursor(name='places') as cur:
201                     query = runner.index_places_query(batch)
202                     params: List[Any] = []
203                     num_places = 0
204                     async for place in cur.stream(runner.sql_get_objects()):
205                         fetcher_time += time.time() - tstart
206
207                         params.extend(runner.index_places_params(place))
208                         num_places += 1
209
210                         if num_places >= batch:
211                             LOG.debug("Processing places: %s", str(params))
212                             await pool.put_query(query, params)
213                             progress.add(num_places)
214                             params = []
215                             num_places = 0
216
217                         tstart = time.time()
218
219                 if num_places > 0:
220                     await pool.put_query(runner.index_places_query(num_places), params)
221
222             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
223                      fetcher_time, pool.wait_time)
224
225         return progress.done()
226
227
228     def _prepare_indexing(self, runner: runners.Runner) -> int:
229         with connect(self.dsn) as conn:
230             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
231             if hstore_info is None:
232                 raise RuntimeError('Hstore extension is requested but not installed.')
233             psycopg.types.hstore.register_hstore(hstore_info)
234
235             total_tuples = execute_scalar(conn, runner.sql_count_objects())
236             LOG.debug("Total number of rows: %i", total_tuples)
237         return cast(int, total_tuples)