]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / indexer / indexer.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Main work horse for indexing (computing addresses) the database.
9 """
10 import logging
11 import time
12
13 import psycopg2.extras
14
15 from nominatim.indexer.progress import ProgressLogger
16 from nominatim.indexer import runners
17 from nominatim.db.async_connection import DBConnection, WorkerPool
18 from nominatim.db.connection import connect
19
20 LOG = logging.getLogger()
21
22
23 class PlaceFetcher:
24     """ Asynchronous connection that fetches place details for processing.
25     """
26     def __init__(self, dsn, setup_conn):
27         self.wait_time = 0
28         self.current_ids = None
29         self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
30
31         with setup_conn.cursor() as cur:
32             # need to fetch those manually because register_hstore cannot
33             # fetch them on an asynchronous connection below.
34             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
35             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
36
37         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
38                                         array_oid=hstore_array_oid)
39
40     def close(self):
41         """ Close the underlying asynchronous connection.
42         """
43         if self.conn:
44             self.conn.close()
45             self.conn = None
46
47
48     def fetch_next_batch(self, cur, runner):
49         """ Send a request for the next batch of places.
50             If details for the places are required, they will be fetched
51             asynchronously.
52
53             Returns true if there is still data available.
54         """
55         ids = cur.fetchmany(100)
56
57         if not ids:
58             self.current_ids = None
59             return False
60
61         if hasattr(runner, 'get_place_details'):
62             runner.get_place_details(self.conn, ids)
63             self.current_ids = []
64         else:
65             self.current_ids = ids
66
67         return True
68
69     def get_batch(self):
70         """ Get the next batch of data, previously requested with
71             `fetch_next_batch`.
72         """
73         if self.current_ids is not None and not self.current_ids:
74             tstart = time.time()
75             self.conn.wait()
76             self.wait_time += time.time() - tstart
77             self.current_ids = self.conn.cursor.fetchall()
78
79         return self.current_ids
80
81     def __enter__(self):
82         return self
83
84
85     def __exit__(self, exc_type, exc_value, traceback):
86         self.conn.wait()
87         self.close()
88
89
90 class Indexer:
91     """ Main indexing routine.
92     """
93
94     def __init__(self, dsn, tokenizer, num_threads):
95         self.dsn = dsn
96         self.tokenizer = tokenizer
97         self.num_threads = num_threads
98
99
100     def has_pending(self):
101         """ Check if any data still needs indexing.
102             This function must only be used after the import has finished.
103             Otherwise it will be very expensive.
104         """
105         with connect(self.dsn) as conn:
106             with conn.cursor() as cur:
107                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
108                 return cur.rowcount > 0
109
110
111     def index_full(self, analyse=True):
112         """ Index the complete database. This will first index boundaries
113             followed by all other objects. When `analyse` is True, then the
114             database will be analysed at the appropriate places to
115             ensure that database statistics are updated.
116         """
117         with connect(self.dsn) as conn:
118             conn.autocommit = True
119
120             def _analyze():
121                 if analyse:
122                     with conn.cursor() as cur:
123                         cur.execute('ANALYZE')
124
125             self.index_by_rank(0, 4)
126             _analyze()
127
128             self.index_boundaries(0, 30)
129             _analyze()
130
131             self.index_by_rank(5, 25)
132             _analyze()
133
134             self.index_by_rank(26, 30)
135             _analyze()
136
137             self.index_postcodes()
138             _analyze()
139
140
141     def index_boundaries(self, minrank, maxrank):
142         """ Index only administrative boundaries within the given rank range.
143         """
144         LOG.warning("Starting indexing boundaries using %s threads",
145                     self.num_threads)
146
147         with self.tokenizer.name_analyzer() as analyzer:
148             for rank in range(max(minrank, 4), min(maxrank, 26)):
149                 self._index(runners.BoundaryRunner(rank, analyzer))
150
151     def index_by_rank(self, minrank, maxrank):
152         """ Index all entries of placex in the given rank range (inclusive)
153             in order of their address rank.
154
155             When rank 30 is requested then also interpolations and
156             places with address rank 0 will be indexed.
157         """
158         maxrank = min(maxrank, 30)
159         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
160                     minrank, maxrank, self.num_threads)
161
162         with self.tokenizer.name_analyzer() as analyzer:
163             for rank in range(max(1, minrank), maxrank + 1):
164                 self._index(runners.RankRunner(rank, analyzer), 20 if rank == 30 else 1)
165
166             if maxrank == 30:
167                 self._index(runners.RankRunner(0, analyzer))
168                 self._index(runners.InterpolationRunner(analyzer), 20)
169
170
171     def index_postcodes(self):
172         """Index the entries ofthe location_postcode table.
173         """
174         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
175
176         self._index(runners.PostcodeRunner(), 20)
177
178
179     def update_status_table(self):
180         """ Update the status in the status table to 'indexed'.
181         """
182         with connect(self.dsn) as conn:
183             with conn.cursor() as cur:
184                 cur.execute('UPDATE import_status SET indexed = true')
185
186             conn.commit()
187
188     def _index(self, runner, batch=1):
189         """ Index a single rank or table. `runner` describes the SQL to use
190             for indexing. `batch` describes the number of objects that
191             should be processed with a single SQL statement
192         """
193         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
194
195         with connect(self.dsn) as conn:
196             psycopg2.extras.register_hstore(conn)
197             with conn.cursor() as cur:
198                 total_tuples = cur.scalar(runner.sql_count_objects())
199                 LOG.debug("Total number of rows: %i", total_tuples)
200
201             conn.commit()
202
203             progress = ProgressLogger(runner.name(), total_tuples)
204
205             if total_tuples > 0:
206                 with conn.cursor(name='places') as cur:
207                     cur.execute(runner.sql_get_objects())
208
209                     with PlaceFetcher(self.dsn, conn) as fetcher:
210                         with WorkerPool(self.dsn, self.num_threads) as pool:
211                             has_more = fetcher.fetch_next_batch(cur, runner)
212                             while has_more:
213                                 places = fetcher.get_batch()
214
215                                 # asynchronously get the next batch
216                                 has_more = fetcher.fetch_next_batch(cur, runner)
217
218                                 # And insert the curent batch
219                                 for idx in range(0, len(places), batch):
220                                     part = places[idx:idx + batch]
221                                     LOG.debug("Processing places: %s", str(part))
222                                     runner.index_places(pool.next_free_worker(), part)
223                                     progress.add(len(part))
224
225                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
226                                      fetcher.wait_time, pool.wait_time)
227
228                 conn.commit()
229
230         progress.done()