# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect

LOG = logging.getLogger()
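
# The indexer is organised as a producer/consumer pipeline: a PlaceFetcher
# pulls batches of place ids (and their details, when the runner needs them)
# over a separate asynchronous connection, while a WorkerPool of database
# connections executes the actual indexing SQL in parallel.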
24 """ Asynchronous connection that fetches place details for processing.
26 def __init__(self, dsn, setup_conn):
28 self.current_ids = None
29 self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # need to fetch those manually because register_hstore cannot
            # fetch them on an asynchronous connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)
41 """ Close the underlying asynchronous connection.


    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns true if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False
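
        # When the runner needs the full place details, the query is sent on
        # the asynchronous connection; an empty list marks the result as
        # still pending and get_batch() will wait for it.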
        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            self.current_ids = []
        else:
            self.current_ids = ids

        return True
70 """ Get the next batch of data, previously requested with
        if self.current_ids is not None and not self.current_ids:
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self
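
    # The context-manager protocol ensures that any outstanding request is
    # drained before the connection goes away, even if indexing aborts.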
    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.wait()
        self.close()
91 """ Main indexing routine.
94 def __init__(self, dsn, tokenizer, num_threads):
96 self.tokenizer = tokenizer
97 self.num_threads = num_threads

    def has_pending(self):
        """ Check if any data still needs indexing.
            This function must only be used after the import has finished.
            Otherwise it will be very expensive.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
                return cur.rowcount > 0

    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
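        # Lower ranks and boundaries go first: objects indexed later find
        # their address parts in the areas that are already indexed.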
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))
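
            # When rank 30 is included, rank 0 places and the address
            # interpolation lines are indexed as well, both with a larger
            # batch size of 20 objects per SQL statement.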
            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))


    def index_postcodes(self):
        """Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
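                # Stream the candidate ids through a server-side ('named')
                # cursor so the full result set is never materialised on the
                # client.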
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
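                            # Double-buffering: while the worker pool indexes
                            # the current batch, the fetcher already requests
                            # the next one on its own connection.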
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # And insert the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx + batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))
228 LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
229 fetcher.wait_time, pool.wait_time)