2 Main workhorse for indexing (computing addresses) the database.
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
12 LOG = logging.getLogger()
    """ A pool of asynchronous database connections.

        The pool may be used as a context manager.
    """
    # Threshold on executed commands after which pooled connections are
    # drained (see _yield_free_worker).  NOTE(review): the recycle logic
    # itself is not fully visible in this excerpt — confirm that
    # connections are reopened once this limit is hit.
    REOPEN_CONNECTIONS_AFTER = 100000
21 def __init__(self, dsn, pool_size):
22 self.threads = [DBConnection(dsn) for _ in range(pool_size)]
23 self.free_workers = self._yield_free_worker()
        """ Wait for all connection to finish.
        """
        # NOTE(review): the enclosing `def` line and the body of the inner
        # while-loop are not visible in this excerpt — the loop presumably
        # performs a blocking wait on the connection (e.g. thread.wait());
        # confirm against the full source.
        for thread in self.threads:
            while not thread.is_done():

        # Restart the hand-out order from a fresh generator once every
        # connection is idle again.
        self.free_workers = self._yield_free_worker()
        """ Close all connections and clear the pool.
        """
        # NOTE(review): the enclosing `def` line and the per-thread
        # shutdown statement(s) are not visible in this excerpt —
        # presumably thread.close() followed by clearing self.threads;
        # confirm against the full source.
        for thread in self.threads:

        # Invalidate the generator so no further workers are handed out
        # from a closed pool.
        self.free_workers = None
    def next_free_worker(self):
        """ Get the next free connection.

            Advances the generator created by _yield_free_worker(), which
            appears to hand out only idle connections (it checks
            thread.is_done() and waits on select.select()) — so this call
            may block until a worker becomes free.
        """
        return next(self.free_workers)
    def _yield_free_worker(self):
        # Generator behind next_free_worker().  NOTE(review): large parts
        # of this body are missing from the excerpt (the yield statement,
        # the command_stat bookkeeping and the surrounding loop) — the
        # fragments below are NOT a runnable unit; confirm against the
        # full source before editing.

        # Once enough commands have run, wait for every connection to go
        # idle instead of reusing it indefinitely (the reopen step itself
        # is not visible here).
        if command_stat > self.REOPEN_CONNECTIONS_AFTER:
            for thread in self.threads:
                while not thread.is_done():

        # Block until at least one connection is ready; the connections
        # are passed as select's second (writability) list.
        _, ready, _ = select.select([], self.threads, [])
    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit.  NOTE(review): the body is not visible in
        # this excerpt — presumably it closes the pool (self.close());
        # confirm against the full source.
    """ Main indexing routine.
    """
82 def __init__(self, dsn, tokenizer, num_threads):
84 self.tokenizer = tokenizer
85 self.num_threads = num_threads
    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        # NOTE(review): several statements of this method are not visible
        # in this excerpt — presumably the `if analyse:` guards around the
        # ANALYZE runs and further ANALYZE statements between the phases;
        # confirm against the full source.
        with connect(self.dsn) as conn:
            # autocommit so each indexing phase is persisted as it runs
            conn.autocommit = True

            with conn.cursor() as cur:
                cur.execute('ANALYZE')

            # lowest ranks first ...
            self.index_by_rank(0, 4)

            self.index_boundaries(0, 30)

            self.index_by_rank(5, 25)

            self.index_by_rank(26, 30)

            self.index_postcodes()
121 def index_boundaries(self, minrank, maxrank):
122 """ Index only administrative boundaries within the given rank range.
124 LOG.warning("Starting indexing boundaries using %s threads",
127 with self.tokenizer.name_analyzer() as analyzer:
128 for rank in range(max(minrank, 4), min(maxrank, 26)):
129 self._index(runners.BoundaryRunner(rank, analyzer))
    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        # Rank 30 is the highest rank there is.
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            # NOTE(review): the branch structure here is not visible in
            # this excerpt — per the docstring, the next three statements
            # presumably sit under an `if maxrank == 30:` (rank-0 places
            # and interpolations are indexed together with rank 30), with
            # the last statement as the `else` alternative.  Confirm
            # against the full source before editing.
            self._index(runners.RankRunner(0, analyzer))
            self._index(runners.InterpolationRunner(), 20)
            self._index(runners.RankRunner(30, analyzer), 20)
            self._index(runners.RankRunner(maxrank, analyzer))
    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        # Postcodes are small objects; batch 20 of them per SQL statement.
        self._index(runners.PostcodeRunner(), 20)
    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

        # NOTE(review): no commit is visible in this excerpt; unless
        # connect()'s context manager commits on exit, the UPDATE could be
        # rolled back — confirm against the full source.
    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        # NOTE(review): this excerpt is missing several statements of the
        # method — most importantly the fetch loop's header and its
        # termination condition (fetchmany() returning an empty list),
        # plus anything after progress.add() (e.g. final progress output,
        # draining the pool, a commit).  The body below is NOT a runnable
        # unit; confirm against the full source before editing.
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                # total row count feeds the progress logger below
                # (cur.scalar looks like a project-specific cursor helper
                # — confirm its contract)
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            progress = ProgressLogger(runner.name(), total_tuples)

            # name='places' makes this a server-side cursor in psycopg2,
            # so rows are streamed in batches instead of fetched at once
            with conn.cursor(name='places') as cur:
                cur.execute(runner.sql_get_objects())

                with WorkerPool(self.dsn, self.num_threads) as pool:
                    places = cur.fetchmany(batch)

                    LOG.debug("Processing places: %s", str(places))
                    # blocks until one of the pooled connections is idle
                    worker = pool.next_free_worker()

                    runner.index_places(worker, places)
                    progress.add(len(places))