"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
import select
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
from nominatim.db.connection import connect

LOG = logging.getLogger()
19 """ Asynchronous connection that fetches place details for processing.
    def __init__(self, dsn, setup_conn):
        self.wait_time = 0
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # The hstore type OIDs need to be fetched manually here because
            # register_hstore() cannot fetch them on the asynchronous
            # connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)
36 """ Close the underlying asynchronous connection.

    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns true if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            # An empty list marks a batch that is still in flight on the
            # asynchronous connection (see get_batch() below).
            self.current_ids = []
        else:
            self.current_ids = ids

        return True
65 """ Get the next batch of data, previously requested with
        if self.current_ids is not None and not self.current_ids:
            # Details are still being fetched asynchronously:
            # wait for the connection and collect the results.
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.wait()
        self.close()
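

# Usage sketch (illustrative only, not part of the module): PlaceFetcher
# enables double-buffering, i.e. requesting the next batch while the current
# one is being processed. `dsn`, `conn`, `cur`, `runner` and `process()` are
# placeholders here; the real call site is Indexer._index() below.
#
#     with PlaceFetcher(dsn, conn) as fetcher:
#         has_more = fetcher.fetch_next_batch(cur, runner)
#         while has_more:
#             places = fetcher.get_batch()
#             has_more = fetcher.fetch_next_batch(cur, runner)  # prefetch
#             process(places)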
85 """ A pool of asynchronous database connections.
87 The pool may be used as a context manager.
89 REOPEN_CONNECTIONS_AFTER = 100000

    def __init__(self, dsn, pool_size):
        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
        self.free_workers = self._yield_free_worker()
        self.wait_time = 0
98 """ Wait for all connection to finish.
100 for thread in self.threads:
101 while not thread.is_done():
104 self.free_workers = self._yield_free_worker()
107 """ Close all connections and clear the pool.
109 for thread in self.threads:
112 self.free_workers = None

    def next_free_worker(self):
        """ Get the next free connection.
        """
        return next(self.free_workers)

    def _yield_free_worker(self):
        ready = self.threads
        command_stat = 0
        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
                # Too many commands sent on the same connections:
                # wait for them to finish and reopen them.
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                ready = self.threads
                command_stat = 0
            else:
                # Wait until at least one connection is ready for the
                # next command.
                tstart = time.time()
                _, ready, _ = select.select([], self.threads, [])
                self.wait_time += time.time() - tstart

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish_all()
        self.shutdown()
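

# Usage sketch (illustrative only, not part of the module): the pool hands
# out workers as they become free. It is assumed here that DBConnection
# offers a non-blocking perform() for issuing a query; `dsn` and
# `statements` are placeholders.
#
#     with WorkerPool(dsn, pool_size=4) as pool:
#         for sql in statements:
#             pool.next_free_worker().perform(sql)
#     # Leaving the context waits for all workers and closes them.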
153 """ Main indexing routine.

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads

    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            self.index_by_rank(0, 4)
            _analyze()
            self.index_boundaries(0, 30)
            _analyze()
            self.index_by_rank(5, 25)
            _analyze()
            self.index_by_rank(26, 30)
            _analyze()
            self.index_postcodes()
            _analyze()

    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested, then interpolations and places with
            address rank 0 will be indexed as well.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))

    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously request the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # and index the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx+batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()
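

# Usage sketch (illustrative only, not part of the module): a full indexing
# run. How the tokenizer is obtained depends on the setup; the factory call
# below is just one plausible way, and `config`/`dsn` are placeholders.
#
#     from nominatim.tokenizer import factory as tokenizer_factory
#
#     tokenizer = tokenizer_factory.get_tokenizer_for_db(config)
#     indexer = Indexer(dsn, tokenizer, num_threads=4)
#     indexer.index_full(analyse=True)
#     indexer.update_status_table()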