]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/indexer/indexer.py
port code to psycopg3
[nominatim.git] / src / nominatim_db / indexer / indexer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Main work horse for indexing (computing addresses) the database.
9 """
10 from typing import cast, List, Any
11 import logging
12 import time
13
14 import psycopg
15
16 from ..db.connection import connect, execute_scalar
17 from ..db.query_pool import QueryPool
18 from ..tokenizer.base import AbstractTokenizer
19 from .progress import ProgressLogger
20 from . import runners
21
22 LOG = logging.getLogger()
23
24 class Indexer:
25     """ Main indexing routine.
26     """
27
28     def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
29         self.dsn = dsn
30         self.tokenizer = tokenizer
31         self.num_threads = num_threads
32
33
34     def has_pending(self) -> bool:
35         """ Check if any data still needs indexing.
36             This function must only be used after the import has finished.
37             Otherwise it will be very expensive.
38         """
39         with connect(self.dsn) as conn:
40             with conn.cursor() as cur:
41                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
42                 return cur.rowcount > 0
43
44
45     async def index_full(self, analyse: bool = True) -> None:
46         """ Index the complete database. This will first index boundaries
47             followed by all other objects. When `analyse` is True, then the
48             database will be analysed at the appropriate places to
49             ensure that database statistics are updated.
50         """
51         with connect(self.dsn) as conn:
52             conn.autocommit = True
53
54             def _analyze() -> None:
55                 if analyse:
56                     with conn.cursor() as cur:
57                         cur.execute('ANALYZE')
58
59             while True:
60                 if await self.index_by_rank(0, 4) > 0:
61                     _analyze()
62
63                 if await self.index_boundaries(0, 30) > 100:
64                     _analyze()
65
66                 if await self.index_by_rank(5, 25) > 100:
67                     _analyze()
68
69                 if await self.index_by_rank(26, 30) > 1000:
70                     _analyze()
71
72                 if await self.index_postcodes() > 100:
73                     _analyze()
74
75                 if not self.has_pending():
76                     break
77
78
79     async def index_boundaries(self, minrank: int, maxrank: int) -> int:
80         """ Index only administrative boundaries within the given rank range.
81         """
82         total = 0
83         LOG.warning("Starting indexing boundaries using %s threads",
84                     self.num_threads)
85
86         with self.tokenizer.name_analyzer() as analyzer:
87             for rank in range(max(minrank, 4), min(maxrank, 26)):
88                 total += await self._index(runners.BoundaryRunner(rank, analyzer))
89
90         return total
91
92     async def index_by_rank(self, minrank: int, maxrank: int) -> int:
93         """ Index all entries of placex in the given rank range (inclusive)
94             in order of their address rank.
95
96             When rank 30 is requested then also interpolations and
97             places with address rank 0 will be indexed.
98         """
99         total = 0
100         maxrank = min(maxrank, 30)
101         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
102                     minrank, maxrank, self.num_threads)
103
104         with self.tokenizer.name_analyzer() as analyzer:
105             for rank in range(max(1, minrank), maxrank + 1):
106                 if rank >= 30:
107                     batch = 20
108                 elif rank >= 26:
109                     batch = 5
110                 else:
111                     batch = 1
112                 total += await self._index(runners.RankRunner(rank, analyzer), batch)
113
114             if maxrank == 30:
115                 total += await self._index(runners.RankRunner(0, analyzer))
116                 total += await self._index(runners.InterpolationRunner(analyzer), 20)
117
118         return total
119
120
121     async def index_postcodes(self) -> int:
122         """Index the entries of the location_postcode table.
123         """
124         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
125
126         return await self._index(runners.PostcodeRunner(), 20)
127
128
129     def update_status_table(self) -> None:
130         """ Update the status in the status table to 'indexed'.
131         """
132         with connect(self.dsn) as conn:
133             with conn.cursor() as cur:
134                 cur.execute('UPDATE import_status SET indexed = true')
135
136             conn.commit()
137
138     async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
139         """ Index a single rank or table. `runner` describes the SQL to use
140             for indexing. `batch` describes the number of objects that
141             should be processed with a single SQL statement
142         """
143         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
144
145         total_tuples = self._prepare_indexing(runner)
146
147         progress = ProgressLogger(runner.name(), total_tuples)
148
149         if total_tuples > 0:
150             async with await psycopg.AsyncConnection.connect(
151                                  self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
152                        QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
153                 fetcher_time = 0.0
154                 tstart = time.time()
155                 async with aconn.cursor(name='places') as cur:
156                     query = runner.index_places_query(batch)
157                     params: List[Any] = []
158                     num_places = 0
159                     async for place in cur.stream(runner.sql_get_objects()):
160                         fetcher_time += time.time() - tstart
161
162                         params.extend(runner.index_places_params(place))
163                         num_places += 1
164
165                         if num_places >= batch:
166                             LOG.debug("Processing places: %s", str(params))
167                             await pool.put_query(query, params)
168                             progress.add(num_places)
169                             params = []
170                             num_places = 0
171
172                         tstart = time.time()
173
174                 if num_places > 0:
175                     await pool.put_query(runner.index_places_query(num_places), params)
176
177             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
178                      fetcher_time, pool.wait_time)
179
180         return progress.done()
181
182
183     def _prepare_indexing(self, runner: runners.Runner) -> int:
184         with connect(self.dsn) as conn:
185             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
186             if hstore_info is None:
187                 raise RuntimeError('Hstore extension is requested but not installed.')
188             psycopg.types.hstore.register_hstore(hstore_info)
189
190             total_tuples = execute_scalar(conn, runner.sql_count_objects())
191             LOG.debug("Total number of rows: %i", total_tuples)
192         return cast(int, total_tuples)