# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Main workhorse for indexing (computing addresses) the database.
"""
import logging
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect

LOG = logging.getLogger()
24 """ Asynchronous connection that fetches place details for processing.
26 def __init__(self, dsn, setup_conn):
28 self.current_ids = None
29 self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
31 with setup_conn.cursor() as cur:
32 # need to fetch those manually because register_hstore cannot
33 # fetch them on an asynchronous connection below.
34 hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
35 hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
37 psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
38 array_oid=hstore_array_oid)
41 """ Close the underlying asynchronous connection.

    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns true if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            self.current_ids = []
        else:
            self.current_ids = ids

        return True
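
    # self.current_ids encodes the fetcher state: None means the source
    # cursor is exhausted, an empty list means an asynchronous fetch is
    # still in flight (its results arrive via get_batch below), and a
    # non-empty list holds rows that are ready for processing.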
70 """ Get the next batch of data, previously requested with
73 if self.current_ids is not None and not self.current_ids:
76 self.wait_time += time.time() - tstart
77 self.current_ids = self.conn.cursor.fetchall()
79 return self.current_ids
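
    # Note: fetch_next_batch() only *requests* data; get_batch() blocks
    # until the previously requested results have arrived. The blocked
    # time is accumulated in self.wait_time for the final statistics.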

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.wait()
        self.close()
91 """ Main indexing routine.
94 def __init__(self, dsn, tokenizer, num_threads):
96 self.tokenizer = tokenizer
97 self.num_threads = num_threads
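
    # A minimal usage sketch (hypothetical values; assumes an existing
    # Nominatim database reachable via the DSN and a tokenizer instance
    # loaded from the project directory):
    #
    #   indexer = Indexer('dbname=nominatim', tokenizer, num_threads=4)
    #   indexer.index_full()
    #   indexer.update_status_table()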

    def has_pending(self):
        """ Check if any data still needs indexing.
            This function must only be used after the import has finished.
            Otherwise it will be very expensive.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
                return cur.rowcount > 0

    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()

    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)
        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank + 1):
                self._index(runners.RankRunner(rank, analyzer), 20 if rank == 30 else 1)
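
            # Rank 30 gets a larger batch size above because it holds by
            # far the most objects. Rank 0 objects and address interpolations
            # are not covered by the rank loop; as the docstring notes, they
            # are only indexed when rank 30 was requested.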
            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)

    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())
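
                    # 'places' is a server-side cursor, so the result set
                    # is streamed in batches instead of being loaded into
                    # memory all at once. The fetcher requests the next
                    # batch while the worker pool is still indexing the
                    # current one, overlapping database I/O with work.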

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # And index the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx + batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()