]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/indexer/indexer.py
b4c9732c624231df40215eb9e6ba543554c332e4
[nominatim.git] / src / nominatim_db / indexer / indexer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Main work horse for indexing (computing addresses) the database.
9 """
10 from typing import Optional, Any, cast
11 import logging
12 import time
13
14 import psycopg2.extras
15
16 from ..typing import DictCursorResults
17 from ..db.async_connection import DBConnection, WorkerPool
18 from ..db.connection import connect, Connection, Cursor, execute_scalar, register_hstore
19 from ..tokenizer.base import AbstractTokenizer
20 from .progress import ProgressLogger
21 from . import runners
22
23 LOG = logging.getLogger()
24
25
26 class PlaceFetcher:
27     """ Asynchronous connection that fetches place details for processing.
28     """
29     def __init__(self, dsn: str, setup_conn: Connection) -> None:
30         self.wait_time = 0.0
31         self.current_ids: Optional[DictCursorResults] = None
32         self.conn: Optional[DBConnection] = DBConnection(dsn,
33                                                cursor_factory=psycopg2.extras.DictCursor)
34
35         # need to fetch those manually because register_hstore cannot
36         # fetch them on an asynchronous connection below.
37         hstore_oid = execute_scalar(setup_conn, "SELECT 'hstore'::regtype::oid")
38         hstore_array_oid = execute_scalar(setup_conn, "SELECT 'hstore[]'::regtype::oid")
39
40         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
41                                         array_oid=hstore_array_oid)
42
43
44     def close(self) -> None:
45         """ Close the underlying asynchronous connection.
46         """
47         if self.conn:
48             self.conn.close()
49             self.conn = None
50
51
52     def fetch_next_batch(self, cur: Cursor, runner: runners.Runner) -> bool:
53         """ Send a request for the next batch of places.
54             If details for the places are required, they will be fetched
55             asynchronously.
56
57             Returns true if there is still data available.
58         """
59         ids = cast(Optional[DictCursorResults], cur.fetchmany(100))
60
61         if not ids:
62             self.current_ids = None
63             return False
64
65         assert self.conn is not None
66         self.current_ids = runner.get_place_details(self.conn, ids)
67
68         return True
69
70     def get_batch(self) -> DictCursorResults:
71         """ Get the next batch of data, previously requested with
72             `fetch_next_batch`.
73         """
74         assert self.conn is not None
75         assert self.conn.cursor is not None
76
77         if self.current_ids is not None and not self.current_ids:
78             tstart = time.time()
79             self.conn.wait()
80             self.wait_time += time.time() - tstart
81             self.current_ids = cast(Optional[DictCursorResults],
82                                     self.conn.cursor.fetchall())
83
84         return self.current_ids if self.current_ids is not None else []
85
86     def __enter__(self) -> 'PlaceFetcher':
87         return self
88
89
90     def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
91         assert self.conn is not None
92         self.conn.wait()
93         self.close()
94
95
96 class Indexer:
97     """ Main indexing routine.
98     """
99
100     def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
101         self.dsn = dsn
102         self.tokenizer = tokenizer
103         self.num_threads = num_threads
104
105
106     def has_pending(self) -> bool:
107         """ Check if any data still needs indexing.
108             This function must only be used after the import has finished.
109             Otherwise it will be very expensive.
110         """
111         with connect(self.dsn) as conn:
112             with conn.cursor() as cur:
113                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
114                 return cur.rowcount > 0
115
116
117     def index_full(self, analyse: bool = True) -> None:
118         """ Index the complete database. This will first index boundaries
119             followed by all other objects. When `analyse` is True, then the
120             database will be analysed at the appropriate places to
121             ensure that database statistics are updated.
122         """
123         with connect(self.dsn) as conn:
124             conn.autocommit = True
125
126             def _analyze() -> None:
127                 if analyse:
128                     with conn.cursor() as cur:
129                         cur.execute('ANALYZE')
130
131             if self.index_by_rank(0, 4) > 0:
132                 _analyze()
133
134             if self.index_boundaries(0, 30) > 100:
135                 _analyze()
136
137             if self.index_by_rank(5, 25) > 100:
138                 _analyze()
139
140             if self.index_by_rank(26, 30) > 1000:
141                 _analyze()
142
143             if self.index_postcodes() > 100:
144                 _analyze()
145
146
147     def index_boundaries(self, minrank: int, maxrank: int) -> int:
148         """ Index only administrative boundaries within the given rank range.
149         """
150         total = 0
151         LOG.warning("Starting indexing boundaries using %s threads",
152                     self.num_threads)
153
154         with self.tokenizer.name_analyzer() as analyzer:
155             for rank in range(max(minrank, 4), min(maxrank, 26)):
156                 total += self._index(runners.BoundaryRunner(rank, analyzer))
157
158         return total
159
160     def index_by_rank(self, minrank: int, maxrank: int) -> int:
161         """ Index all entries of placex in the given rank range (inclusive)
162             in order of their address rank.
163
164             When rank 30 is requested then also interpolations and
165             places with address rank 0 will be indexed.
166         """
167         total = 0
168         maxrank = min(maxrank, 30)
169         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
170                     minrank, maxrank, self.num_threads)
171
172         with self.tokenizer.name_analyzer() as analyzer:
173             for rank in range(max(1, minrank), maxrank + 1):
174                 total += self._index(runners.RankRunner(rank, analyzer), 20 if rank == 30 else 1)
175
176             if maxrank == 30:
177                 total += self._index(runners.RankRunner(0, analyzer))
178                 total += self._index(runners.InterpolationRunner(analyzer), 20)
179
180         return total
181
182
183     def index_postcodes(self) -> int:
184         """Index the entries of the location_postcode table.
185         """
186         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
187
188         return self._index(runners.PostcodeRunner(), 20)
189
190
191     def update_status_table(self) -> None:
192         """ Update the status in the status table to 'indexed'.
193         """
194         with connect(self.dsn) as conn:
195             with conn.cursor() as cur:
196                 cur.execute('UPDATE import_status SET indexed = true')
197
198             conn.commit()
199
200     def _index(self, runner: runners.Runner, batch: int = 1) -> int:
201         """ Index a single rank or table. `runner` describes the SQL to use
202             for indexing. `batch` describes the number of objects that
203             should be processed with a single SQL statement
204         """
205         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
206
207         with connect(self.dsn) as conn:
208             register_hstore(conn)
209             total_tuples = execute_scalar(conn, runner.sql_count_objects())
210             LOG.debug("Total number of rows: %i", total_tuples)
211
212             conn.commit()
213
214             progress = ProgressLogger(runner.name(), total_tuples)
215
216             if total_tuples > 0:
217                 with conn.cursor(name='places') as cur:
218                     cur.execute(runner.sql_get_objects())
219
220                     with PlaceFetcher(self.dsn, conn) as fetcher:
221                         with WorkerPool(self.dsn, self.num_threads) as pool:
222                             has_more = fetcher.fetch_next_batch(cur, runner)
223                             while has_more:
224                                 places = fetcher.get_batch()
225
226                                 # asynchronously get the next batch
227                                 has_more = fetcher.fetch_next_batch(cur, runner)
228
229                                 # And insert the current batch
230                                 for idx in range(0, len(places), batch):
231                                     part = places[idx:idx + batch]
232                                     LOG.debug("Processing places: %s", str(part))
233                                     runner.index_places(pool.next_free_worker(), part)
234                                     progress.add(len(part))
235
236                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
237                                      fetcher.wait_time, pool.wait_time)
238
239                 conn.commit()
240
241         return progress.done()