1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Main work horse for indexing (computing addresses) the database.
10 from typing import Optional, Any, cast
14 import psycopg2.extras
16 from nominatim_core.typing import DictCursorResults
17 from nominatim_core.db.async_connection import DBConnection, WorkerPool
18 from nominatim_core.db.connection import connect, Connection, Cursor
19 from ..tokenizer.base import AbstractTokenizer
20 from .progress import ProgressLogger
23 LOG = logging.getLogger()
27 """ Asynchronous connection that fetches place details for processing.
29 def __init__(self, dsn: str, setup_conn: Connection) -> None:
31 self.current_ids: Optional[DictCursorResults] = None
32 self.conn: Optional[DBConnection] = DBConnection(dsn,
33 cursor_factory=psycopg2.extras.DictCursor)
35 with setup_conn.cursor() as cur:
36 # need to fetch those manually because register_hstore cannot
37 # fetch them on an asynchronous connection below.
38 hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
39 hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
41 psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
42 array_oid=hstore_array_oid)
44 def close(self) -> None:
45 """ Close the underlying asynchronous connection.
52 def fetch_next_batch(self, cur: Cursor, runner: runners.Runner) -> bool:
53 """ Send a request for the next batch of places.
54 If details for the places are required, they will be fetched
57 Returns true if there is still data available.
59 ids = cast(Optional[DictCursorResults], cur.fetchmany(100))
62 self.current_ids = None
65 assert self.conn is not None
66 self.current_ids = runner.get_place_details(self.conn, ids)
70 def get_batch(self) -> DictCursorResults:
71 """ Get the next batch of data, previously requested with
74 assert self.conn is not None
75 assert self.conn.cursor is not None
77 if self.current_ids is not None and not self.current_ids:
80 self.wait_time += time.time() - tstart
81 self.current_ids = cast(Optional[DictCursorResults],
82 self.conn.cursor.fetchall())
84 return self.current_ids if self.current_ids is not None else []
86 def __enter__(self) -> 'PlaceFetcher':
90 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
91 assert self.conn is not None
97 """ Main indexing routine.
100 def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
102 self.tokenizer = tokenizer
103 self.num_threads = num_threads
106 def has_pending(self) -> bool:
107 """ Check if any data still needs indexing.
108 This function must only be used after the import has finished.
109 Otherwise it will be very expensive.
111 with connect(self.dsn) as conn:
112 with conn.cursor() as cur:
113 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
114 return cur.rowcount > 0
117 def index_full(self, analyse: bool = True) -> None:
118 """ Index the complete database. This will first index boundaries
119 followed by all other objects. When `analyse` is True, then the
120 database will be analysed at the appropriate places to
121 ensure that database statistics are updated.
123 with connect(self.dsn) as conn:
124 conn.autocommit = True
126 def _analyze() -> None:
128 with conn.cursor() as cur:
129 cur.execute('ANALYZE')
131 if self.index_by_rank(0, 4) > 0:
134 if self.index_boundaries(0, 30) > 100:
137 if self.index_by_rank(5, 25) > 100:
140 if self.index_by_rank(26, 30) > 1000:
143 if self.index_postcodes() > 100:
147 def index_boundaries(self, minrank: int, maxrank: int) -> int:
148 """ Index only administrative boundaries within the given rank range.
151 LOG.warning("Starting indexing boundaries using %s threads",
154 with self.tokenizer.name_analyzer() as analyzer:
155 for rank in range(max(minrank, 4), min(maxrank, 26)):
156 total += self._index(runners.BoundaryRunner(rank, analyzer))
160 def index_by_rank(self, minrank: int, maxrank: int) -> int:
161 """ Index all entries of placex in the given rank range (inclusive)
162 in order of their address rank.
164 When rank 30 is requested then also interpolations and
165 places with address rank 0 will be indexed.
168 maxrank = min(maxrank, 30)
169 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
170 minrank, maxrank, self.num_threads)
172 with self.tokenizer.name_analyzer() as analyzer:
173 for rank in range(max(1, minrank), maxrank + 1):
174 total += self._index(runners.RankRunner(rank, analyzer), 20 if rank == 30 else 1)
177 total += self._index(runners.RankRunner(0, analyzer))
178 total += self._index(runners.InterpolationRunner(analyzer), 20)
183 def index_postcodes(self) -> int:
184 """Index the entries of the location_postcode table.
186 LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
188 return self._index(runners.PostcodeRunner(), 20)
191 def update_status_table(self) -> None:
192 """ Update the status in the status table to 'indexed'.
194 with connect(self.dsn) as conn:
195 with conn.cursor() as cur:
196 cur.execute('UPDATE import_status SET indexed = true')
200 def _index(self, runner: runners.Runner, batch: int = 1) -> int:
201 """ Index a single rank or table. `runner` describes the SQL to use
202 for indexing. `batch` describes the number of objects that
203 should be processed with a single SQL statement
205 LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
207 with connect(self.dsn) as conn:
208 psycopg2.extras.register_hstore(conn)
209 with conn.cursor() as cur:
210 total_tuples = cur.scalar(runner.sql_count_objects())
211 LOG.debug("Total number of rows: %i", total_tuples)
215 progress = ProgressLogger(runner.name(), total_tuples)
218 with conn.cursor(name='places') as cur:
219 cur.execute(runner.sql_get_objects())
221 with PlaceFetcher(self.dsn, conn) as fetcher:
222 with WorkerPool(self.dsn, self.num_threads) as pool:
223 has_more = fetcher.fetch_next_batch(cur, runner)
225 places = fetcher.get_batch()
227 # asynchronously get the next batch
228 has_more = fetcher.fetch_next_batch(cur, runner)
230 # And insert the current batch
231 for idx in range(0, len(places), batch):
232 part = places[idx:idx + batch]
233 LOG.debug("Processing places: %s", str(part))
234 runner.index_places(pool.next_free_worker(), part)
235 progress.add(len(part))
237 LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
238 fetcher.wait_time, pool.wait_time)
242 return progress.done()