git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
Merge pull request #2393 from lonvia/fix-flake8-issues
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 import logging
5 import time
6
7 import psycopg2.extras
8
9 from nominatim.indexer.progress import ProgressLogger
10 from nominatim.indexer import runners
11 from nominatim.db.async_connection import DBConnection, WorkerPool
12 from nominatim.db.connection import connect
13
# Logger shared by everything in this module. Calling getLogger() without
# a name returns the root logger.
LOG = logging.getLogger()
15
16
class PlaceFetcher:
    """ Asynchronous connection that fetches place details for processing.

        A batch is requested with `fetch_next_batch()` and collected
        afterwards with `get_batch()`. The object may be used as a context
        manager; on exit it waits for the connection and closes it.
    """

    def __init__(self, dsn, setup_conn):
        """ Open the asynchronous connection described by `dsn`.

            `setup_conn` is a second connection whose cursors provide
            `scalar()`. It is only used to look up the OIDs of the
            hstore types.
        """
        # Accumulated seconds get_batch() spent blocked in conn.wait().
        self.wait_time = 0
        # State of the current batch:
        #   None           - nothing requested or no more data
        #   empty list     - details were requested on self.conn and still
        #                    have to be collected
        #   non-empty list - rows ready to be returned by get_batch()
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # need to fetch those manually because register_hstore cannot
            # fetch them on an asynchronous connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)

    def close(self):
        """ Close the underlying asynchronous connection.

            Calling the function again after the connection was closed
            is a no-op.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            `cur` is the cursor the place ids are read from (up to 100 per
            call). When `runner` has a `get_place_details()` function, it
            is called with the asynchronous connection and the ids.

            Returns true if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            # The empty list tells get_batch() that the result still has to
            # be collected from the asynchronous connection.
            self.current_ids = []
        else:
            self.current_ids = ids

        return True

    def get_batch(self):
        """ Get the next batch of data, previously requested with
            `fetch_next_batch`.

            Returns None when the last call to `fetch_next_batch` found no
            more data. The time spent waiting for the asynchronous
            connection is added to `wait_time`.
        """
        if self.current_ids is not None and not self.current_ids:
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """ Wait for the asynchronous connection and close it.

            The connection is closed even when waiting fails, and nothing
            is done when it has already been closed. Exceptions from the
            with-body are not suppressed.
        """
        try:
            if self.conn:
                self.conn.wait()
        finally:
            self.close()
82
83
class Indexer:
    """ Main indexing routine.

        Keeps the connection parameters, the tokenizer and the number of
        threads and runs the different indexing runners over the database.
    """

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads

    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            # This connection is only used for running ANALYZE between
            # the indexing stages.
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()

    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.

            The ranks processed start at max(minrank, 4) and run up to,
            but not including, min(maxrank, 26).
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            # NOTE(review): the upper bound is exclusive, so for maxrank < 26
            # the rank `maxrank` itself is skipped, unlike in index_by_rank().
            # Confirm that this is intended.
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            # Rank 0 is never part of this loop. It is handled together
            # with rank 30 below.
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            # The final rank is handled separately because rank 30 comes
            # with extra runners and a larger batch size.
            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))

    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                # Named cursor: the objects are read in portions by the
                # fetcher instead of all at once.
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # And insert the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx + batch]
                                    # Hand over the list itself so that it is
                                    # only converted to a string when debug
                                    # output is actually enabled.
                                    LOG.debug("Processing places: %s", part)
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()