]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 import logging
5 import select
6 import time
7
8 import psycopg2.extras
9
10 from nominatim.indexer.progress import ProgressLogger
11 from nominatim.indexer import runners
12 from nominatim.db.async_connection import DBConnection
13 from nominatim.db.connection import connect
14
# Logger shared by everything in this module. Calling `getLogger()` without
# a name hands back the root logger.
LOG = logging.getLogger()
16
17
class PlaceFetcher:
    """ Asynchronous connection that fetches place details for processing.

        The fetcher may be used as a context manager. Leaving the context
        waits for the outstanding request and closes the connection.

        `current_ids` tracks the state of the batch in flight:
          * None - no batch has been requested or no data is left
          * empty list - details were requested asynchronously and still
            have to be collected from the connection
          * non-empty list - the batch is ready to be handed out

        `wait_time` sums up the seconds spent waiting for the asynchronous
        connection in `get_batch`.
    """
    def __init__(self, dsn, setup_conn):
        """ Open the asynchronous connection for `dsn`. `setup_conn` is an
            additional connection that is only used to look up the OIDs of
            the hstore types.
        """
        self.wait_time = 0
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # need to fetch those manually because register_hstore cannot
            # fetch them on an asynchronous connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)

    def close(self):
        """ Close the underlying asynchronous connection.
            Calling the function again on a closed fetcher is a no-op.
        """
        if self.conn:
            self.conn.close()
            self.conn = None


    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            `cur` is the cursor the place ids are read from (up to 100 rows
            at a time). A `runner` that has a `get_place_details` function
            is asked to request the details on the fetcher's connection.

            Returns true if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            # The empty list marks an asynchronous request in flight.
            # get_batch() collects the result.
            self.current_ids = []
        else:
            self.current_ids = ids

        return True

    def get_batch(self):
        """ Get the next batch of data, previously requested with
            `fetch_next_batch`.

            Returns None when no batch is available.
        """
        if self.current_ids is not None and not self.current_ids:
            # Details were requested asynchronously: wait for the result
            # and account for the time spent waiting.
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """ Wait for a pending request, then close the connection.
            The connection is closed even when waiting fails and nothing
            happens when the fetcher has already been closed.
        """
        try:
            if self.conn:
                self.conn.wait()
        finally:
            self.close()
83
class WorkerPool:
    """ A pool of asynchronous database connections.

        The pool may be used as a context manager.

        `wait_time` sums up the seconds spent blocked in select() while
        waiting for a connection to become available.
    """
    # Number of handed-out connections after which all connections of the
    # pool are reconnected.
    REOPEN_CONNECTIONS_AFTER = 100000

    def __init__(self, dsn, pool_size):
        """ Create `pool_size` asynchronous connections for `dsn`.
        """
        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
        self.free_workers = self._yield_free_worker()
        self.wait_time = 0


    def finish_all(self):
        """ Wait for all connections to finish.
        """
        for thread in self.threads:
            while not thread.is_done():
                thread.wait()

        # Start over with a fresh generator once everything is idle.
        self.free_workers = self._yield_free_worker()

    def close(self):
        """ Close all connections and clear the pool.
        """
        for thread in self.threads:
            thread.close()
        self.threads = []
        self.free_workers = None


    def next_free_worker(self):
        """ Get the next free connection.
        """
        return next(self.free_workers)


    def _yield_free_worker(self):
        """ Generator that endlessly yields connections for which
            `is_done()` reports true.

            Once more than REOPEN_CONNECTIONS_AFTER connections have been
            handed out, the generator waits for all connections to finish
            and calls `connect()` on each of them before continuing.
        """
        ready = self.threads
        command_stat = 0
        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                ready = self.threads
                command_stat = 0
            else:
                # Block until at least one connection is reported as
                # writable and only look at those in the next round.
                tstart = time.time()
                _, ready, _ = select.select([], self.threads, [])
                self.wait_time += time.time() - tstart


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """ Wait for all connections to finish and close them.
            The connections are closed even when waiting fails.
        """
        try:
            self.finish_all()
        finally:
            self.close()
150
151
class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, tokenizer, num_threads):
        """ Set up an indexer for the database at `dsn`.

            `tokenizer` provides the name analyzer handed to the runners.
            `num_threads` is the size of the worker pool used for indexing.
        """
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.

            Boundaries are indexed for the ranks from max(minrank, 4) up to
            but not including min(maxrank, 26).
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            # NOTE(review): unlike index_by_rank(), the upper bound is
            # exclusive here - confirm that this is intended for
            # maxrank < 26.
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            # Rank 0 is never part of the loop, it is handled together
            # with rank 30 below.
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                # Named cursor from which the objects are read in batches.
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # And insert the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx+batch]
                                    # Leave the string conversion to the
                                    # logger, so that nothing is formatted
                                    # when debug output is disabled.
                                    LOG.debug("Processing places: %s", part)
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                        # Reported after the pool has been shut down.
                        LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
                                 fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()