]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
cfa48433fd120c664ecd7aea3b432aec329a3261
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 import logging
5 import select
6
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
11
12 LOG = logging.getLogger()
13
class WorkerPool:
    """ A pool of asynchronous database connections.

        Connections are handed out one at a time through
        `next_free_worker()`. Once more than `REOPEN_CONNECTIONS_AFTER`
        connections have been handed out, every connection of the pool
        is re-established with `connect()`.

        The pool may be used as a context manager. Leaving the context
        closes all connections.
    """
    # Number of handed-out connections after which all connections
    # of the pool are reopened.
    REOPEN_CONNECTIONS_AFTER = 100000

    def __init__(self, dsn, pool_size):
        """ Create a pool with `pool_size` connections to the database
            described by `dsn`.

            When creating one of the connections raises, the connections
            created so far are closed again before the error is re-raised.
        """
        self.threads = []
        self.free_workers = None
        try:
            for _ in range(pool_size):
                self.threads.append(DBConnection(dsn))
        except Exception:
            # Do not leave half a pool of connections behind.
            self.close()
            raise
        self.free_workers = self._yield_free_worker()


    def finish_all(self):
        """ Wait for all connections to finish.
        """
        for thread in self.threads:
            self._wait_until_done(thread)

        # Start over with a fresh generator, so that the next request
        # checks all connections of the pool again.
        self.free_workers = self._yield_free_worker()


    def close(self):
        """ Close all connections and clear the pool.
        """
        for thread in self.threads:
            thread.close()
        self.threads = []
        self.free_workers = None


    def next_free_worker(self):
        """ Get the next free connection.

            Raises a RuntimeError when the pool has no connections, i.e.
            when it was created with a size of 0 or has been closed.
            Without this check the request would either wait forever in
            select() or fail with an unrelated TypeError.
        """
        if not self.threads:
            raise RuntimeError("Worker pool has no open connections.")

        return next(self.free_workers)


    @staticmethod
    def _wait_until_done(thread):
        """ Block until the given connection reports that it is done.
        """
        while not thread.is_done():
            thread.wait()


    def _yield_free_worker(self):
        """ Endless generator over connections that report to be done
            and can therefore take the next command.
        """
        ready = self.threads
        command_stat = 0
        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
                # Reopen the connections one by one, letting each finish
                # its running command first.
                for thread in self.threads:
                    self._wait_until_done(thread)
                    thread.connect()
                ready = self.threads
                command_stat = 0
            else:
                # Sleep until select() reports at least one connection
                # as writable and only recheck those.
                _, ready, _ = select.select([], self.threads, [])


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
76
77
class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, num_threads):
        """ Remember the database to index (`dsn`) and the size of the
            connection pool (`num_threads`) to use while indexing.
        """
        self.dsn = dsn
        self.num_threads = num_threads


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index ranks 0 to 4,
            then the boundaries, followed by all other objects and finally
            the postcodes. When `analyse` is True, then the database will
            be analysed after each of these steps to ensure that database
            statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


    @staticmethod
    def _boundary_ranks(minrank, maxrank):
        """ Return the ranks to process for boundaries in the given rank
            range (inclusive), restricted to the ranks 4 to 25.
        """
        return range(max(minrank, 4), min(maxrank, 25) + 1)


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range
            (inclusive). Only ranks between 4 and 25 are considered.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        # `maxrank` is inclusive here, just as for index_by_rank().
        for rank in self._boundary_ranks(minrank, maxrank):
            self._index(runners.BoundaryRunner(rank))


    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        # Rank 0 and the final rank get special treatment below.
        for rank in range(max(1, minrank), maxrank):
            self._index(runners.RankRunner(rank))

        if maxrank == 30:
            self._index(runners.RankRunner(0))
            self._index(runners.InterpolationRunner(), 20)
            self._index(runners.RankRunner(30), 20)
        else:
            self._index(runners.RankRunner(maxrank))


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()


    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                # NOTE(review): a named cursor is presumably a server-side
                # cursor, so that the rows are streamed in batches - confirm
                # against the connection class returned by connect().
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with WorkerPool(self.dsn, self.num_threads) as pool:
                        while True:
                            places = cur.fetchmany(batch)
                            if not places:
                                break

                            # Hand over the object itself, so that it only
                            # gets formatted when debug logging is enabled.
                            LOG.debug("Processing places: %s", places)
                            worker = pool.next_free_worker()

                            runner.index_places(worker, places)
                            progress.add(len(places))

                        # Let all workers complete before the connections
                        # are closed on leaving the pool context.
                        pool.finish_all()

                conn.commit()

        progress.done()