]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
Merge pull request #2770 from lonvia/typed-python
[nominatim.git] / nominatim / indexer / indexer.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Main work horse for indexing (computing addresses) the database.
9 """
10 from typing import Optional, Any, cast
11 import logging
12 import time
13
14 import psycopg2.extras
15
16 from nominatim.tokenizer.base import AbstractTokenizer
17 from nominatim.indexer.progress import ProgressLogger
18 from nominatim.indexer import runners
19 from nominatim.db.async_connection import DBConnection, WorkerPool
20 from nominatim.db.connection import connect, Connection, Cursor
21 from nominatim.typing import DictCursorResults
22
23 LOG = logging.getLogger()
24
25
26 class PlaceFetcher:
27     """ Asynchronous connection that fetches place details for processing.
28     """
29     def __init__(self, dsn: str, setup_conn: Connection) -> None:
30         self.wait_time = 0.0
31         self.current_ids: Optional[DictCursorResults] = None
32         self.conn: Optional[DBConnection] = DBConnection(dsn,
33                                                cursor_factory=psycopg2.extras.DictCursor)
34
35         with setup_conn.cursor() as cur:
36             # need to fetch those manually because register_hstore cannot
37             # fetch them on an asynchronous connection below.
38             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
39             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
40
41         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
42                                         array_oid=hstore_array_oid)
43
44     def close(self) -> None:
45         """ Close the underlying asynchronous connection.
46         """
47         if self.conn:
48             self.conn.close()
49             self.conn = None
50
51
52     def fetch_next_batch(self, cur: Cursor, runner: runners.Runner) -> bool:
53         """ Send a request for the next batch of places.
54             If details for the places are required, they will be fetched
55             asynchronously.
56
57             Returns true if there is still data available.
58         """
59         ids = cast(Optional[DictCursorResults], cur.fetchmany(100))
60
61         if not ids:
62             self.current_ids = None
63             return False
64
65         assert self.conn is not None
66         self.current_ids = runner.get_place_details(self.conn, ids)
67
68         return True
69
70     def get_batch(self) -> DictCursorResults:
71         """ Get the next batch of data, previously requested with
72             `fetch_next_batch`.
73         """
74         assert self.conn is not None
75         assert self.conn.cursor is not None
76
77         if self.current_ids is not None and not self.current_ids:
78             tstart = time.time()
79             self.conn.wait()
80             self.wait_time += time.time() - tstart
81             self.current_ids = cast(Optional[DictCursorResults],
82                                     self.conn.cursor.fetchall())
83
84         return self.current_ids if self.current_ids is not None else []
85
86     def __enter__(self) -> 'PlaceFetcher':
87         return self
88
89
90     def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
91         assert self.conn is not None
92         self.conn.wait()
93         self.close()
94
95
96 class Indexer:
97     """ Main indexing routine.
98     """
99
100     def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
101         self.dsn = dsn
102         self.tokenizer = tokenizer
103         self.num_threads = num_threads
104
105
106     def has_pending(self) -> bool:
107         """ Check if any data still needs indexing.
108             This function must only be used after the import has finished.
109             Otherwise it will be very expensive.
110         """
111         with connect(self.dsn) as conn:
112             with conn.cursor() as cur:
113                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
114                 return cur.rowcount > 0
115
116
117     def index_full(self, analyse: bool = True) -> None:
118         """ Index the complete database. This will first index boundaries
119             followed by all other objects. When `analyse` is True, then the
120             database will be analysed at the appropriate places to
121             ensure that database statistics are updated.
122         """
123         with connect(self.dsn) as conn:
124             conn.autocommit = True
125
126             def _analyze() -> None:
127                 if analyse:
128                     with conn.cursor() as cur:
129                         cur.execute('ANALYZE')
130
131             self.index_by_rank(0, 4)
132             _analyze()
133
134             self.index_boundaries(0, 30)
135             _analyze()
136
137             self.index_by_rank(5, 25)
138             _analyze()
139
140             self.index_by_rank(26, 30)
141             _analyze()
142
143             self.index_postcodes()
144             _analyze()
145
146
147     def index_boundaries(self, minrank: int, maxrank: int) -> None:
148         """ Index only administrative boundaries within the given rank range.
149         """
150         LOG.warning("Starting indexing boundaries using %s threads",
151                     self.num_threads)
152
153         with self.tokenizer.name_analyzer() as analyzer:
154             for rank in range(max(minrank, 4), min(maxrank, 26)):
155                 self._index(runners.BoundaryRunner(rank, analyzer))
156
157     def index_by_rank(self, minrank: int, maxrank: int) -> None:
158         """ Index all entries of placex in the given rank range (inclusive)
159             in order of their address rank.
160
161             When rank 30 is requested then also interpolations and
162             places with address rank 0 will be indexed.
163         """
164         maxrank = min(maxrank, 30)
165         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
166                     minrank, maxrank, self.num_threads)
167
168         with self.tokenizer.name_analyzer() as analyzer:
169             for rank in range(max(1, minrank), maxrank + 1):
170                 self._index(runners.RankRunner(rank, analyzer), 20 if rank == 30 else 1)
171
172             if maxrank == 30:
173                 self._index(runners.RankRunner(0, analyzer))
174                 self._index(runners.InterpolationRunner(analyzer), 20)
175
176
177     def index_postcodes(self) -> None:
178         """Index the entries ofthe location_postcode table.
179         """
180         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
181
182         self._index(runners.PostcodeRunner(), 20)
183
184
185     def update_status_table(self) -> None:
186         """ Update the status in the status table to 'indexed'.
187         """
188         with connect(self.dsn) as conn:
189             with conn.cursor() as cur:
190                 cur.execute('UPDATE import_status SET indexed = true')
191
192             conn.commit()
193
194     def _index(self, runner: runners.Runner, batch: int = 1) -> None:
195         """ Index a single rank or table. `runner` describes the SQL to use
196             for indexing. `batch` describes the number of objects that
197             should be processed with a single SQL statement
198         """
199         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
200
201         with connect(self.dsn) as conn:
202             psycopg2.extras.register_hstore(conn)
203             with conn.cursor() as cur:
204                 total_tuples = cur.scalar(runner.sql_count_objects())
205                 LOG.debug("Total number of rows: %i", total_tuples)
206
207             conn.commit()
208
209             progress = ProgressLogger(runner.name(), total_tuples)
210
211             if total_tuples > 0:
212                 with conn.cursor(name='places') as cur:
213                     cur.execute(runner.sql_get_objects())
214
215                     with PlaceFetcher(self.dsn, conn) as fetcher:
216                         with WorkerPool(self.dsn, self.num_threads) as pool:
217                             has_more = fetcher.fetch_next_batch(cur, runner)
218                             while has_more:
219                                 places = fetcher.get_batch()
220
221                                 # asynchronously get the next batch
222                                 has_more = fetcher.fetch_next_batch(cur, runner)
223
224                                 # And insert the curent batch
225                                 for idx in range(0, len(places), batch):
226                                     part = places[idx:idx + batch]
227                                     LOG.debug("Processing places: %s", str(part))
228                                     runner.index_places(pool.next_free_worker(), part)
229                                     progress.add(len(part))
230
231                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
232                                      fetcher.wait_time, pool.wait_time)
233
234                 conn.commit()
235
236         progress.done()