"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
import select

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
from nominatim.db.connection import connect

LOG = logging.getLogger()
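
# Rough usage sketch (not executed here; the real call sites live in the
# Nominatim CLI layer, and `dsn` and the thread count below are stand-ins for
# caller-provided values):
#
#     indexer = Indexer(dsn, num_threads=4)
#     indexer.index_full()           # boundaries first, then all other objects
#     indexer.update_status_table()  # record in the database that indexing is done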
15 """ A pool of asynchronous database connections.
17 The pool may be used as a context manager.
19 REOPEN_CONNECTIONS_AFTER = 100000
21 def __init__(self, dsn, pool_size):
22 self.threads = [DBConnection(dsn) for _ in range(pool_size)]
23 self.free_workers = self._yield_free_worker()
27 """ Wait for all connection to finish.
29 for thread in self.threads:
30 while not thread.is_done():
33 self.free_workers = self._yield_free_worker()
36 """ Close all connections and clear the pool.
38 for thread in self.threads:
41 self.free_workers = None

    def next_free_worker(self):
        """ Get the next free connection.
        """
        return next(self.free_workers)

    def _yield_free_worker(self):
        ready = self.threads
        command_stat = 0
        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
                # After a certain number of commands, wait for all workers
                # and reopen the connections.
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                # Wait until at least one connection is ready for a new command.
                _, ready, _ = select.select([], self.threads, [])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
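
# Sketch of the intended usage pattern (this mirrors what Indexer._index()
# below does; the SQL string and pool size are only placeholders):
#
#     with WorkerPool(dsn, pool_size=4) as pool:
#         worker = pool.next_free_worker()   # waits until a connection is idle
#         worker.perform("UPDATE placex SET ...")
#         pool.finish_all()                  # wait for outstanding statements
#     # leaving the block closes all pooled connections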
79 """ Main indexing routine.
82 def __init__(self, dsn, num_threads):
84 self.num_threads = num_threads

    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()

    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        for rank in range(max(minrank, 4), min(maxrank, 26)):
            self._index(runners.BoundaryRunner(rank))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested, interpolations and places with
            address rank 0 will also be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        for rank in range(max(1, minrank), maxrank):
            self._index(runners.RankRunner(rank))

        if maxrank == 30:
            self._index(runners.RankRunner(0))
            self._index(runners.InterpolationRunner(), 20)
            self._index(runners.RankRunner(30), 20)
        else:
            self._index(runners.RankRunner(maxrank))

    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement. The informal
            interface expected of a runner is sketched after this method.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                # Use a named (server-side) cursor so the candidate rows are
                # streamed instead of being fetched all at once.
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with WorkerPool(self.dsn, self.num_threads) as pool:
                        while True:
                            places = [p[0] for p in cur.fetchmany(batch)]
                            if not places:
                                break

                            LOG.debug("Processing places: %s", str(places))
                            worker = pool.next_free_worker()

                            worker.perform(runner.sql_index_place(places))
                            progress.add(len(places))

                        pool.finish_all()

                conn.commit()

        progress.done()
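
# For reference, _index() only relies on the following informal runner
# interface (inferred from the calls above; RankRunner, BoundaryRunner,
# InterpolationRunner and PostcodeRunner in nominatim.indexer.runners are the
# concrete implementations, and `SomeRunner` is just a hypothetical name):
#
#     class SomeRunner:
#         def name(self): ...                  # label used for logging/progress
#         def sql_count_objects(self): ...     # SQL counting the rows to index
#         def sql_get_objects(self): ...       # SQL returning the ids to index
#         def sql_index_place(self, ids): ...  # SQL that indexes one batch of ids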